In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("PySpark_Tutorial")\
        .getOrCreate()

orders = spark.read.parquet('/content/couriers_orders.parquet')
purchases = spark.read.parquet('/content/purchases.parquet')

orders.registerTempTable("orders")
purchases.registerTempTable("purchases")



# Задача 1.1

In [None]:
## Считаем общее количество строк в подзапросе:
### 1. Берем строки со значением месяца 6 в столбце date
### 2. Группируем по courier_id
### 3. По courier_id за указанный период выбираем те id, у которых среднее скорости (дистанция делить на время) выше значения из подзапроса:
####      - среднее значение скорости (дистанция/время) по всем заказам июня

print(spark.sql(
   """SELECT COUNT(*) AS prize_numbers
      FROM (SELECT courier_id
            FROM orders
            WHERE EXTRACT(month FROM date)=6
            GROUP BY courier_id
            HAVING MEAN(distance/travel_time)>( SELECT MEAN(distance/travel_time)  FROM orders  WHERE EXTRACT(month FROM date)=6))
      """
    ).show())

+-------------+
|prize_numbers|
+-------------+
|            6|
+-------------+

None


# Задача 1.2

In [None]:
## 1. Подзапрос во FROM:
###  --- Берем данные по июню, группируем по айди курьера и дате, выводим айди курьера и среднюю скорость в день
## 2. Далее работаем с этим подзапросом:
###  --- Группируем по курьер_айди, упорядочиваем от большего к меньшему разницы между макс и мин средней дневной скоростью,
###  --- выводим айди курьера из верхней строки

print(spark.sql(
    """
    SELECT courier_id AS courier_id_with_biggest_difference
    FROM (SELECT courier_id, MEAN(distance/travel_time) AS mean_speed_day
          FROM orders
          WHERE EXTRACT(month FROM date)=6
          GROUP BY courier_id, date)
    GROUP BY courier_id
    ORDER BY MAX(mean_speed_day) - MIN(mean_speed_day) DESC
    LIMIT 1
    """
    ).show())

+----------------------------------+
|courier_id_with_biggest_difference|
+----------------------------------+
|                                 4|
+----------------------------------+

None


# Задача 2.1

In [None]:
## 1. Подзапрос во FROM:
###  --- Оконная функция: для каждой строки вычитаем из даты дату предыдущего заказа для соответствующего покупателя, добавляем как новый столбец
###  --- С помощью EXTRACT выводим из получившегося результата одно число - количество дней
## 2. Считаем общее количество строк, для которых верно условие, что значение в новом столбце - NULL

print(spark.sql(
    """
    SELECT COUNT(*)
    FROM (
      SELECT customer_id,
             purchase_date,
             EXTRACT(day FROM (purchase_date - LAG(purchase_date) OVER (PARTITION BY customer_id ORDER BY purchase_date))) AS days_between_purchases
      FROM purchases)
    WHERE days_between_purchases IS NULL
    """
    ).show())

+--------+
|count(1)|
+--------+
|      50|
+--------+

None


# Задача 2.2

In [None]:
## 1. Подзапрос во FROM:
###  --- Та же таблица с добавлением количества дней
###  --- WHERE - только те строки, где days_between_purchases=20, а дата заказа совпадает с самой большой датой в датасете (считаем "текущими" покупки от этой даты)

print(spark.sql(
    """
    SELECT COUNT(DISTINCT(customer_id))
    FROM (SELECT customer_id,
          purchase_date,
          EXTRACT(day FROM (purchase_date - LAG(purchase_date) OVER (PARTITION BY customer_id ORDER BY purchase_date))) AS days_between_purchases
          FROM purchases)
    WHERE days_between_purchases=20 AND purchase_date=(SELECT max(purchase_date) FROM  purchases)
    """
    ).show())

+---------------------------+
|count(DISTINCT customer_id)|
+---------------------------+
|                          0|
+---------------------------+

None
