## Решение инструментом PySpark

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import DateType, IntegerType, StringType

spark = SparkSession.builder.appName("WindowFunctionExample").getOrCreate()

In [2]:
df = spark.read.parquet('df/couriers_orders.parquet', header=True, inferSchema=True)

                                                                                

#### Вопрос №1.1:

В конце каждого месяца компания выдает премию для своих курьеров, средняя скорость доставки за прошедший месяц которых больше средней скорости среди всех курьеров. С колько курьеров получили премию за июнь 2021 года.


In [3]:
df.show(5)

                                                                                

+-------------------+----------+--------+--------+-----------+
|               date|courier_id|order_id|distance|travel_time|
+-------------------+----------+--------+--------+-----------+
|2021-07-12 00:00:00|        10|       1|     1.9|      36.17|
|2021-07-02 00:00:00|         3|       2|    3.98|      21.34|
|2021-04-15 00:00:00|         6|       3|    3.98|      43.33|
|2021-07-16 00:00:00|        10|       4|    2.85|      14.01|
|2021-06-11 00:00:00|        10|       5|    4.89|      32.09|
+-------------------+----------+--------+--------+-----------+
only showing top 5 rows



In [4]:
df.dtypes

[('date', 'timestamp_ntz'),
 ('courier_id', 'bigint'),
 ('order_id', 'bigint'),
 ('distance', 'double'),
 ('travel_time', 'double')]

In [5]:
# Преобразование типа дынных в столбце 'date'
df = df.withColumn("date", F.col("date").cast(DateType()))

In [6]:
# Сохраним выборку за июнь 2021 года
df_2021_06 = df.filter((df["date"] >= "2021-06-01") & (df["date"] <= "2021-06-30"))
df_2021_06.show(10)

+----------+----------+--------+--------+-----------+
|      date|courier_id|order_id|distance|travel_time|
+----------+----------+--------+--------+-----------+
|2021-06-11|        10|       5|    4.89|      32.09|
|2021-06-14|         4|       9|    4.13|      29.34|
|2021-06-27|         8|      10|    1.04|      12.56|
|2021-06-27|         1|      19|    1.85|      13.56|
|2021-06-28|         2|      25|    4.02|      12.43|
|2021-06-03|         7|      32|    1.61|       57.3|
|2021-06-12|         5|      37|    4.81|      16.35|
|2021-06-28|         3|      40|    4.96|      22.29|
|2021-06-29|         2|      44|    2.15|      57.45|
|2021-06-23|         9|      47|    1.13|      37.74|
+----------+----------+--------+--------+-----------+
only showing top 10 rows



In [7]:
# Вычисление среднего значения столбца 'value'
average_value = df_2021_06.select(F.avg('travel_time')).collect()[0][0]
average_value

33.53234482758622

In [8]:
# Выберем курьеров со скоростью доставки выше среднего
couriers_bonus = df_2021_06.filter(df_2021_06['travel_time'] > average_value)
couriers_bonus.show(10)

+----------+----------+--------+--------+-----------+
|      date|courier_id|order_id|distance|travel_time|
+----------+----------+--------+--------+-----------+
|2021-06-03|         7|      32|    1.61|       57.3|
|2021-06-29|         2|      44|    2.15|      57.45|
|2021-06-23|         9|      47|    1.13|      37.74|
|2021-06-21|         6|      69|    4.81|      37.35|
|2021-06-28|         1|      78|    2.96|      53.72|
|2021-06-10|        10|      79|    2.45|      51.53|
|2021-06-11|         2|      94|     3.7|       44.6|
|2021-06-01|         9|      99|    3.17|      35.98|
|2021-06-29|         7|     113|    3.73|      34.19|
|2021-06-02|         8|     116|     3.5|      39.69|
+----------+----------+--------+--------+-----------+
only showing top 10 rows



In [9]:
couriers_bonus.select('courier_id').distinct().count()

10

#### Вопрос №1.2 (используйте данные из предыдущего вопроса №1.1):

Компания хочет понять, насколько равномерно курьеры работают в течение месяца. Для этого нужно найти ID курьера с наибольшей разницей между максимальной и минимальной средней дневной скоростью в июне 2021 года.

In [10]:
# Сохраним выборку за июнь 2021 года
df_2021_06 = df.filter((df["date"] >= "2021-06-01") & (df["date"] <= "2021-06-30"))
df_2021_06.show(10)

