In [57]:
#Check if container is passing through + general imports 
import torch
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


### Making Databricks Connection and Creating Dataset from Spark

In [58]:
# Make Connection
from databricks_connect import connect_explicit
spark = connect_explicit()

#Making initial data query
data = spark.sql("""
    SELECT 
        ss_sold_date_sk,
        ss_item_sk,
        ss_store_sk,
        ss_promo_sk,
        ss_quantity,
        d_date,
        d_day_name,
        d_holiday,
        d_following_holiday,
        d_weekend,
        i_item_id,
        s_store_id
        
    FROM samples.tpcds_sf1.store_sales AS ss
    INNER JOIN samples.tpcds_sf1.date_dim AS dd
    ON ss.ss_sold_date_sk = dd.d_date_sk
    INNER JOIN samples.tpcds_sf1.item AS i
    ON ss.ss_item_sk = i.i_item_sk
    INNER JOIN samples.tpcds_sf1.store AS s
    ON ss.ss_store_sk = s.s_store_sk
    """)

# Promotion table with actual dates
promo = spark.sql("""
    SELECT
        p.p_item_sk, 
        p.p_promo_sk,
        p.p_promo_id,
        dd_start.d_date as promo_start_date,
        dd_end.d_date as promo_end_date

    FROM samples.tpcds_sf1.promotion AS p
    LEFT JOIN samples.tpcds_sf1.date_dim AS dd_start
    ON p.p_start_date_sk = dd_start.d_date_sk
    LEFT JOIN samples.tpcds_sf1.date_dim AS dd_end
    ON p.p_end_date_sk = dd_end.d_date_sk
    """)

sales_promo = data.join(
    promo,
    (data.ss_item_sk == promo.p_item_sk) & 
    (data.d_date >= promo.promo_start_date) & 
    (data.d_date <= promo.promo_end_date),
    "left"
).select(
    data["*"],
    promo.p_promo_id,
    promo.promo_start_date,
    promo.promo_end_date
)

sales_promo.show()

+---------------+----------+-----------+-----------+-----------+----------+----------+---------+-------------------+---------+----------------+----------------+----------+----------------+--------------+
|ss_sold_date_sk|ss_item_sk|ss_store_sk|ss_promo_sk|ss_quantity|    d_date|d_day_name|d_holiday|d_following_holiday|d_weekend|       i_item_id|      s_store_id|p_promo_id|promo_start_date|promo_end_date|
+---------------+----------+-----------+-----------+-----------+----------+----------+---------+-------------------+---------+----------------+----------------+----------+----------------+--------------+
|        2451181|     14386|          1|        251|         77|1999-01-02|  Saturday|        N|                  Y|        Y|AAAAAAAACDIDAAAA|AAAAAAAABAAAAAAA|      NULL|            NULL|          NULL|
|        2451181|     11323|          1|          1|         84|1999-01-02|  Saturday|        N|                  Y|        Y|AAAAAAAALDMCAAAA|AAAAAAAABAAAAAAA|      NULL|            N

In [59]:
#Clean and aggregate data
#spc = "Sales Promo Cleaned"

from pyspark.sql.functions import sum as spark_sum, when, col
from pyspark.sql.types import DateType, StringType, FloatType

spc = sales_promo.select(               sales_promo.d_date,
                                        sales_promo.ss_quantity, 
                                        sales_promo.d_day_name,
                                        sales_promo.d_holiday,
                                        sales_promo.d_following_holiday,
                                        sales_promo.d_weekend,
                                        sales_promo.i_item_id,
                                        sales_promo.s_store_id,
                                        sales_promo.p_promo_id,
                                        sales_promo.promo_start_date,
                                        sales_promo.promo_end_date)

spc = spc.withColumn(
    "promo_indicator", 
        when(col("p_promo_id").isNotNull(), "Y").otherwise("N")
)

spc_typed = spc.select(
    col("d_date").cast(DateType()).alias("date"),
    col("ss_quantity").cast(FloatType()).alias("quantity"),
    col("d_day_name").cast(StringType()).alias("day"),
    col("d_holiday").cast(StringType()).alias("holiday"),
    col("d_following_holiday").cast(StringType()).alias("following_holiday"),
    col("d_weekend").cast(StringType()).alias("weekend"),
    col("i_item_id").cast(StringType()).alias("item_id"),
    col("s_store_id").cast(StringType()).alias("store_id"),
    col("promo_indicator").cast(StringType()).alias("promo_indicator"),
    col("p_promo_id").cast(StringType()).alias("promo_id"),
    col("promo_start_date").cast(DateType()).alias("promo_start"),
    col("promo_end_date").cast(DateType()).alias("promo_end")
)

spc_typed.show()

