<a href='https://www.kaggle.com/c/m5-forecasting-accuracy'>Ссылка</a> на kaggle

### Импортируем библиотеки

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import numpy as np
import pandas as pd

### Считываем данные

Что содержится в наших таблицах:

* **calendar.csv** - Содержит информацию о датах продажи товаров.
* **sales_train_validation.csv** - Содержит исторические данные о продажах за день для каждого продукта и магазина [d_1 - d_1913].
* **sales_train_evaluation.csv** - Включает продажи [d_1 - d_1941] (ярлыки, используемые для публичной таблицы лидеров).
* **sell_prices.csv** - Содержит информацию о цене проданных товаров в каждом магазине и дате.
* **sample_submission.csv** - Правильный формат для подачи. См. Вкладку «Оценка» для получения дополнительной информации.

In [0]:
# File location and type
file_calendar = "/FileStore/tables/calendar.csv"
file_validation = "/FileStore/tables/sales_train_validation.csv"
file_evaluation = "/FileStore/tables/sales_train_evaluation.csv"
file_prices = "/FileStore/tables/sell_prices.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_calendar = spark.read.format(file_type) \
                        .option("inferSchema", infer_schema) \
                        .option("header", first_row_is_header) \
                        .option("sep", delimiter) \
                        .load(file_calendar)

df_validation = spark.read.format(file_type) \
                          .option("inferSchema", infer_schema) \
                          .option("header", first_row_is_header) \
                          .option("sep", delimiter) \
                          .load(file_validation)

df_evaluation = spark.read.format(file_type) \
                          .option("inferSchema", infer_schema) \
                          .option("header", first_row_is_header) \
                          .option("sep", delimiter) \
                          .load(file_evaluation)

df_prices = spark.read.format(file_type) \
                      .option("inferSchema", infer_schema) \
                      .option("header", first_row_is_header) \
                      .option("sep", delimiter) \
                      .load(file_prices)

**calendar.csv**

In [0]:
df_calendar.limit(5).toPandas()

Unnamed: 0,date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI
0,2011-01-29,11101,Saturday,1,1,2011,d_1,,,,,0,0,0
1,2011-01-30,11101,Sunday,2,1,2011,d_2,,,,,0,0,0
2,2011-01-31,11101,Monday,3,1,2011,d_3,,,,,0,0,0
3,2011-02-01,11101,Tuesday,4,2,2011,d_4,,,,,1,1,0
4,2011-02-02,11101,Wednesday,5,2,2011,d_5,,,,,1,0,1


**sales_train_validation.csv**

In [0]:
df_validation.limit(5).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,d_5,d_6,d_7,d_8,d_9,d_10,d_11,d_12,d_13,d_14,d_15,d_16,d_17,d_18,d_19,d_20,d_21,d_22,d_23,d_24,d_25,d_26,d_27,d_28,d_29,d_30,d_31,d_32,d_33,d_34,...,d_1874,d_1875,d_1876,d_1877,d_1878,d_1879,d_1880,d_1881,d_1882,d_1883,d_1884,d_1885,d_1886,d_1887,d_1888,d_1889,d_1890,d_1891,d_1892,d_1893,d_1894,d_1895,d_1896,d_1897,d_1898,d_1899,d_1900,d_1901,d_1902,d_1903,d_1904,d_1905,d_1906,d_1907,d_1908,d_1909,d_1910,d_1911,d_1912,d_1913
0,HOBBIES_1_001_CA_1_validation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,1,3,1,3,1,2,2,0,1,1,1,1,0,0,0,0,0,1,0,4,2,3,0,1,2,0,0,0,1,1,3,0,1,1,1,3,0,1,1
1,HOBBIES_1_002_CA_1_validation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0
2,HOBBIES_1_003_CA_1_validation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,0,0,1,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,1,2,2,1,2,1,1,1,0,1,1,1
3,HOBBIES_1_004_CA_1_validation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,3,4,2,1,4,1,3,5,0,6,6,0,0,0,0,3,1,2,1,3,1,0,2,5,4,2,0,3,0,1,0,5,4,1,0,1,3,7,2
4,HOBBIES_1_005_CA_1_validation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,0,3,2,2,2,3,1,0,0,0,0,1,0,4,4,0,1,4,0,1,0,1,0,1,1,2,0,1,1,2,1,1,0,1,1,2,2,2,4


