In [93]:
import pandas as pd
from datetime import datetime as dt
from datetime import timedelta as td

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.types import TimestampType, StringType


# Инициализируем Spark-сессию
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("SparkFirst")\
        .config("spark.executor.memory", "6G")\
        .config("spark.executor.cores", 6)\
        .getOrCreate()

# Задача 1. Для каждого id рассчитайте усредненную длину сессии в рамках суток:

data = [
    (1, dt.fromtimestamp(1562007679)), (1, dt.fromtimestamp(1562007710)),
    (1, dt.fromtimestamp(1562007720)), (1, dt.fromtimestamp(1562007750)),
    (2, dt.fromtimestamp(1564682430)), (2, dt.fromtimestamp(1564682450)),
    (2, dt.fromtimestamp(1564682480))
]

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("timestamp", TimestampType(), False)
])

w = Window.partitionBy('id', 'date')
w2 = Window.partitionBy('id')

df = spark.createDataFrame(data=data, schema=schema)\
     .withColumn("date", F.to_date(F.col("timestamp")))\
     .withColumn(
        "session_len",
        (F.max(F.col("timestamp").cast("long")).over(w)
         - F.min(F.col("timestamp").cast("long")).over(w))
      )

# Задача 2. Необходимо сформировать витрину данных, в которой для каждой пары
#           товар-локация на уровне каждой технической недели будет рассчитано
#           прогнозируемое значение количества проданных товаров (с учетом
#           среднедневного спроса) и количество остатков товара на складе:

# создадим df с данными о среднедневном спросе
data = [
    (1, '01', 100), (1, '02', 110), (2, '01', 120),
    (2, '02', 90), (3, '01', 70), (3, '02', 80)
]

schema = StructType([
    StructField("product", IntegerType(), False),
    StructField("location", StringType(), False),
    StructField("demand", IntegerType(), False)
])

df_demand = spark.createDataFrame(data=data, schema=schema)

# теперь создадим df с остатками
data = [
    (1, '01', 1000), (1, '02', 400),
    (2, '01', 300), (2, '02', 250)
]

schema = StructType([
    StructField("product", IntegerType(), False),
    StructField("location", StringType(), False),
    StructField("stock", IntegerType(), False)
])

df_stock = spark.createDataFrame(data=data, schema=schema)

# сделаем отдельный df c неделями и днями за июнь 2023 по условию задачи
start_date = dt.strptime('01.06.2023', '%d.%m.%Y')
end_date = dt.strptime('30.06.2023', '%d.%m.%Y')

weeks = []
while start_date <= end_date:
    weeks.append((start_date.isocalendar().week, 1))
    start_date += td(days=1)

df_weeks = spark.createDataFrame(
    data=weeks,
    schema=['week', 'day']
).groupby('week').agg(F.sum('day').alias('days'))

# сделаем декартово произведение и рассчитаем прогноз продаж
df_join = df_demand.crossJoin(df_weeks)\
                   .withColumn(
                    'sales_forecast', F.col('demand') * F.col('days')
                   ).orderBy('week', 'location', 'product')

# теперь итеративно рассчитаем сток от недели к неделе
weeks = list(df_weeks.toPandas()['week'])

# первая неделя отдельно. Присоединим начальный сток
df_result = df_join.join(
        df_stock, [
            df_stock.product == df_demand.product,
            df_stock.location == df_demand.location
        ],
        'left'
    ).select(
        df_join.product, df_join.location, df_stock.stock.alias('stock_start'),
        df_join.demand, df_join.week, df_join.days, df_join.sales_forecast
    ).na.fill(0).filter(F.col('week') == weeks[0]).withColumn(
        'stock_forecast', F.col('stock_start') - F.col('sales_forecast')
    )

# далее остальные
for week in weeks[1:]:
    df_join_iter = df_join.filter(F.col('week') == week)
    df_result_iter = df_result.filter(F.col('week') == week - 1)

    df_result = df_result.union(
        df_join_iter.join(
            df_result_iter, [
                df_join_iter.product == df_result_iter.product,
                df_join_iter.location == df_result_iter.location
            ]
        ).select(
            df_join_iter.product, df_join_iter.location,
            df_result_iter.stock_forecast.alias('stock_start'),
            df_join_iter.demand, df_join_iter.week,
            df_join_iter.days, df_join_iter.sales_forecast
        ).withColumn(
            'stock_forecast', F.col('stock_start') - F.col('sales_forecast')
        )
    )

# финальный вывод, откорректируем расчёты.
df_result.select(
    F.col('product'),
    F.col('location'),
    # стартовый сток не меньше нуля
    F.greatest(F.col('stock_start'), F.lit(0)).alias('stock_start'),
    F.col('demand'),
    F.col('week'),
    F.col('days'),
    # продажи не больше, чем стока в наличии
    (F.greatest(F.col('stock_start'), F.lit(0))
     - F.greatest(F.col('stock_forecast'), F.lit(0))
    ).alias('sales_forecast'),
    # прогнозируемый сток на конец недели не меньше нуля
    F.greatest(F.col('stock_forecast'), F.lit(0)).alias('stock_forecast')
).orderBy('product', 'location', 'week').show(50)

spark.stop()

                                                                                

+-------+--------+-----------+------+----+----+--------------+--------------+
|product|location|stock_start|demand|week|days|sales_forecast|stock_forecast|
+-------+--------+-----------+------+----+----+--------------+--------------+
|      1|      01|       1000|   100|  22|   4|           400|           600|
|      1|      01|        600|   100|  23|   7|           600|             0|
|      1|      01|          0|   100|  24|   7|             0|             0|
|      1|      01|          0|   100|  25|   7|             0|             0|
|      1|      01|          0|   100|  26|   5|             0|             0|
|      1|      02|        400|   110|  22|   4|           400|             0|
|      1|      02|          0|   110|  23|   7|             0|             0|
|      1|      02|          0|   110|  24|   7|             0|             0|
|      1|      02|          0|   110|  25|   7|             0|             0|
|      1|      02|          0|   110|  26|   5|             0|  