In [1]:
!pip install pyspark



In [11]:
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

from google.colab import drive
# Mount Google Drive so the input and output paths below are reachable.
drive.mount('/content/drive')

# Input datasets, all read as Parquet in a later cell.
MONTHLY_SALES_PATH = "/content/drive/MyDrive/monthly_sales_data"
SELL_PRICES_PATH = "/content/drive/MyDrive/walmart_data/sell_prices.parquet"
CALENDAR_PATH = "/content/drive/MyDrive/walmart_data/calendar.parquet"

# Destination of the final feature table (written as Parquet, overwrite mode).
OUTPUT_PATH  = "/content/drive/MyDrive/basic_features"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
# Reuse the active Spark session if there is one, otherwise start a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [13]:
# Load the three Parquet inputs as Spark DataFrames, in the same order as the
# path constants: monthly sales, sell prices, calendar.
df_monthly, df_sell_prices, df_calendar = (
    spark.read.parquet(parquet_path)
    for parquet_path in (MONTHLY_SALES_PATH, SELL_PRICES_PATH, CALENDAR_PATH)
)

In [14]:
# Preview 10 rows of the monthly sales table as a pandas DataFrame.
# No orderBy is applied before limit(), so the rows shown are not in a guaranteed order.
df_monthly.limit(10).toPandas()

Unnamed: 0,item_id,store_id,date,dept_id,cat_id,state_id,id,sales
0,FOODS_1_001,CA_1,2011-01-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,3
1,FOODS_1_001,CA_1,2011-10-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,0
2,FOODS_1_001,CA_1,2011-11-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,42
3,FOODS_1_001,CA_1,2012-03-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,38
4,FOODS_1_001,CA_1,2012-06-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,25
5,FOODS_1_001,CA_1,2012-12-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,36
6,FOODS_1_001,CA_1,2013-01-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,19
7,FOODS_1_001,CA_1,2013-04-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,12
8,FOODS_1_001,CA_1,2013-05-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,23
9,FOODS_1_001,CA_1,2013-07-01,FOODS_1,FOODS,CA,FOODS_1_001_CA_1_validation,29


# 1. Extraction des caractéristiques temporelles

In [15]:
# Calendar features are computed on the month being predicted (`date` + 1 month),
# not on the month of the observed sales.
temporal_extractors = {
    "month": F.month,
    "quarter": F.quarter,
    "year": F.year,
}

df_1 = df_monthly.withColumn("prediction_date", F.add_months("date", 1))
for feature_name, extract in temporal_extractors.items():
    df_1 = df_1.withColumn(feature_name, extract("prediction_date"))

# 2. Attributs Produits

In [16]:
# Names of the product-level columns already present in the monthly sales table.
# NOTE(review): this list is not referenced by any later cell visible in this notebook.
product_attributes = ["dept_id", "cat_id"]

# 3. Attributs Magasins

In [17]:
# Names of the store-level columns already present in the monthly sales table.
# NOTE(review): this list is not referenced by any later cell visible in this notebook.
boutique_attributes = ["store_id", "state_id"]

# 4. Récence de chaque produit

In [18]:
 # Launch month per item: earliest `date` on which the item has strictly positive sales.
 # Grouping is by item_id only, so every store of an item shares the same launch month.
 positive_sales = df_1.where(F.col("sales") > 0)
 first_sale_month = F.min("date").alias("launch_month")
 df_launch_months = positive_sales.groupBy("item_id").agg(first_sale_month)

 # The left join keeps items that never had positive sales; their launch_month
 # (and therefore recency) is null.
 df_with_launch = df_1.join(df_launch_months, on="item_id", how="left")
 # recency = months between the launch month and the row's `date` (negative before launch).
 df_4 = df_with_launch.withColumn("recency", F.months_between("date", "launch_month"))

# 5. Lags des ventes

In [19]:
# One time series per (item, store), ordered chronologically. This window is
# reused later in the notebook to compute the target.
ts_date_window = Window.partitionBy("item_id", "store_id").orderBy(F.asc("date"))
n_lags = 12

# sales_lag_i = `sales` of the row i positions earlier in the same series
# (null for the first i rows of each series).
# NOTE(review): lag() counts rows, not calendar months; this equals "i months ago"
# only if every series has exactly one row per month with no gaps — confirm.
lag_columns = [
    F.lag("sales", lag).over(ts_date_window).alias(f"sales_lag_{lag}")
    for lag in range(1, n_lags + 1)
]

# Add all lags in a single projection instead of calling withColumn in a loop,
# which stacks one projection per call and inflates the query plan.
df_5 = df_4.select("*", *lag_columns)

