# Preprocessing with Fugue

## Loading in Data

We'll take a quick look at the data given to us to understand the problem better. Most of the code snippets here are taken from [Rob Mulla's Starter Notebook](https://www.kaggle.com/code/robikscube/m5-forecasting-starter-data-exploration). We're not going to go too deep into understanding everything; we're only interested in setting up an end-to-end modelling pipeline.

In [1]:
import pandas as pd
import os

# Paths are resolved relative to the notebook's working directory: ../data/...
download_path = os.path.abspath(
    os.path.join(".", "..", "data", "m5-forecasting-accuracy.zip")
)
unzipped_path = os.path.abspath(
    os.path.join(".", "..", "data", "m5-forecasting-accuracy-unzipped")
)

# Raw CSVs are read from the unzipped folder; derived artifacts go to a sibling "working" folder
INPUT_DIR = unzipped_path
WORKING_DIR = os.path.join(unzipped_path, "..", "working")

# Load the three raw tables in a fixed order: calendar, sales, prices
calendar, training_data, sell_prices = (
    pd.read_csv(f'{INPUT_DIR}/{stem}.csv')
    for stem in ("calendar", "sales_train_evaluation", "sell_prices")
)


## Preprocessing

The format above is not friendly to work with. We will preprocess each time series using Fugue, which lets the same logic run on distributed engines such as Ray, Dask, or Spark (Dask and Spark are used below). Let's focus on the logic for one time series first.

In [8]:
def read_calendar() -> pd.DataFrame:
    """Load ``calendar.csv`` from INPUT_DIR, parse ``date`` to datetimes, and sort by it.

    Returns:
        The calendar table ordered by ascending ``date``.
    """
    cal = pd.read_csv(f'{INPUT_DIR}/calendar.csv')
    cal = cal.assign(date=pd.to_datetime(cal["date"]))
    return cal.sort_values(by="date")

CALENDAR = read_calendar()
# The frame is date-sorted, so the first/last rows bound the calendar range
CALENDAR_START = CALENDAR["date"].iloc[0]
CALENDAR_END = CALENDAR["date"].iloc[-1]

In [9]:
from typing import List, Dict, Any, Iterable
from datetime import date

def prices_to_series(df:pd.DataFrame) -> List[Dict[str,Any]]:
    """Collapse one (store_id, item_id) group of daily prices into a single record.

    ``df`` is expected to contain the rows of one store/item pair, sorted by
    ``date`` ascending (the callers partition by store/item with a date
    presort), with exactly one row per calendar day.

    Args:
        df: Rows with ``store_id``, ``item_id``, ``date`` and ``sell_price`` columns.

    Returns:
        A one-element list holding ``store_id``, ``item_id``, ``price_start``
        (the first date) and ``prices`` (the ``sell_price`` values in row
        order). An empty group yields an empty list.

    Raises:
        ValueError: if the row count differs from the number of days spanned
            by the first and last ``date``. This indicates missing or extra
            daily rows, or rows that are not sorted by date.
    """
    if df.empty:
        return []
    # Explicit check instead of `assert`, which is stripped under `python -O`
    expected_rows = (df["date"].iloc[-1] - df["date"].iloc[0]).days + 1
    if df.shape[0] != expected_rows:
        raise ValueError(
            f"expected one price row per day ({expected_rows} rows), got {df.shape[0]}"
        )
    first = df.iloc[0]
    return [dict(store_id=first["store_id"],
                 item_id=first["item_id"],
                 price_start=first["date"],
                 prices=df["sell_price"].tolist())]

# Three consecutive days of prices for a single store/item pair
df = pd.DataFrame(
    {
        "store_id": ["store1"] * 3,
        "item_id": ["item1"] * 3,
        "date": [date(2020, 1, day) for day in (2, 3, 4)],
        "sell_price": [2.2, 3.3, 4.4],
    }
)
print(prices_to_series(df))

[{'store_id': 'store1', 'item_id': 'item1', 'price_start': datetime.date(2020, 1, 2), 'prices': [2.2, 3.3, 4.4]}]


In [10]:
# Expand weekly prices to daily rows by joining each week key to its calendar days
calendar = read_calendar().loc[:, ["date", "wm_yr_wk"]]
joined = pd.merge(sell_prices, calendar, how="inner", on="wm_yr_wk")
joined.head()

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price,date
0,CA_1,HOBBIES_1_001,11325,9.58,2013-07-13
1,CA_1,HOBBIES_1_001,11325,9.58,2013-07-14
2,CA_1,HOBBIES_1_001,11325,9.58,2013-07-15
3,CA_1,HOBBIES_1_001,11325,9.58,2013-07-16
4,CA_1,HOBBIES_1_001,11325,9.58,2013-07-17


In [11]:
from fugue import transform

In [None]:
%%time
out = transform(joined, 
                prices_to_series, 
                schema="store_id:str,item_id:str,price_start:date,prices:[double]",
                partition={"by": ["store_id", "item_id"], "presort": "date asc"})
out.head()

CPU times: user 2min 32s, sys: 12 s, total: 2min 44s
Wall time: 2min 54s


Unnamed: 0,store_id,item_id,price_start,prices
0,CA_1,FOODS_1_001,2011-01-29,"[2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, ..."
1,CA_1,FOODS_1_002,2011-01-29,"[7.88, 7.88, 7.88, 7.88, 7.88, 7.88, 7.88, 7.8..."
2,CA_1,FOODS_1_003,2011-01-29,"[2.88, 2.88, 2.88, 2.88, 2.88, 2.88, 2.88, 2.8..."
3,CA_1,FOODS_1_004,2012-03-03,"[1.78, 1.78, 1.78, 1.78, 1.78, 1.78, 1.78, 1.7..."
4,CA_1,FOODS_1_005,2011-01-29,"[2.94, 2.94, 2.94, 2.94, 2.94, 2.94, 2.94, 2.9..."


In [None]:
%%time
ddf = transform(joined, 
                prices_to_series, 
                schema="store_id:str,item_id:str,price_start:date,prices:[double]",
                partition={"by": ["store_id", "item_id"], "presort": "date asc"},
                engine="dask")
ddf.compute().head()

CPU times: user 12.7 s, sys: 6.31 s, total: 19 s
Wall time: 1min 37s


Unnamed: 0,store_id,item_id,price_start,prices
0,CA_1,FOODS_1_024,2011-01-29,"[3.97, 3.97, 3.97, 3.97, 3.97, 3.97, 3.97, 3.9..."
1,CA_1,FOODS_1_031,2011-05-28,"[3.28, 3.28, 3.28, 3.28, 3.28, 3.28, 3.28, 3.2..."
2,CA_1,FOODS_1_034,2011-04-09,"[5.61, 5.61, 5.61, 5.61, 5.61, 5.61, 5.61, 5.6..."
3,CA_1,FOODS_1_038,2011-02-05,"[7.97, 7.97, 7.97, 7.97, 7.97, 7.97, 7.97, 7.9..."
4,CA_1,FOODS_1_039,2012-07-14,"[2.48, 2.48, 2.48, 2.48, 2.48, 2.48, 2.48, 2.4..."


In [15]:
from fugue import FugueWorkflow as DAG

def save_price_series() -> DAG:
    """Build (without running) a workflow that writes per-item daily price series.

    The workflow loads ``sell_prices.csv`` and the ``date``/``wm_yr_wk`` columns
    of ``calendar.csv`` and inner-joins them. The join presumably matches on
    the shared ``wm_yr_wk`` column. It then collapses each store/item group,
    sorted by date, with ``prices_to_series``. The result is saved to
    ``price_series.parquet`` in WORKING_DIR.

    Returns:
        The Fugue workflow; call ``.run()`` to execute it.
    """
    dag = DAG()
    # Force sell_price to double so it matches the `prices:[double]` output schema
    prices = dag.load(f'{INPUT_DIR}/sell_prices.csv', header=True, infer_schema=True)
    prices = prices.alter_columns("sell_price:double")
    # Keep only the week key and its days; force the date column to the date type
    cal = dag.load(f'{INPUT_DIR}/calendar.csv', header=True, infer_schema=True)
    cal = cal[["date","wm_yr_wk"]].alter_columns("date:date")
    daily_prices = prices.inner_join(cal)
    series = daily_prices.partition(by=["store_id","item_id"], presort="date").transform(
        prices_to_series,
        schema="store_id:str,item_id:str,price_start:date,prices:[double]",
    )
    series.save(f"{WORKING_DIR}/price_series.parquet")
    return dag

In [None]:
%%time
# Build and execute the price-series workflow; no engine argument is passed here.
# The cell displays the value returned by run() (shown as `DataFrames()` in the output).
save_price_series().run()

CPU times: user 2min 46s, sys: 9.11 s, total: 2min 55s
Wall time: 3min


DataFrames()

## Preprocessing Sales Data

In [19]:
# schema: id:str,item_id:str,dept_id:str,cat_id:str,store_id:str,state_id:str,start:date,sales:[int]
def sales_to_series(df:Iterable[List[Any]]) -> Iterable[List[Any]]:
    """Turn wide sales rows into (6 id columns, start date, list of daily sales).

    Each input row is assumed to be a list: the first six items are the id
    columns named in the schema hint above, and the rest are the daily sales
    values. The series start is the earliest date in the calendar, read once
    per call.

    Yields:
        ``row[:6] + [start, row[6:]]`` for every input row.
    """
    first_day = read_calendar()['date'].min()
    for record in df:
        ids, daily_sales = record[:6], record[6:]
        yield ids + [first_day, daily_sales]
        
def save_sales_series() -> DAG:
    """Build (without running) a workflow that writes per-item sales series.

    The workflow loads ``sales_train_evaluation.csv`` and maps it through
    ``sales_to_series``. No explicit schema is passed, so the output schema
    comes from that function's ``# schema:`` comment. The result is saved to
    ``sales_series.parquet`` in WORKING_DIR.

    Returns:
        The Fugue workflow; call ``.run()`` to execute it.
    """
    dag = DAG()
    raw_sales = dag.load(f"{INPUT_DIR}/sales_train_evaluation.csv", header=True, infer_schema=True)
    series = raw_sales.transform(sales_to_series)
    series.save(f"{WORKING_DIR}/sales_series.parquet")
    return dag

In [21]:
# Run the converter on the first raw sales row to inspect its output shape
[*sales_to_series([training_data.iloc[0].tolist()])]

[['HOBBIES_1_001_CA_1_evaluation',
  'HOBBIES_1_001',
  'HOBBIES_1',
  'HOBBIES',
  'CA_1',
  'CA',
  Timestamp('2011-01-29 00:00:00'),
  [0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,


In [None]:
%%time
# Build and execute the sales-series workflow; no engine argument is passed here.
# The cell displays the value returned by run() (shown as `DataFrames()` in the output).
save_sales_series().run()

CPU times: user 25.5 s, sys: 1.05 s, total: 26.5 s
Wall time: 26.7 s


DataFrames()

In [25]:
import pickle 

# input_has: start:date,sales:[int],price_start:date,prices:[double]
# schema: *,series:binary-sales,prices,price_start
def merge_series(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """Align each row's daily sales with its daily prices and pickle the result.

    For every input row, a daily-indexed frame is built:
    - ``quantity`` comes from ``sales``, starting at ``start``.
    - ``price`` comes from ``prices``, starting at ``price_start``.

    Days without both values (in practice, days with no price) are dropped.
    ``start`` is moved to the first remaining day, and the pickled frame is
    stored under ``series``. The schema hint above appears to drop ``sales``,
    ``prices`` and ``price_start`` from the output (confirm against the Fugue
    schema-expression docs).

    Rows whose sales and price ranges do not overlap are skipped, because they
    would leave an empty frame with no valid start date. Previously such rows
    raised IndexError.

    Note: the input rows are mutated in place and yielded back.
    """
    for row in df:
        sales_index = pd.date_range(row["start"], periods=len(row["sales"]), freq="D")
        series = pd.DataFrame({"quantity": row["sales"]}, index=sales_index)
        price_index = pd.date_range(row["price_start"], periods=len(row["prices"]), freq="D")
        # Assignment aligns on the date index; sales days outside the price range get NaN
        series["price"] = pd.Series(row["prices"], index=price_index)
        series = series.dropna()
        if series.empty:
            # No overlapping days: nothing to model for this item
            continue
        row["start"] = series.index[0].date()
        row["series"] = pickle.dumps(series)
        yield row
    

In [26]:
# Sales start one day before prices, so the first sales day is dropped
df = [
    {
        "start": date(2020, 1, 2),
        "sales": [2, 3, 4, 5],
        "price_start": date(2020, 1, 3),
        "prices": [30, 40],
    }
]

print(pickle.loads(next(merge_series(df))["series"]))

            quantity  price
2020-01-03         3   30.0
2020-01-04         4   40.0


In [27]:
def run_merge(num_partitions: int = 16) -> DAG:
    """Build (without running) a workflow that merges sales and price series.

    The workflow loads ``sales_series.parquet`` and ``price_series.parquet``
    from WORKING_DIR and inner-joins them. The join presumably matches on the
    shared ``store_id``/``item_id`` columns. It then applies ``merge_series``
    and saves the result to ``series.parquet``.

    Args:
        num_partitions: Number of partitions the joined rows are split into
            before ``merge_series`` runs. Defaults to 16, the value previously
            hard-coded; tune it to the cluster size.

    Returns:
        The Fugue workflow; call ``.run(engine)`` to execute it.
    """
    dag = DAG()
    sales = dag.load(f"{WORKING_DIR}/sales_series.parquet")
    prices = dag.load(f"{WORKING_DIR}/price_series.parquet")
    merged = sales.inner_join(prices).partition(num=num_partitions).transform(merge_series)
    merged.save(f"{WORKING_DIR}/series.parquet")
    return dag

In [28]:
%%time
# Build the merge workflow and execute it on the "spark" engine.
# The cell displays the value returned by run() (shown as `DataFrames()` in the output).
run_merge().run("spark")

                                                                                

22/11/06 23:43:36 WARN MemoryStore: Not enough space to cache broadcast_5 in memory! (computed 377.0 MiB so far)
22/11/06 23:43:36 WARN BlockManager: Persisting block broadcast_5 to disk instead.
22/11/06 23:43:37 WARN MemoryStore: Not enough space to cache broadcast_5 in memory! (computed 377.0 MiB so far)


[Stage 6:>                                                         (0 + 8) / 16]

22/11/06 23:43:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




22/11/06 23:44:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




CPU times: user 69.3 ms, sys: 16.5 ms, total: 85.8 ms
Wall time: 46.4 s


                                                                                

DataFrames()

In [30]:
# Unpickle the merged daily frame stored in the first row of the output file
# (the file was written by this notebook, so unpickling it is trusted)
pickle.loads(
    pd.read_parquet(f"{WORKING_DIR}/series.parquet")["series"].iloc[0]
)

Unnamed: 0,quantity,price
2013-07-13,0,1.77
2013-07-14,3,1.77
2013-07-15,1,1.77
2013-07-16,2,1.77
2013-07-17,3,1.77
...,...,...
2016-05-18,0,1.77
2016-05-19,0,1.77
2016-05-20,0,1.77
2016-05-21,0,1.77