**sales_train_evaluation**

In [0]:
df_evaluation.limit(5).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,d_5,d_6,d_7,d_8,d_9,d_10,d_11,d_12,d_13,d_14,d_15,d_16,d_17,d_18,d_19,d_20,d_21,d_22,d_23,d_24,d_25,d_26,d_27,d_28,d_29,d_30,d_31,d_32,d_33,d_34,...,d_1902,d_1903,d_1904,d_1905,d_1906,d_1907,d_1908,d_1909,d_1910,d_1911,d_1912,d_1913,d_1914,d_1915,d_1916,d_1917,d_1918,d_1919,d_1920,d_1921,d_1922,d_1923,d_1924,d_1925,d_1926,d_1927,d_1928,d_1929,d_1930,d_1931,d_1932,d_1933,d_1934,d_1935,d_1936,d_1937,d_1938,d_1939,d_1940,d_1941
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,0,1,1,3,0,1,1,1,3,0,1,1,0,0,0,2,0,3,5,0,0,1,1,0,2,1,2,2,1,0,2,4,0,0,0,0,3,3,0,1
1,HOBBIES_1_002_CA_1_evaluation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,2,1,1,0,0,0,0,0
2,HOBBIES_1_003_CA_1_evaluation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,2,2,1,2,1,1,1,0,1,1,1,0,0,1,1,0,2,1,0,0,0,0,2,1,3,0,0,1,0,1,0,2,0,0,0,2,3,0,1
3,HOBBIES_1_004_CA_1_evaluation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,3,0,1,0,5,4,1,0,1,3,7,2,0,0,1,2,4,1,6,4,0,0,0,2,2,4,2,1,1,1,1,1,0,4,0,1,3,0,2,6
4,HOBBIES_1_005_CA_1_evaluation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,1,2,1,1,0,1,1,2,2,2,4,1,0,2,3,1,0,3,2,3,1,1,3,2,3,2,2,2,2,0,0,0,2,1,0,0,2,1,0


Как мы видим, каждому дню соответствует своя колонка. Название колонки, соответствующей дню, начинается на d_.

**sales_train_evaluation.csv**

In [0]:
df_prices.limit(5).toPandas()

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price
0,CA_1,HOBBIES_1_001,11325,9.58
1,CA_1,HOBBIES_1_001,11326,9.58
2,CA_1,HOBBIES_1_001,11327,8.26
3,CA_1,HOBBIES_1_001,11328,8.26
4,CA_1,HOBBIES_1_001,11329,8.26


### Преобразуем наши таблицы

Преобразуем `df_evaluation` так, чтобы дни были в одной колонке, а не в нескольких.

Создадим список названий колонок, начинающихся на `d_`

In [0]:
d_columns = [col for col in df_evaluation.columns if col.startswith('d_')]
n = len(d_columns)
d_columns[:10], n

Сформируем конанду для преобразования `DataFrame`

In [0]:
unpivot_expr = f"stack({n}, " + ', '.join([f"'{col}', {col}" for col in d_columns]) + ") as (d, sales)"
unpivot_expr[:47]

Ну и собственно само преобразование

In [0]:
df_evaluation_unpivot = df_evaluation.select(*df_evaluation.columns[:6], F.expr(unpivot_expr)).where("sales is not null")
print(f'Количество строк в новом DataFrame: {df_evaluation_unpivot.count()}')

Что вполне логично:

$$ \text{Кол-во строк изначально} \cdot \text{Кол-во дней} = 30490 \cdot 1941 = 59181090 $$

Все сходится.

Обьединим таблицы `df_calendar`, `df_evaluation`, `df_prices` в одну