# 6. Prix Moyen

In [20]:
# Preview 5 rows of the sell-price table as a pandas DataFrame (no ordering applied).
df_sell_prices.limit(5).toPandas()

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price
0,CA_1,HOBBIES_1_001,11325,9.58
1,CA_1,HOBBIES_1_001,11326,9.58
2,CA_1,HOBBIES_1_001,11327,8.26
3,CA_1,HOBBIES_1_001,11328,8.26
4,CA_1,HOBBIES_1_001,11329,8.26


In [21]:
# Preview 5 rows of the calendar table as a pandas DataFrame (no ordering applied).
df_calendar.limit(5).toPandas()

Unnamed: 0,date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI
0,2011-01-29,11101,Saturday,1,1,2011,d_1,,,,,0,0,0
1,2011-01-30,11101,Sunday,2,1,2011,d_2,,,,,0,0,0
2,2011-01-31,11101,Monday,3,1,2011,d_3,,,,,0,0,0
3,2011-02-01,11101,Tuesday,4,2,2011,d_4,,,,,1,1,0
4,2011-02-02,11101,Wednesday,5,2,2011,d_5,,,,,1,0,1


In [22]:
# Average price per (store, item, month). Despite its name, the resulting frame
# is monthly: `date` is truncated to the first day of its month before grouping.
# Each weekly price is matched to every calendar row sharing its wm_yr_wk, so the
# average is weighted by the number of calendar rows (days, per the calendar
# preview) of each week that fall in the month.
calendar_weeks = df_calendar.select("date", "wm_yr_wk")
prices_by_day = df_sell_prices.join(calendar_weeks, on="wm_yr_wk", how="left")
prices_by_month_start = prices_by_day.withColumn("date", F.trunc("date", "month"))

df_weekly_sell_prices = (
    prices_by_month_start
    .groupBy("store_id", "item_id", "date")
    .agg(F.avg("sell_price").alias("avg_price"))
)

In [23]:
# Attach the monthly average price to each (store, item, month) row.
# The left join keeps months without any price record; their avg_price is null.
price_join_keys = ["store_id", "item_id", "date"]
df_6 = df_5.join(df_weekly_sell_prices, on=price_join_keys, how="left")

In [24]:
# Count the rows of df_6 whose avg_price is null.
null_price_flag = F.col("avg_price").isNull().cast("int")
df_6.agg(F.sum(null_price_flag)).first()[0]

413401

In [25]:
# Same null count, restricted to rows with strictly positive sales.
rows_with_sales = df_6.where(F.col("sales") > 0)
null_price_flag = F.col("avg_price").isNull().cast("int")
rows_with_sales.agg(F.sum(null_price_flag)).first()[0]

0

# 7. Calcul de la variable cible

In [26]:
# Target: `sales` of the next row in the same (item, store) series, using the
# date-ordered window defined for the lag features. The last row of each series
# has no successor, so its target is null.
next_row_sales = F.lead("sales", 1).over(ts_date_window)
df_7 = df_6.withColumn("target_next_month_sales", next_row_sales)

# 8. Rajout des Special Dates (j'ai oublié de mettre cette question sur la slide)

In [27]:
# Attach calendar event and SNAP columns for the prediction date.
# NOTE(review): the calendar has one row per day and the join key is the exact
# prediction_date (`date` + 1 month). If `date` is always a month start, as in the
# df_monthly preview, only events/SNAP flags of the 1st of the prediction month are
# attached; those on other days are not represented. Confirm this is intended, or
# aggregate the calendar per month before joining.
special_date_columns = [
    'event_name_1',
    'event_type_1',
    'event_name_2',
    'event_type_2',
    'snap_CA',
    'snap_TX',
    'snap_WI',
]

df_special_dates = (
    df_calendar
    .withColumnRenamed("date", "prediction_date")
    .select("prediction_date", *special_date_columns)
)

# The left join keeps every feature row even when the calendar has no matching date.
df_8 = df_7.join(df_special_dates, on="prediction_date", how="left")

# Save Output

In [28]:
# Persist the final feature table as Parquet, replacing any previous output.
features_writer = df_8.write.format('parquet').mode('overwrite')
features_writer.save(OUTPUT_PATH)

# Sauvegarde des résultats de calcul pour une série temporelle en CSV pour validation

In [None]:
# Export every feature row of one series to a semicolon-separated CSV for manual checking.
validation_series_id = "FOODS_1_001_CA_1_validation"
extract_pdf = df_8.where(F.col("id") == validation_series_id).toPandas()
extract_pdf.to_csv("extract.csv", sep=";")