За последние несколько лет онлайн-покупки быстро развиваются, делая нашу жизнь проще. Но за кулисами компании e-commerce сталкиваются со сложной проблемой.

Неопределенность играет большую роль в том, как цепочки поставок планируют и организуют свои операции, чтобы гарантировать своевременную доставку продукции. Эта неопределенность может привести к таким проблемам, как дефицит, задержки поставок и увеличение эксплуатационных расходов.

Компания e-commerce требовалась помощь в планировании предстоящих продаж на конец года. Они хотели спланировать рекламные возможности и управлять своими запасами. Целью этих усилий является обеспечение наличия на складе нужных продуктов, когда это необходимо, и обеспечение того, чтобы их клиенты были удовлетворены быстрой доставкой.


## Данные:

Краткое изложение датасета о продажах:

# Online Retail.csv

| Столбец     | Описание              |
|------------|--------------------------|
| `'InvoiceNo'` | 6-значный уникальный номер для каждой транзакции |
| `'StockCode'` | Уникальный пятизначный номер для каждого отдельного продукта |
| `'Description'` | Название продукта |
| `'Quantity'` | Количество каждого товара за транзакцию |
| `'UnitPrice'` | Цена продукта за единицу |
| `'CustomerID'` | 5-значный уникальный номер для каждого клиента |
| `'Country'` | Название страны, в которой проживает каждый клиент |
| `'InvoiceDate'` | День и время создания каждой транзакции `"ММ/ДД/ГГГГ"` |
| `'Year'` | Год, когда была создана каждая транзакция |
| `'Month'` | Месяц, в котором была создана каждая транзакция |
| `'Week'` | Неделя, когда была создана каждая транзакция (`1`-`52`) |
| `'Day'` | День месяца, когда была создана каждая транзакция (`1`-`31`) |
| `'DayOfWeek'` | День недели, когда была сгенерирована каждая транзакция <br>(`0` = Понедельник, `6` = Воскресенье) |

In [None]:
# Импорт необходимых библиотек
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Инициализация сеанса Spark
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Импорт данных о продажах
sales_data = my_spark.read.csv("Online Retail.csv", header=True, inferSchema=True, sep=",")

# Преобразование 'InvoiceDate' в формат временного ряда
sales_data = sales_data.withColumn("InvoiceDate", to_date(to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

# Агрегация данных по суточным интервалам
daily_sales_data = sales_data.groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek").agg({"Quantity": "sum", "UnitPrice": "avg"})

# Переименование целевого столбца
daily_sales_data = daily_sales_data.withColumnRenamed("sum(Quantity)", "Quantity")

# Разделение данных на два набора на основе даты разделения "2011-09-25" (<= для обучения, > для тестирования).
# Возврат Dataframe, 'pd_daily_train_data', содержащего столбцы ["Country", "StockCode", "InvoiceDate", "Quantity"].
split_date_train_test = "2011-09-25"

# Создание обучающих и тестовых наборов
train_data = daily_sales_data.filter(col("InvoiceDate") <= split_date_train_test)
test_data = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)

pd_daily_train_data = train_data.toPandas()

# Создание индексатора для категориальных столбцов
country_indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

# Выбор столбцов функций
feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year", "DayOfWeek", "Day", "Week"]

# Использование векторного ассемблера для объединения функций
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Инициализация модели Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000)

# Создание пайплайн для постановки процессов
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])

# Обучение модели
model = pipeline.fit(train_data)

# Получение тестовых прогнозов и назначение их новому столбцу
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", col("prediction").cast("double"))

# Инициализация оценщика регрессии
mae_evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="mae")

# Получение средней абсолютной ошибки (MAE)
mae = mae_evaluator.evaluate(test_predictions)

# Вопрос: Сколько единиц будет продано на 39-й неделе 2011 года? Возврат целого числа 'quantity_sold_w39'
# Получение еженедельных продаж всех стран
weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})

# Нахождение количества проданного товара на 39 неделе 
promotion_week = weekly_test_predictions.filter(col('Week')==39)

# Сохранение прогноза как 'quantity_sold_w30'
quantity_sold_w39 = int(promotion_week.select("sum(prediction)").collect()[0][0])

# Остановка сеанса Spark
my_spark.stop()