In [0]:
df_joined = \
df_evaluation_unpivot.join(df_calendar, on=['d']) \
                     .join(df_prices, on=['item_id', 'store_id', 'wm_yr_wk']) \
                     .withColumn('snap', F.expr("case when state_id = 'CA' then snap_CA \
                                             when state_id = 'TX' then snap_TX \
                                             when state_id = 'WI' then snap_WI end")) \
                     .drop('snap_CA', 'snap_TX', 'snap_WI')

И сохраним промежуточный результат

In [0]:
df_joined.write.mode('overwrite').parquet('./tmp_df')

In [0]:
df_joined = spark.read.option("inferSchema", 'true').parquet('dbfs:/tmp_df')
df_joined.cache().limit(5).toPandas()

Unnamed: 0,item_id,store_id,wm_yr_wk,d,id,dept_id,cat_id,state_id,sales,date,weekday,wday,month,year,event_name_1,event_type_1,event_name_2,event_type_2,sell_price,snap
0,FOODS_1_001,CA_1,11104,d_22,FOODS_1_001_CA_1_evaluation,FOODS_1,FOODS,CA,1,2011-02-19,Saturday,1,2,2011,,,,,2.0,0
1,FOODS_1_001,CA_1,11104,d_23,FOODS_1_001_CA_1_evaluation,FOODS_1,FOODS,CA,2,2011-02-20,Sunday,2,2,2011,,,,,2.0,0
2,FOODS_1_001,CA_1,11104,d_24,FOODS_1_001_CA_1_evaluation,FOODS_1,FOODS,CA,0,2011-02-21,Monday,3,2,2011,PresidentsDay,National,,,2.0,0
3,FOODS_1_001,CA_1,11104,d_25,FOODS_1_001_CA_1_evaluation,FOODS_1,FOODS,CA,2,2011-02-22,Tuesday,4,2,2011,,,,,2.0,0
4,FOODS_1_001,CA_1,11104,d_26,FOODS_1_001_CA_1_evaluation,FOODS_1,FOODS,CA,2,2011-02-23,Wednesday,5,2,2011,,,,,2.0,0


Изучим наши колонки

In [0]:
qty_unique_values = df_joined.agg(*(F.countDistinct(F.col(c)).alias(c) for c in df_joined.columns))
stack_expr = f"stack({len(qty_unique_values.columns)}, " + \
               ', '.join([f"'{c}', {c}" for c in qty_unique_values.columns]) + ') as (column, n_unique)'
qty_unique_values_unpivot = qty_unique_values.select(F.expr(stack_expr))
qty_unique_values_unpivot.show(len(qty_unique_values.columns))

* **item_id** - id продукта. Состотит из: `[dept_id]_[num]`;
* **store_id** - id магазина: `[state_id]_[num]`. Всего 10-ть уникальных значений:
    * штат `WI`: `WI_1`, `WI_2`, `WI_3` - три магазина;
    * штат `CA`: `CA_1`, `CA_2`, `CA_3`, `CA_4` - четыре магазина;
    * штат `TX`: `TX_1`, `TX_2`, `TX_3` - три магазина;
* **wm_yr_wk** - порядковый номер недели (никак не связан с годом, месяцем и т.д.) с 11101 до 11617 (но с пропусками значений, т.к. 11617 - 11101 = 516, а всего уникальных `wm_yr_wk` - 278). Важно отметить, что неделя начинается с Субботы и заканчивается в Пятницу.
* **d** - id дня, колонка получилась после unpivot;
* **id** - конгломерация нескольких значений: `[item_id]_[store_id]`
* **dept_id** - id департамента, которому принадлежит продукт;
    * Категория продукта `FOODS`: три департамента - `FOODS_1`, `FOODS_2`, `FOODS_3`;
    * Категория продукта `HOUSEHOLD`: два департамента - `HOUSEHOLD_1`, `HOUSEHOLD_2`;
    * Категория продукта `HOBBIES`: два департамента - `HOBBIES_1`, `HOBBIES_2`;
