# Preparing the walmart data. 

AIM: The notebook helps download walmart data and turn it to a format similar to M&S hierarchical data. [Link to the dataset](https://www.kaggle.com/competitions/m5-forecasting-accuracy)

<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://raw.githubusercontent.com/puneet-jain159/Image_dump/2a8b03eef9bd111b98b261846d57b72ce98fde38/walmart_data_description.png" alt='Push compute' height="1000" width="1600">
</div>

### Install Dependencies

In [0]:
import pandas as pd
import numpy as np

### Clone Data from Kaggle

In [0]:
dbutils.widgets.dropdown("reset", "False", ["True", "False"], label="reset data?")

In [0]:
# dbutils.widgets.removeAll()

Below is shell script to download the data from kaggle .</br> You will need to sign in and generate access key to download the data and also agree to the data disclaimer other will hit a 403 error</br> After downloading to the local disk the data is copied to dbfs
```
kaggle competitions download -c m5-forecasting-accuracy -p /dbfs/walmart/data/
```

In [0]:
%pip install kaggle

In [0]:
dbutils.library.restartPython()

In [0]:
# create a scope or use an existing scope to put keys
# databricks secrets put --scope tokens --key kaggle_key --string-value <string value>

import os

kaggle_key = dbutils.secrets.get("tokens", "kaggle_key")
os.environ['KAGGLE_USERNAME']="cchalc"
os.environ['KAGGLE_KEY']=kaggle_key
project = "boostedtrees"

In [0]:
dbutils.fs.rm("/Users/christopher.chalcraft@databricks.com/boostedtrees", recurse=True)

In [0]:
base_path = f"/Users/christopher.chalcraft@databricks.com/{project}"
dbutils.fs.mkdirs(base_path)

print("Use this to copy into shell script: /dbfs" + base_path)

In [0]:
%sh
kaggle competitions download -c m5-forecasting-accuracy -p /dbfs/Users/christopher.chalcraft@databricks.com/boostedtrees
cd /dbfs/Users/christopher.chalcraft@databricks.com/boostedtrees &&  unzip /dbfs/Users/christopher.chalcraft@databricks.com/boostedtrees/m5-forecasting-accuracy.zip 

In [0]:
dbutils.fs.ls(base_path)

### Read the downloaded data and analyize all the different files

In [0]:
sdf_calendar = spark.read.csv(f'{base_path}/calendar.csv', header="True")
sdf_calendar.cache()
print(sdf_calendar.count())
sdf_calendar.display()

In [0]:
sdf_sales_train_evaluation = spark.read.csv(f'{base_path}/sales_train_evaluation.csv', header="True")
sdf_sales_train_evaluation.cache()
print( sdf_sales_train_evaluation.count())
sdf_sales_train_evaluation.display()

In [0]:
sdf_sell_prices= spark.read.csv(f'{base_path}/sell_prices.csv', header="True")
sdf_sell_prices.cache()
print( sdf_sell_prices.count())
sdf_sell_prices.display()

### Transformation and clean the data
We transform and melt the column into rows

In [0]:
# make sure these are created beforehand
spark.sql("use catalog cjc")
spark.sql("use schema scratch")

In [0]:
ids =['id','item_id','dept_id','cat_id','store_id','state_id'] 
cols = [col for col in sdf_sales_train_evaluation.columns if col not in ids ] 

# pivot the data
sdf_sales_train_evaluation_pivot = (sdf_sales_train_evaluation.melt(
    ids=ids, values=cols,
    variableColumnName="date", valueColumnName="sale_quantity"))

# sdf_sales_train_evaluation_pivot.write.mode("overwrite").format('delta').save(f'{base_path}/clean_data/sdf_sales_train_evaluation/')
sdf_sales_train_evaluation_pivot.write.mode("overwrite").saveAsTable("m5_sales_train_evaluation")

In [0]:
# sdf_sell_prices.filter((sdf_sell_prices.item_id == 'HOUSEHOLD_1_335') & (sdf_sell_prices.wm_yr_wk == '11105') & (sdf_sell_prices.store_id == 'CA_2')).display()

In [0]:
# merge to get date 
sdf_calendar = sdf_calendar.withColumnRenamed("date", 'date_time')
cond = [sdf_sales_train_evaluation_pivot.date == sdf_calendar.d]
sdf_sales_train_evaluation_pivot = sdf_sales_train_evaluation_pivot.join(sdf_calendar.select(['d','wm_yr_wk','date_time']),cond, 'left')

# merge to get sales price
cond = [sdf_sales_train_evaluation_pivot.wm_yr_wk == sdf_sell_prices.wm_yr_wk,
        sdf_sales_train_evaluation_pivot.item_id == sdf_sell_prices.item_id,
        sdf_sales_train_evaluation_pivot.store_id == sdf_sell_prices.store_id ]
final_table = sdf_sales_train_evaluation_pivot.join(sdf_sell_prices,cond, 'left'). \
      select(sdf_sales_train_evaluation_pivot.item_id,sdf_sales_train_evaluation_pivot.dept_id,
             sdf_sales_train_evaluation_pivot.cat_id,sdf_sales_train_evaluation_pivot.store_id,
             sdf_sales_train_evaluation_pivot.state_id ,sdf_sales_train_evaluation_pivot.date_time,
             sdf_sell_prices.sell_price ,sdf_sales_train_evaluation_pivot.sale_quantity)

# write out the final table
# final_table.write.mode("overwrite").format('delta').save('dbfs:/walmart/data/clean_data/final_cleaned_table/')
final_table.write.mode("overwrite").saveAsTable("m5_final_cleaned_table")

In [0]:
# Clean the final table 

# remove nulls sell_price and sales_quantity = 0 
final_table = final_table.filter(final_table.sale_quantity > 0)
display(final_table)

### Write out the final table as a Delta Table

In [0]:
#
# final_table.write.mode("overwrite").format('delta').save('dbfs:/walmart/data/clean_data/final_cleaned_filtered')
final_table.write.mode("overwrite").saveAsTable('m5_final_cleaned_filtered')