+----------+--------+--------+-------+-----------------+-------+----------------+----------------+---------------+--------+-----------+---------+
|      date|quantity|     day|holiday|following_holiday|weekend|         item_id|        store_id|promo_indicator|promo_id|promo_start|promo_end|
+----------+--------+--------+-------+-----------------+-------+----------------+----------------+---------------+--------+-----------+---------+
|1999-01-02|    77.0|Saturday|      N|                Y|      Y|AAAAAAAACDIDAAAA|AAAAAAAABAAAAAAA|              N|    NULL|       NULL|     NULL|
|1999-01-02|    84.0|Saturday|      N|                Y|      Y|AAAAAAAALDMCAAAA|AAAAAAAABAAAAAAA|              N|    NULL|       NULL|     NULL|
|1999-01-02|    96.0|Saturday|      N|                Y|      Y|AAAAAAAANJHCAAAA|AAAAAAAABAAAAAAA|              N|    NULL|       NULL|     NULL|
|1999-01-02|    51.0|Saturday|      N|                Y|      Y|AAAAAAAALHPBAAAA|AAAAAAAABAAAAAAA|              N|    NULL| 

In [72]:
from pyspark.sql.functions import asc

spc_typed_cleaned = spc_typed.select(
    col("date"),
    col("quantity"),
    col("day"),
    col("holiday"),
    col("following_holiday"),
    col("weekend"),
    col("item_id"),
    col("store_id"),
    col("promo_indicator")
).sort(asc("date"))

spc_typed_cleaned.show()

+----------+--------+------+-------+-----------------+-------+----------------+----------------+---------------+
|      date|quantity|   day|holiday|following_holiday|weekend|         item_id|        store_id|promo_indicator|
+----------+--------+------+-------+-----------------+-------+----------------+----------------+---------------+
|1998-01-02|    84.0|Friday|      N|                Y|      Y|AAAAAAAAEAKDAAAA|AAAAAAAABAAAAAAA|              N|
|1998-01-02|    86.0|Friday|      N|                Y|      Y|AAAAAAAAMDACAAAA|AAAAAAAABAAAAAAA|              N|
|1998-01-02|    48.0|Friday|      N|                Y|      Y|AAAAAAAADNJAAAAA|AAAAAAAABAAAAAAA|              N|
|1998-01-02|    36.0|Friday|      N|                Y|      Y|AAAAAAAALHFCAAAA|AAAAAAAAHAAAAAAA|              N|
|1998-01-02|    44.0|Friday|      N|                Y|      Y|AAAAAAAACCGEAAAA|AAAAAAAAHAAAAAAA|              N|
|1998-01-02|    17.0|Friday|      N|                Y|      Y|AAAAAAAAEAJBAAAA|AAAAAAAABAAAAAAA|

### Light Preparation and Cleaning in Pandas 

In [73]:
df = spc_typed_cleaned.toPandas()
df = df.dropna()
df = df.groupby(["date", "item_id", "store_id"]).sum().reset_index()
df = df.set_index("date")
df.head()

Unnamed: 0_level_0,item_id,store_id,quantity,day,holiday,following_holiday,weekend,promo_indicator
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1998-01-02,AAAAAAAAAAACAAAA,AAAAAAAACAAAAAAA,50.0,Friday,N,Y,Y,N
1998-01-02,AAAAAAAAAAHBAAAA,AAAAAAAAHAAAAAAA,90.0,Friday,N,Y,Y,N
1998-01-02,AAAAAAAAAAHDAAAA,AAAAAAAAHAAAAAAA,13.0,Friday,N,Y,Y,N
1998-01-02,AAAAAAAAAALDAAAA,AAAAAAAAHAAAAAAA,21.0,Friday,N,Y,Y,N
1998-01-02,AAAAAAAAAAMCAAAA,AAAAAAAAIAAAAAAA,15.0,Friday,N,Y,Y,N


In [74]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2608127 entries, 1998-01-02 to 2003-01-02
Data columns (total 8 columns):
 #   Column             Dtype  
---  ------             -----  
 0   item_id            object 
 1   store_id           object 
 2   quantity           float32
 3   day                object 
 4   holiday            object 
 5   following_holiday  object 
 6   weekend            object 
 7   promo_indicator    object 
dtypes: float32(1), object(7)
memory usage: 169.1+ MB


In [75]:
unique_items = df["item_id"].nunique()
print(f"Unique Items: {unique_items}")
unique_stores = df["store_id"].nunique()
print(f"Unique Stores: {unique_stores}")
unique_dates = df.index.nunique()
print(f"Unique Dates: {unique_dates}")

Unique Items: 9000
Unique Stores: 6
Unique Dates: 1823