* **cat_id** - id категории продукта. Всего три категории: `FOODS`, `HOUSEHOLD`, `HOBBIES`;
* **state_id** - id штата. 3 уникальных значения - `WI`, `CA`, `TX`;
* **sales** - количество продаж;
* **date** - дата в формате year-month-day; 
* **weekday** - день недели в формате `Monday`, `Tuesday` ... `Sunday`;
* **wday** - день недели в числовом формате от 1 до 7, причем:
    * 1 - суббота;
    * 2 - воскресение;
    * 3 - понедельник;
    * ...
    * 7 - пятница;
* **month** - месяц в числовом формате от 1 до 12;
* **year** - год в числовом формате от 2011 до 2016 включительно (т.е. данные за 6-ть лет);
* **event_name_1** - событие 1 в этот день:
    * `Cultural`:
        * Cinco De Mayo - Пятое Мая, национальный праздник Мексики в честь победы мексиканских войск в битве при Пуэбло;
        * Halloween - Хэллоуин;
        * Mother's day - День матери;
        * StPatricksDay - День Святого Патрика;
        * Father's day - День отца;
        * ValentinesDay - День Святого Валентина;
        * Easter - Пасха;
    * `National`:
        * LaborDay - День Труда;
        * IndependenceDay - День Независимости;
        * MartinLutherKingDay - День Мартина Лютера Кинга;
        * Thanksgiving - День Благодарения;
        * NewYear - Новый Год;
        * VeteransDay - День Ветеранов;
        * MemorialDay - День поминовения;
        * PresidentsDay - Дрезидентский День;
        * ColumbusDay - День Колумба;
        * Christmas - Рождество;
    * `Religious`:
        * Ramadan starts - начало Рамадана;
        * Eid al-Fitr - конец Рамадана;
        * Purim End;
        * OrtodoxEaster;
        * LentStart;
        * Chanukah End;
        * OrthodoxChristmas;
        * LentWeek2;
        * Pesach End;
        * EidAlAdha;
    *  `Sporting`:
        * SuperBowl;
        * NBAFinalsEnd;
        * NBAFinalsStart;
* **event_type_1** - тип события 1. Всего 4-ре уникальных значения: `null`, `Cultural`, `National`, `Religious`, `Sporting`;
* **event_name_2** - событие 2 в этот день:
    * `Cultural`:
        * Cinco De Mayo - Пятое Мая, национальный праздник Мексики в честь победы мексиканских войск в битве при Пуэбло;
        * Father's day - День отца;
        * Easter - Пасха;
    * `Religious`:
        * OrthodoxEaster - Православная Пасха;
* **event_type_2** - тип события 2. Всего  три уникальных значения: `null`, `Cultural`, `Religious`;
* **snap_CA, snap_TX, snap_WI** - Двоичная переменная (0 или 1), указывающая, разрешают ли магазины CA, TX или WI покупки по протоколу SNAP в проверяемую дату. 1 означает, что покупки по протоколу SNAP разрешены;
* **sell_price** - стоимость продукта по магазинам / неделям. Стоимость - средняя в неделю. Если `null`, значит продукт в этом магазине и в данную неделю не продавался.

Федеральное правительство США предоставляет пособие по питанию, называемое "Программой помощи в области дополнительного питания" (SNAP). Программа SNAP предоставляет семьям и частным лицам с низким доходом дебетовую карту для электронного перевода пособий для покупки продуктов питания. Во многих штатах денежные пособия распределяются между людьми в течение 10 дней в месяц, и каждый из этих дней 1/10 людей будет получать пособие по своей карте.

Сгруппируем ```df``` по дате и ```store_id```. Сохраним его.

In [0]:
df_joined.withColumn('turnover', F.col('sales')*F.col('sell_price')) \
         .groupBy(['date', 'store_id']) \
         .agg(F.first('wday').alias('wday'),
              F.first('month').alias('month'),
              F.first('year').alias('year'),
              F.first('state_id').alias('state_id'),
              F.first('event_name_1').alias('event_name_1'), 
              F.first('event_type_1').alias('event_type_1'),
              F.first('event_name_2').alias('event_name_2'),
              F.first('event_type_2').alias('event_type_2'),
              F.first('snap').alias('snap'), 
              F.sum('sales').alias('sum_sales'),
              F.sum('turnover').alias('sum_turnover')).write.mode('overwrite').parquet('./tmp/grouped_data.parquet')