+----------+----------+--------+--------+-----------+
|      date|courier_id|order_id|distance|travel_time|
+----------+----------+--------+--------+-----------+
|2021-06-11|        10|       5|    4.89|      32.09|
|2021-06-14|         4|       9|    4.13|      29.34|
|2021-06-27|         8|      10|    1.04|      12.56|
|2021-06-27|         1|      19|    1.85|      13.56|
|2021-06-28|         2|      25|    4.02|      12.43|
|2021-06-03|         7|      32|    1.61|       57.3|
|2021-06-12|         5|      37|    4.81|      16.35|
|2021-06-28|         3|      40|    4.96|      22.29|
|2021-06-29|         2|      44|    2.15|      57.45|
|2021-06-23|         9|      47|    1.13|      37.74|
+----------+----------+--------+--------+-----------+
only showing top 10 rows



In [11]:
# рассчитаем общий пройденный путь и время в движении за каждый рабочий день
result_sum_dis_trav = df_2021_06.groupby('date', 'courier_id').agg(F.sum('distance').alias('sum_distance'), F.sum('travel_time').alias('sum_travel_time'))
result_sum_dis_trav.show(10)

+----------+----------+-----------------+------------------+
|      date|courier_id|     sum_distance|   sum_travel_time|
+----------+----------+-----------------+------------------+
|2021-06-28|         8|             1.48|             22.18|
|2021-06-29|         2|             2.15|             57.45|
|2021-06-21|         6|             6.58|             49.81|
|2021-06-09|         7|             3.87|             56.79|
|2021-06-08|         1|             3.56|             32.21|
|2021-06-04|         8|             2.19|             34.13|
|2021-06-14|         7|5.859999999999999|34.760000000000005|
|2021-06-26|         7|             4.54| 52.57000000000001|
|2021-06-30|         1|             1.59|             40.49|
|2021-06-11|         1|             4.38|             10.37|
+----------+----------+-----------------+------------------+
only showing top 10 rows



In [12]:
# рассчитаем среднюю дневную скорость для каждого дня и курьера (также преобразуем минуты в часы)
result_sum_dis_trav = result_sum_dis_trav.withColumn('mean_speed', F.col('sum_distance') / (F.col('sum_travel_time') / 60))
result_sum_dis_trav.show(10)

+----------+----------+-----------------+------------------+------------------+
|      date|courier_id|     sum_distance|   sum_travel_time|        mean_speed|
+----------+----------+-----------------+------------------+------------------+
|2021-06-28|         8|             1.48|             22.18|  4.00360685302074|
|2021-06-29|         2|             2.15|             57.45|2.2454308093994775|
|2021-06-21|         6|             6.58|             49.81| 7.926119253162015|
|2021-06-09|         7|             3.87|             56.79| 4.088748019017433|
|2021-06-08|         1|             3.56|             32.21|6.6314809065507605|
|2021-06-04|         8|             2.19|             34.13| 3.849985350131848|
|2021-06-14|         7|5.859999999999999|34.760000000000005|10.115074798619101|
|2021-06-26|         7|             4.54| 52.57000000000001| 5.181662545177858|
|2021-06-30|         1|             1.59|             40.49| 2.356137317856261|
|2021-06-11|         1|             4.38

In [13]:
# сгрупируем по курьерам. Расчитаем максимальную и минимальную среднюю дневную скорость для каждого курьера
result = result_sum_dis_trav.groupby('courier_id').agg(F.max('mean_speed').alias('max_mean_speed'), F.min('mean_speed').alias('min_mean_speed'))
result.show(10)

+----------+------------------+------------------+
|courier_id|    max_mean_speed|    min_mean_speed|
+----------+------------------+------------------+
|         7|10.115074798619101|1.0535557506584723|
|         6| 25.49718574108818| 1.403067638923812|
|         9|13.315508021390373|0.7547857793983591|
|         5|17.651376146788987|1.1114560695436202|
|         1|25.342333654773388|1.6655313351498635|
|        10| 18.60500379075057|1.7512012813667912|
|         3|13.351278600269179| 1.253071253071253|
|         8|14.946159368269923|0.7925696594427245|
|         2|19.404666130329844|1.5167888846005404|
|         4|25.776566757493192|0.9739600946905647|
+----------+------------------+------------------+



In [14]:
# вычислите разницу для каждого курьера
result = result.withColumn('delta', F.col('max_mean_speed') - F.col('min_mean_speed'))
result.show(10)

