In [1]:
!apt-get update
!apt-get install -y openjdk-8-jdk-headless

0% [Working]            Get:1 https://cli.github.com/packages stable InRelease [3,917 B]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://cli.github.com/packages stable/main amd64 Packages [344 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,102 kB]
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:11 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:13 https://ppa.launchpadcontent.net

# Запуск Spark сессии и чтение файла

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = (SparkSession.builder
        .appName("pyspark_test")
        .getOrCreate())

In [3]:
PATH = 'couriers_orders.parquet'
PATH_2 = 'purchases.parquet'

In [4]:
df = spark.read.parquet(PATH)
df.show()

+-------------------+----------+--------+--------+-----------+
|               date|courier_id|order_id|distance|travel_time|
+-------------------+----------+--------+--------+-----------+
|2021-07-12 00:00:00|        10|       1|     1.9|      36.17|
|2021-07-02 00:00:00|         3|       2|    3.98|      21.34|
|2021-04-15 00:00:00|         6|       3|    3.98|      43.33|
|2021-07-16 00:00:00|        10|       4|    2.85|      14.01|
|2021-06-11 00:00:00|        10|       5|    4.89|      32.09|
|2021-04-21 00:00:00|         9|       6|    1.06|      18.17|
|2021-07-12 00:00:00|         1|       7|    0.58|      19.22|
|2021-07-31 00:00:00|         5|       8|    3.97|      20.11|
|2021-06-14 00:00:00|         4|       9|    4.13|      29.34|
|2021-06-27 00:00:00|         8|      10|    1.04|      12.56|
|2021-07-26 00:00:00|         1|      11|     1.7|      29.89|
|2021-07-09 00:00:00|         9|      12|    0.58|      35.59|
|2021-07-13 00:00:00|         6|      13|    1.82|     

In [5]:
df.printSchema()

root
 |-- date: timestamp_ntz (nullable = true)
 |-- courier_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- travel_time: double (nullable = true)



# Вопрос №1.1:

В конце каждого месяца компания выдает премию для своих курьеров, средняя скорость доставки за прошедший месяц которых больше средней скорости среди всех курьеров. Сколько курьеров получили премию за июнь 2021 года.

In [6]:
df_june_speed = df.where('''year(date) == 2021 AND month(date) == 6''')\
                  .withColumn('speed', F.col('distance') / F.col('travel_time'))

In [7]:
courier_speeds = df_june_speed.groupBy('courier_id').agg(F.avg('speed').alias('avg_speed'))
print('Средняя скорость каждого курьера:')
courier_speeds.show()

Средняя скорость каждого курьера:
+----------+-------------------+
|courier_id|          avg_speed|
+----------+-------------------+
|         7|0.09669425600283378|
|         6|0.11319921886807856|
|         9|0.08287225148692252|
|         5|0.08876160409793776|
|         1|0.11248875682182066|
|        10|0.10967465870150848|
|         3| 0.1037258989146755|
|         8|0.11377286597238084|
|         2|0.10880705251978771|
|         4|0.12820568553917946|
+----------+-------------------+



In [8]:
overall_avg = courier_speeds.agg(F.avg('avg_speed').alias('overall_avg')).collect()[0]['overall_avg']

In [9]:
courier_speeds.where(f'''avg_speed > {overall_avg}''').count()


6

# Вопрос №1.2 (используйте данные из предыдущего вопроса №1.1):

Компания хочет понять, насколько равномерно курьеры работают в течение месяца. Для этого нужно найти ID курьера с наибольшей разницей между максимальной и минимальной средней дневной скоростью в июне 2021 года.

In [10]:
df_day_speed = df.where('''year(date) = 2021 AND month(date) = 6''') \
                 .withColumn('day', F.date_format('date', 'yyyy-MM-dd')) \
                 .withColumn('speed', F.col('distance') / F.col('travel_time')) \
                 .groupBy('courier_id', 'day').agg(F.avg('speed').alias('avg_day_speed'))

In [11]:
day_range = df_day_speed.groupBy('courier_id').agg(
    F.max('avg_day_speed').alias('max_day_speed'),
    F.min('avg_day_speed').alias('min_day_speed'))

In [12]:
speed_dif = day_range.withColumn('speed_dif', F.col('max_day_speed') - F.col('min_day_speed'))

In [13]:
speed_dif.orderBy(F.col('speed_dif').desc()).show(1)

+----------+-------------------+--------------------+------------------+
|courier_id|      max_day_speed|       min_day_speed|         speed_dif|
+----------+-------------------+--------------------+------------------+
|         4|0.42960944595821987|0.016232668244842745|0.4133767777133771|
+----------+-------------------+--------------------+------------------+
only showing top 1 row



# Вопрос №2.1:

Какое количество NaN в столбце days_between_purchases?

У нас есть данные о покупках клиентов purchases.parquet. Проанализируйте интервалы времени между последовательными покупками для каждого клиента в наборе данных о покупках - напишите код для вычисления разницы в днях между текущей покупкой и предыдущей покупкой каждого клиента. Отобразите результат в новом столбце days_between_purchases.

In [14]:
dfp = spark.read.parquet(PATH_2)
dfp.show()

+-----------+-------------------+
|customer_id|      purchase_date|
+-----------+-------------------+
|          2|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|         11|2021-01-01 00:00:00|
|         21|2021-01-01 00:00:00|
|         22|2021-01-01 00:00:00|
|         27|2021-01-01 00:00:00|
|         30|2021-01-01 00:00:00|
|         36|2021-01-01 00:00:00|
|         45|2021-01-01 00:00:00|
|         50|2021-01-01 00:00:00|
|          3|2021-01-02 00:00:00|
|         15|2021-01-02 00:00:00|
|         18|2021-01-02 00:00:00|
|         24|2021-01-02 00:00:00|
|         34|2021-01-02 00:00:00|
|         35|2021-01-02 00:00:00|
|         40|2021-01-02 00:00:00|
|         41|2021-01-02 00:00:00|
|         12|2021-01-03 00:00:00|
+-----------+-------------------+
only showing top 20 rows



In [15]:
previous_buys = dfp.withColumn('prev_purchase_date', F.lag('purchase_date')
                                                     .over(Window.partitionBy('customer_id').orderBy('purchase_date'))) \
                  .withColumn('days_between_purchases', F.datediff('purchase_date', 'prev_purchase_date'))

In [16]:
previous_buys.where('days_between_purchases IS NULL').count()

50

# Вопрос №2.2 (используйте данные из предыдущего вопроса №2.1):

У какого количества уникальных клиентов разница между текущей покупкой и предыдущей покупкой равна 20-ти дням?

In [17]:
previous_buys.where('''days_between_purchases == 20''')\
             .select('customer_id')\
             .distinct()\
             .count()


10