В качестве метрики качества выберем метрику MAPE. Для использования этой метрики в pyspark.ml pipeline и просто для удобства, создадим custom evaluator.

In [0]:
from pyspark.ml.evaluation import Evaluator


class Mape(Evaluator):
  """
  Custom evaluator Mape для использования в pipeline pyspark.ml
  """
  def __init__(self, predictionCol="prediction", labelCol="label"):
    self.predictionCol = predictionCol
    self.labelCol = labelCol

  def _evaluate(self, dataset):
    sub = F.col(self.labelCol) - F.col(self.predictionCol)
    
    mape = dataset.select(F.mean(F.abs(F.col(self.labelCol) - F.col(self.predictionCol))/(F.col(self.labelCol)))*100).head()[0]
    return mape

  def isLargerBetter(self):
    return False

### Baseline (linear model)

Загрузим ранее сгруппированый ```df```.

In [0]:
grouped_df = spark.read.option("inferSchema", 'true').parquet('dbfs:/tmp/grouped_data.parquet')
grouped_df.limit(5).toPandas()

Unnamed: 0,date,store_id,wday,month,year,state_id,event_name_1,event_type_1,event_name_2,event_type_2,snap,sum_sales,sum_turnover
0,2011-01-29,TX_3,1,1,2011,TX,,,,,0,3030,7597.99
1,2011-01-30,CA_2,2,1,2011,CA,,,,,0,3046,8417.53
2,2011-01-30,WI_2,2,1,2011,WI,,,,,0,1922,4636.86
3,2011-01-31,WI_3,3,1,2011,WI,,,,,0,3317,7551.65
4,2011-02-01,CA_1,4,2,2011,CA,,,,,1,3051,7407.74


В качестве бэйзлайна выберем модель линейной регрессии. Будем предсказывать количество продаж за день по каждому магазину.  
Отрисуем изменение продаж по магазинам за имеющийся период времени.

In [0]:
import plotly.graph_objects as go


fig = go.Figure()
for elem in grouped_df.select('store_id').distinct().orderBy('store_id').collect():
  store = elem[0]
  fig.add_trace(go.Scatter(x=grouped_df.orderBy('date').where(F.col('store_id') == store) \
                                       .select(F.col('date').cast('timestamp')).toPandas()['date'], 
                           y=grouped_df.orderBy('date').where(F.col('store_id') == store) \
                                       .select('sum_sales').toPandas()['sum_sales'], name=store))

fig.update_layout(template='simple_white', 
                  title={'text': 'Продажи по магазинам', 'x': 0.5, 'xanchor': 'center'}, 
                  xaxis_title='Дата',
                  yaxis_title='Продажи')
fig.show()

По графикам видно, что присутствуют дни, когда продажи в магазинах отсутствуют или их очень мало. Например, Рождество - это главный национальный празник и все магазины в этот день закрыты.  
Такие дни мы при построении модели исключим из выборки.

Посмотрим, есть ли в нашем рассматриваемом промежутке времени пропущенные дни.

In [0]:
grouped_df.orderBy('date') \
          .withColumn('day_num', F.col('date').cast('timestamp').cast('long') / 86400) \
          .withColumn('day_lag', F.lag('day_num', 1).over(Window.partitionBy('store_id').orderBy('day_num'))) \
          .withColumn('delta_day', F.col('day_num') - F.col('day_lag')) \
          .where(F.col('delta_day') > 1).count()

Out[17]: 0

Как мы видим, пропусков нет. Отлично!  
Для того, чтобы оценить сезонность продаж, разложим наши данные по частотам с помощью прямого преобразования Фурье.

In [0]:
from scipy.fft import fft, fftfreq
import plotly.graph_objects as go


