### У какого количества уникальных клиентов разница между текущей покупкой и предыдущей покупкой равна 20-ти дням?

In [48]:
import pyspark
from pyspark.errors import AnalysisException
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lag, datediff

In [49]:
# Build (or reuse) a local Spark session that runs on all available cores.
spark = (
    SparkSession.builder
    .appName("ClientsCountByPySpark")
    .master("local[*]")
    # Extra JVM option for the driver; presumably required by the local
    # Java runtime in use -- TODO confirm it is still needed.
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    .getOrCreate()
)

# Report the Spark version so the run environment is recorded in the output.
print(f"Spark version: {spark.version}")

Spark version: 4.0.1


#### Step 1. Loading data

In [50]:
# Read the courier orders dataset and preview the first rows.
# Spark reports a missing or unreadable input path as AnalysisException
# rather than the built-in FileNotFoundError, so that is what is handled here.
try:
    df = spark.read.parquet("data/couriers_orders.parquet")
    df.show(7)
except AnalysisException as e:
    print(f"{e}")
    # Release the session, then re-raise: without `df` the remaining cells
    # cannot produce a meaningful result and should not run silently.
    spark.stop()
    raise


+-------------------+----------+--------+--------+-----------+
|               date|courier_id|order_id|distance|travel_time|
+-------------------+----------+--------+--------+-----------+
|2021-07-12 00:00:00|        10|       1|     1.9|      36.17|
|2021-07-02 00:00:00|         3|       2|    3.98|      21.34|
|2021-04-15 00:00:00|         6|       3|    3.98|      43.33|
|2021-07-16 00:00:00|        10|       4|    2.85|      14.01|
|2021-06-11 00:00:00|        10|       5|    4.89|      32.09|
|2021-04-21 00:00:00|         9|       6|    1.06|      18.17|
|2021-07-12 00:00:00|         1|       7|    0.58|      19.22|
+-------------------+----------+--------+--------+-----------+
only showing top 7 rows


#### Step 2. Define the window specification

In [51]:
# One window per courier, rows ordered by order date, so that lag() over this
# window yields the same courier's preceding order.
windowSpec = (
    Window
    .partitionBy("courier_id")
    .orderBy("date")
)

#### Step 3. Find the previous order date

In [52]:
# Attach to every order the date of the same courier's preceding order.
# The first order in each courier's window has no predecessor and gets NULL.
prev_date_df = df.withColumn("prev_order_date", lag(col("date"), 1).over(windowSpec))

prev_date_df.show(7)

+-------------------+----------+--------+--------+-----------+-------------------+
|               date|courier_id|order_id|distance|travel_time|    prev_order_date|
+-------------------+----------+--------+--------+-----------+-------------------+
|2021-04-03 00:00:00|         1|    1331|     1.2|      39.68|               NULL|
|2021-04-04 00:00:00|         1|    1303|    1.23|      49.07|2021-04-03 00:00:00|
|2021-04-05 00:00:00|         1|     347|    2.32|      42.44|2021-04-04 00:00:00|
|2021-04-06 00:00:00|         1|     278|    2.23|      57.29|2021-04-05 00:00:00|
|2021-04-08 00:00:00|         1|    1638|    2.21|      42.41|2021-04-06 00:00:00|
|2021-04-09 00:00:00|         1|    1502|    2.49|       45.9|2021-04-08 00:00:00|
|2021-04-11 00:00:00|         1|     944|    1.57|      33.85|2021-04-09 00:00:00|
+-------------------+----------+--------+--------+-----------+-------------------+
only showing top 7 rows


#### Step 4. Calculate the difference in days

In [53]:
# Days elapsed from the previous order to the current one (end - start).
# Rows without a previous order keep NULL in the new column.
days_diff_df = prev_date_df.withColumn(
    "days_between_purchases",
    datediff(end=col("date"), start=col("prev_order_date")),
)

days_diff_df.show(7)

+-------------------+----------+--------+--------+-----------+-------------------+----------------------+
|               date|courier_id|order_id|distance|travel_time|    prev_order_date|days_between_purchases|
+-------------------+----------+--------+--------+-----------+-------------------+----------------------+
|2021-04-03 00:00:00|         1|    1331|     1.2|      39.68|               NULL|                  NULL|
|2021-04-04 00:00:00|         1|    1303|    1.23|      49.07|2021-04-03 00:00:00|                     1|
|2021-04-05 00:00:00|         1|     347|    2.32|      42.44|2021-04-04 00:00:00|                     1|
|2021-04-06 00:00:00|         1|     278|    2.23|      57.29|2021-04-05 00:00:00|                     1|
|2021-04-08 00:00:00|         1|    1638|    2.21|      42.41|2021-04-06 00:00:00|                     2|
|2021-04-09 00:00:00|         1|    1502|    2.49|       45.9|2021-04-08 00:00:00|                     1|
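|2021-04-11 00:00:00|         1|     944|    1.57|      33.85|2021-04-09 00:00:00|                     2|
+-------------------+----------+--------+--------+-----------+-------------------+----------------------+
only showing top 7 rows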

#### Step 5. Filter for 20-day gaps

In [54]:
# Keep only the orders placed exactly 20 days after the courier's previous order.
# NULL gaps (no previous order) never satisfy the equality and are dropped.
twenty_days_gaps = days_diff_df.where(col("days_between_purchases") == 20)

twenty_days_gaps.show()

+----+----------+--------+--------+-----------+---------------+----------------------+
|date|courier_id|order_id|distance|travel_time|prev_order_date|days_between_purchases|
+----+----------+--------+--------+-----------+---------------+----------------------+
+----+----------+--------+--------+-----------+---------------+----------------------+



#### Step 6. Courier count

In [55]:
# Count the distinct couriers that have at least one 20-day gap between orders.
unique_couriers = twenty_days_gaps.select("courier_id").distinct().count()

# Framed summary line: ruler, result, ruler -- each on its own line.
print("=" * 25, f"    {unique_couriers} unique couriers", "=" * 25, sep="\n")

# The analysis is complete; shut the Spark session down.
spark.stop()

    0 unique couriers
