In [35]:
import pandas as pd
import numpy as np

In [36]:
# Установка PySpark в Google Colab
!pip install pyspark

# Импорт необходимых библиотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import datetime as dt



In [37]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Создаем сессию Spark
spark = SparkSession.builder.appName("Forecast").getOrCreate()

# Создаем DataFrame для информации о среднедневном спросе
demand_data = [(1, 1, 1, 100),
               (1, 2, 2, 110),
               (2, 1, 3, 120),
               (2, 2, 3, 90),
               (3, 1, 1, 70),
               (3, 2, 2, 80)]

demand_columns = ["product", "location", "demand_id", "demand"]
demand_df = spark.createDataFrame(demand_data, demand_columns)

# Создаем DataFrame для информации о складских запасах
stock_data = [(1, 1, 1000),
              (1, 2, 400),
              (2, 2, 250)]

stock_columns = ["product", "location", "stock"]
stock_df = spark.createDataFrame(stock_data, stock_columns)

# Создаем DataFrame с информацией о технических неделях
technical_weeks = [("2023-08-01", "2023-08-06"),
                   ("2023-08-07", "2023-08-13"),
                   ("2023-08-14", "2023-08-20"),
                   ("2023-08-21", "2023-08-27"),
                   ("2023-08-28", "2023-08-31")]

weeks_columns = ["start_date", "end_date"]
weeks_df = spark.createDataFrame(technical_weeks, weeks_columns)

# Объединяем данные
joined_data = demand_df.join(stock_df, ["product", "location"])

# Присоединяем данные о технических неделях
joined_data_with_weeks = joined_data.crossJoin(weeks_df)

# Преобразуем даты в формат timestamp
date_format = "yyyy-MM-dd"
joined_data_with_weeks = joined_data_with_weeks \
    .withColumn("start_date", F.to_date(F.col("start_date"), date_format)) \
    .withColumn("end_date", F.to_date(F.col("end_date"), date_format))

# Определяем окно по продукту, локации, технической неделе
window_spec = Window.partitionBy("product", "location", "start_date", "end_date").orderBy("start_date")

# Рассчитываем прогнозируемое количество проданных товаров и количество остатков на складе
forecast_result = joined_data_with_weeks \
    .withColumn("forecast_demand", F.sum("demand").over(window_spec)) \
    .withColumn("forecast_stock", F.last("stock").over(window_spec))

# Выводим результат
forecast_result.select("product", "location", "start_date", "end_date", "forecast_demand", "forecast_stock").show()

+-------+--------+----------+----------+---------------+--------------+
|product|location|start_date|  end_date|forecast_demand|forecast_stock|
+-------+--------+----------+----------+---------------+--------------+
|      1|       1|2023-08-01|2023-08-06|            100|          1000|
|      1|       1|2023-08-07|2023-08-13|            100|          1000|
|      1|       1|2023-08-14|2023-08-20|            100|          1000|
|      1|       1|2023-08-21|2023-08-27|            100|          1000|
|      1|       1|2023-08-28|2023-08-31|            100|          1000|
|      1|       2|2023-08-01|2023-08-06|            110|           400|
|      1|       2|2023-08-07|2023-08-13|            110|           400|
|      1|       2|2023-08-14|2023-08-20|            110|           400|
|      1|       2|2023-08-21|2023-08-27|            110|           400|
|      1|       2|2023-08-28|2023-08-31|            110|           400|
|      2|       2|2023-08-01|2023-08-06|             90|        