fig = go.Figure()
for elem in grouped_df.select('store_id').distinct().orderBy('store_id').collect():
  store = elem[0]
  N = grouped_df.where(F.col('store_id') == store).count()
  y = grouped_df.where(F.col('store_id') == store).select('sum_sales').toPandas().iloc[:, 0].values.tolist()
  dt = 1

  xf = fftfreq(N, dt)[:N // 2]
  yf = fft(y)[:N // 2]
  fig.add_trace(go.Scatter(x=xf[1:]*N, y=np.abs(yf[1:])/(N // 2), name=store, mode='lines'))
  
fig.update_layout(template='simple_white', 
                  title={'text': 'Оценка сезонности по магазинам', 'x': 0.5, 'xanchor': 'center'}, 
                  xaxis_title='Дни',
                  yaxis_title='Продажи')
fig.show()

На данном графике видно, что максимальный пик практически для всех магазинов (кроме магазина CA_2) приходит на значение частоты - 7 дней. Это говорит о том, что в данных ярко выражена именно недельная сезонность.  
Также, для многих магазинов наблюдаются пиковые значения частот 15 и 22 дня.

In [0]:
window_lag = Window.partitionBy('store_id').orderBy('date')
window_agg = Window.partitionBy('store_id') \
                   .orderBy(F.col('date').cast('timestamp').cast('long') / (24*60*60)) \
                   .rangeBetween(-7, Window.currentRow)

grouped_df.where(F.expr('day(date) != 25 and month(date) != 12')) \
          .withColumn('lag_1', F.lag('sum_sales', 1).over(window_lag)) \
          .withColumn('lag_2', F.lag('sum_sales', 2).over(window_lag)) \
          .withColumn('lag_3', F.lag('sum_sales', 3).over(window_lag)) \
          .withColumn('lag_4', F.lag('sum_sales', 4).over(window_lag)) \
          .withColumn('lag_5', F.lag('sum_sales', 5).over(window_lag)) \
          .withColumn('lag_6', F.lag('sum_sales', 6).over(window_lag)) \
          .withColumn('lag_7', F.lag('sum_sales', 7).over(window_lag)) \
          .withColumn('sales_wmean', F.mean('lag_1').over(window_agg)) \
          .dropna(how='any', subset=['sum_sales', 'lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'lag_6', 'lag_7', 'sales_wmean']) \
          .write.mode('overwrite').parquet('./tmp/BaseLineDF.parquet')

In [0]:
baseline_df = spark.read.option("inferSchema", 'true').parquet('dbfs:/tmp/BaseLineDF.parquet')
baseline_df.limit(5).toPandas()

Unnamed: 0,date,store_id,wday,month,year,state_id,event_name_1,event_type_1,event_name_2,event_type_2,snap,sum_sales,sum_turnover,lag_1,lag_2,lag_3,lag_4,lag_5,lag_6,lag_7,sales_wmean
0,2011-02-05,CA_1,1,2,2011,CA,,,,,1,5437,13375.4,3450,3276,2630,3051,2816,4155,4337,3387.857143
1,2011-02-06,CA_1,2,2,2011,CA,SuperBowl,Sporting,,,1,4340,10591.43,5437,3450,3276,2630,3051,2816,4155,3644.0
2,2011-02-07,CA_1,3,2,2011,CA,,,,,1,3157,7704.54,4340,5437,3450,3276,2630,3051,2816,3644.375
3,2011-02-08,CA_1,4,2,2011,CA,,,,,1,2995,7848.65,3157,4340,5437,3450,3276,2630,3051,3519.625
4,2011-02-09,CA_1,5,2,2011,CA,,,,,1,2710,6937.64,2995,3157,4340,5437,3450,3276,2630,3542.0


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StandardScaler

FEATURES_COL = ['lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'lag_6', 'lag_7', 'sales_wmean']
va_1 = [VectorAssembler(inputCols=[f], outputCol=('vec_' + f)) for f in FEATURES_COL]
ss = [StandardScaler(inputCol='vec_' + f, outputCol='scaled_' + f, withMean=True, withStd=True) for f in FEATURES_COL]
va_2 = VectorAssembler(inputCols=['scaled_' + f for f in FEATURES_COL], outputCol="features")
lr = LinearRegression(featuresCol='features', labelCol='sum_sales', predictionCol='prediction', maxIter=100, regParam=0.0)

stages = va_1 + ss + [va_2] + [lr]
p = Pipeline(stages=stages)

In [0]:
(trainData, testData) = baseline_df.randomSplit([0.8, 0.2], seed=42)

In [0]:
baseline_model = p.fit(trainData)

In [0]:
predictions_train = baseline_model.transform(trainData)
predictions_test = baseline_model.transform(testData)

mape = Mape(predictionCol='prediction', labelCol='sum_sales')
print('MAPE for LinearRegression (BaseLine), train: {:.2f}%'.format(mape.evaluate(predictions_train)))
print('MAPE for LinearRegression (BaseLine), test: {:.2f}%'.format(mape.evaluate(predictions_test)))

MAPE for LinearRegression (BaseLine), train: 11.01%
MAPE for LinearRegression (BaseLine), test: 10.47%


### Random Forest

Попробуем улучшить наш BaseLine.  
Построим модель RandomForest и добавим несколько новых признаков.

Загрузим ранее сгруппированый `df`.

In [0]:
grouped_df = spark.read.option("inferSchema", 'true').parquet('dbfs:/tmp/grouped_data.parquet')
grouped_df.limit(5).toPandas()

Unnamed: 0,date,store_id,wday,month,year,state_id,event_name_1,event_type_1,event_name_2,event_type_2,snap,sum_sales,sum_turnover
0,2011-01-29,TX_3,1,1,2011,TX,,,,,0,3030,7597.99
1,2011-01-30,CA_2,2,1,2011,CA,,,,,0,3046,8417.53
2,2011-01-30,WI_2,2,1,2011,WI,,,,,0,1922,4636.86
3,2011-01-31,WI_3,3,1,2011,WI,,,,,0,3317,7551.65
4,2011-02-01,CA_1,4,2,2011,CA,,,,,1,3051,7407.74


In [0]:
window_lag = Window.partitionBy('store_id').orderBy('day_num')
window_day = Window.partitionBy('store_id', 'month', F.dayofmonth(F.col('date').cast('date'))) \
                   .orderBy('year') \
                   .rangeBetween(-3, -1)
window_event = Window.orderBy('day_num').rangeBetween(Window.unboundedPreceding, -1)

grouped_df.where(F.expr('day(date) != 25 and month(date) != 12')) \
          .withColumn('day_num', (F.col('date').cast('timestamp').cast('long')/F.lit(86400)).cast('int')) \
          .withColumn('event', F.expr('case when event_name_1 is not null or event_name_2 is not null then 1 end')) \
          .withColumn('event_group', F.sum('event').over(window_event)).fillna({'event_group': 0}) \
          .withColumn('days_before_event', F.max('day_num').over(Window.partitionBy('event_group')) - F.col('day_num')) \
          .withColumn('lag_1', F.lag('sum_sales', 1).over(window_lag)) \
          .withColumn('lag_2', F.lag('sum_sales', 2).over(window_lag)) \
          .withColumn('lag_3', F.lag('sum_sales', 3).over(window_lag)) \
          .withColumn('lag_4', F.lag('sum_sales', 4).over(window_lag)) \
          .withColumn('lag_5', F.lag('sum_sales', 5).over(window_lag)) \
          .withColumn('lag_6', F.lag('sum_sales', 6).over(window_lag)) \
          .withColumn('lag_7', F.lag('sum_sales', 7).over(window_lag)) \
          .withColumn('mean_day', F.mean('sum_sales').over(window_day)) \
          .withColumn('event_name_1_enc',
                      F.dense_rank().over(Window.orderBy(F.mean('sum_sales').over(Window.partitionBy('event_name_1')).desc())) - F.lit(1)) \
          .withColumn('event_name_2_enc',
                      F.dense_rank().over(Window.orderBy(F.mean('sum_sales').over(Window.partitionBy('event_name_2')).desc())) - F.lit(1)) \
          .withColumn('store_id_enc',
                      F.dense_rank().over(Window.orderBy(F.mean('sum_sales').over(Window.partitionBy('store_id')).desc())) - F.lit(1)) \
          .withColumn('state_id_enc',
                      F.dense_rank().over(Window.orderBy(F.mean('sum_sales').over(Window.partitionBy('state_id')).desc())) - F.lit(1)) \
          .dropna(how='any', subset=['sum_sales', 'lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'lag_6', 'lag_7']).fillna(0) \
          .write.mode('overwrite').parquet('./tmp/RandomForest.parquet')

Выведем только колонки с признаками, которые будут использованы при построении модели:

- **store_id_enc** - закодированный `id` магазина;
- **lag_1**, **lag_2**, **lag_3**, **lag_4**, **lag_5**, **lag_6**, **lag_7** - смещение по количеству продаж на 1 - 7 дней соответственно;
- **wday** - день недели;
- **month** - месяц;
- **mean_day** - среднее значение в этот день за три прошедших года по магазинам;
- **event_name_1_enc** - закодированное значение категориальной переменной `event_name_1` (`enc` - encoded);
- **event_name_2_enc** - закодированное значение категориальной переменной `event_name_2` (`enc` - encoded);
- **days_before_event** - осталось дней до `event_name_1` или `event_name_2`;
- **snap** - двоичная переменная (0 или 1), указывающая, разрешают ли магазины покупки по протоколу SNAP в проверяемую дату. 1 означает, что покупки по протоколу SNAP разрешены.

In [0]:
FEATURES_COL = ['store_id_enc',
                'lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'lag_6', 'lag_7',
                'wday',
                'month',
                'mean_day',
                'event_name_1_enc', 
                'event_name_2_enc', 
                'days_before_event', 
                'snap']

randf_df = spark.read.option("inferSchema", 'true').parquet('dbfs:/tmp/RandomForest.parquet')
randf_df.select(['date'] + FEATURES_COL).orderBy('date').limit(5).toPandas()

Unnamed: 0,date,store_id_enc,lag_1,lag_2,lag_3,lag_4,lag_5,lag_6,lag_7,wday,month,mean_day,event_name_1_enc,event_name_2_enc,days_before_event,snap
0,2011-02-05,6,2629,2288,1942,2324,2121,3046,3494,1,2,0.0,12,3,1,1
1,2011-02-05,7,1691,2734,1694,2258,1822,2687,2556,1,2,0.0,12,3,1,1
2,2011-02-05,9,1469,1389,1536,1440,1386,1777,1625,1,2,0.0,12,3,1,1
3,2011-02-05,1,3450,3276,2630,3051,2816,4155,4337,1,2,0.0,12,3,1,1
4,2011-02-05,2,2588,3439,2492,2954,2731,3937,3852,1,2,0.0,12,3,1,1


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer


va = VectorAssembler(inputCols=FEATURES_COL, outputCol="features", handleInvalid="keep")
rf = RandomForestRegressor(featuresCol='features', 
                           labelCol='sum_sales', 
                           predictionCol='prediction', 
                           numTrees=100, 
                           maxDepth=10, 
                           seed=42)

p = Pipeline(stages=[va, rf])

In [0]:
(trainData, testData) = randf_df.randomSplit([0.8, 0.2], seed=42)

In [0]:
rf_model = p.fit(trainData)

In [0]:
predictions_train = rf_model.transform(trainData)
predictions_test = rf_model.transform(testData)

mape = Mape(predictionCol='prediction', labelCol='sum_sales')
print('MAPE for RandomForest, train: {:.2f}%'.format(mape.evaluate(predictions_train)))
print('MAPE for RandomForest, test: {:.2f}%'.format(mape.evaluate(predictions_test)))

MAPE for RandomForest, train: 6.75%
MAPE for RandomForest, test: 7.71%