+----------+------------------+------------------+------------------+
|courier_id|    max_mean_speed|    min_mean_speed|             delta|
+----------+------------------+------------------+------------------+
|         7|10.115074798619101|1.0535557506584723|  9.06151904796063|
|         6| 25.49718574108818| 1.403067638923812| 24.09411810216437|
|         9|13.315508021390373|0.7547857793983591|12.560722241992014|
|         5|17.651376146788987|1.1114560695436202|16.539920077245366|
|         1|25.342333654773388|1.6655313351498635|23.676802319623523|
|        10| 18.60500379075057|1.7512012813667912|16.853802509383776|
|         3|13.351278600269179| 1.253071253071253|12.098207347197926|
|         8|14.946159368269923|0.7925696594427245|14.153589708827198|
|         2|19.404666130329844|1.5167888846005404|17.887877245729303|
|         4|25.776566757493192|0.9739600946905647|24.802606662802628|
+----------+------------------+------------------+------------------+



In [15]:
result.sort(F.desc('delta')).limit(1).show()

+----------+------------------+------------------+------------------+
|courier_id|    max_mean_speed|    min_mean_speed|             delta|
+----------+------------------+------------------+------------------+
|         4|25.776566757493192|0.9739600946905647|24.802606662802628|
+----------+------------------+------------------+------------------+



#### Вопрос №2.1:
У нас есть данные о покупках клиентов purchases.parquet. Проанализируйте интервалы времени между последовательными покупками для каждого клиента в наборе данных о покупках - напишите код для вычисления разницы в днях между текущей покупкой и предыдущей покупкой каждого клиента. Отобразите результат в новом столбце days_between_purchases.

In [16]:
df_q2 = spark.read.parquet('df/purchases.parquet', header=True, inferSchema=True)
df_q2.show(10)

+-----------+-------------------+
|customer_id|      purchase_date|
+-----------+-------------------+
|          2|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|         11|2021-01-01 00:00:00|
|         21|2021-01-01 00:00:00|
|         22|2021-01-01 00:00:00|
|         27|2021-01-01 00:00:00|
|         30|2021-01-01 00:00:00|
|         36|2021-01-01 00:00:00|
|         45|2021-01-01 00:00:00|
+-----------+-------------------+
only showing top 10 rows



In [17]:
# Преобразование типа дынных в столбце 'date'
df_q2 = df_q2.withColumn("purchase_date", F.col("purchase_date").cast(DateType()))

In [18]:
# отсортируем данные по клиенту и датам покупок
df_q2.orderBy("customer_id","purchase_date").show(10)

+-----------+-------------+
|customer_id|purchase_date|
+-----------+-------------+
|          1|   2021-01-14|
|          1|   2021-01-18|
|          1|   2021-01-28|
|          1|   2021-02-05|
|          1|   2021-02-06|
|          1|   2021-02-07|
|          1|   2021-02-11|
|          1|   2021-02-15|
|          1|   2021-02-15|
|          1|   2021-02-17|
+-----------+-------------+
only showing top 10 rows



In [19]:
# посчитаем разницу между соседними покупками
window_spec = Window.partitionBy('customer_id').orderBy('purchase_date')
days_between_purchases = df_q2.withColumn("prev_date", F.lag("purchase_date").over(window_spec))
days_between_purchases = days_between_purchases.withColumn("days_since_last", F.date_diff('purchase_date', 'prev_date'))
days_between_purchases.show(10)

+-----------+-------------+----------+---------------+
|customer_id|purchase_date| prev_date|days_since_last|
+-----------+-------------+----------+---------------+
|          1|   2021-01-14|      NULL|           NULL|
|          1|   2021-01-18|2021-01-14|              4|
|          1|   2021-01-28|2021-01-18|             10|
|          1|   2021-02-05|2021-01-28|              8|
|          1|   2021-02-06|2021-02-05|              1|
|          1|   2021-02-07|2021-02-06|              1|
|          1|   2021-02-11|2021-02-07|              4|
|          1|   2021-02-15|2021-02-11|              4|
|          1|   2021-02-15|2021-02-15|              0|
|          1|   2021-02-17|2021-02-15|              2|
+-----------+-------------+----------+---------------+
only showing top 10 rows



In [20]:
# посчитаем пустые значения NULL
days_between_purchases.filter(F.col("days_since_last").isNull()).count()

50

#### Вопрос №2.2 (используйте данные из предыдущего вопроса №2.1):

У какого количества уникальных клиентов разница между текущей покупкой и предыдущей покупкой равна 20-ти дням?


In [21]:
# выберем покупателей разница между текущей покупкой и предыдущей покупкой равна 20-ти дням
result = days_between_purchases.filter(F.col("days_since_last") == 20)

In [22]:
# выберем уникальные значением и посчтиаем их количество
result.select('customer_id').distinct().count()

10