# Sales Data

In [0]:
from pyspark.sql import SparkSession
import sys
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql.types import *

date_str = sys.argv[1]

#creating Spark Session
#change to yarn for EMR Cluster
spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

salesSchema = StructType([
    StructField("TRANS_ID", IntegerType(), True),
    StructField("PROD_KEY", IntegerType(), True),
    StructField("STORE_KEY", IntegerType(), True),
    StructField("TRANS_DT", DateType(), True),
    StructField("TRANS_TIME", IntegerType(), True),
    StructField("SALES_QTY", FloatType(), True),
    StructField("SALES_PRICE", FloatType(), True),
    StructField("SALES_AMT", FloatType(), True),
    StructField("DISCOUNT", FloatType(), True),
    StructField("SALES_COST", FloatType(), True),
    StructField("SALES_MGRN", FloatType(), True),
    StructField("SHIP_COST", FloatType(), True),
    ]
)

#read a file from S3
#change to s3 bucket when running in EMR
sales_df = spark.read.option("header", "true").option("delimiter", ",").schema(salesSchema).csv("/FileStore/tables/sales_20230723.csv")#"file:////home/ec2-user/midterm/data/sales_20230722.csv")

#total sales quantity by week, store, and product
sales_df.createOrReplaceTempView("sales")

df_sum_sales_quantity = spark.sql("select * from sales limit 5;")
df_sum_sales_quantity.show() 

df_sum_sales_quantity.printSchema()

+--------+--------+---------+----------+----------+---------+-----------+---------+--------+----------+----------+---------+
|TRANS_ID|PROD_KEY|STORE_KEY|  TRANS_DT|TRANS_TIME|SALES_QTY|SALES_PRICE|SALES_AMT|DISCOUNT|SALES_COST|SALES_MGRN|SHIP_COST|
+--------+--------+---------+----------+----------+---------+-----------+---------+--------+----------+----------+---------+
|  244054|  455222|     8103|2020-10-09|        12|     25.0|      37.94|   721.06|     0.1|    610.39|    338.01|     5.08|
|  244056|  637817|     8103|2020-06-04|        16|      2.4|     999.99|  2423.98|     0.0|   4819.97|   -1820.0|    13.99|
|  244058|  492902|     8103|2022-10-25|        17|     30.0|      14.03|   356.94|    0.08|    569.08|   -148.26|     9.37|
|  244060|  612619|     8103|2022-01-10|        18|     13.6|     107.53|  1782.68|    0.08|   1547.29|    280.64|     5.81|
|  244062| 1039077|     8103|2020-05-25|        18|     34.0|      27.18|   622.89|     0.1|    719.53|    204.49|     8.23|


# Product Data

In [0]:
productSchema = StructType([
    StructField("PROD_KEY", IntegerType(), True),
    StructField("PROD_NAME", StringType(), True),
    StructField("VOL", FloatType(), True),
    StructField("WGT", FloatType(), True),
    StructField("BRAND_NAME", StringType(), True),
    StructField("STATUS_CODE", IntegerType(), True),
    StructField("STATUS_CODE_NAME", StringType(), True),
    StructField("CATEGORY_KEY", IntegerType(), True),
    StructField("CATEGORY_NAME", StringType(), True),
    StructField("SUBCATEGORY_KEY", IntegerType(), True),
    StructField("SUBCATEGORY_NAME", StringType(), True),
    ]
)

#read a file from S3
#change to s3 bucket when running in EMR
product_df = spark.read.option("header", "true").option("delimiter", ",").schema(productSchema).csv("/FileStore/tables/product_20230723.csv")#"file:////home/ec2-user/midterm/data/sales_20230722.csv")

#total sales quantity by week, store, and product
product_df.createOrReplaceTempView("product")

product_df = spark.sql("select * from product limit 5;")
product_df.show() 

product_df.printSchema()

+--------+--------------+-----+-----+----------+-----------+----------------+------------+-------------+---------------+----------------+
|PROD_KEY|     PROD_NAME|  VOL|  WGT|BRAND_NAME|STATUS_CODE|STATUS_CODE_NAME|CATEGORY_KEY|CATEGORY_NAME|SUBCATEGORY_KEY|SUBCATEGORY_NAME|
+--------+--------------+-----+-----+----------+-----------+----------------+------------+-------------+---------------+----------------+
|  657768|Product-657768| 1.22| 28.6|  brand-14|          1|          active|           4|   category-4|              1|   subcategory-1|
|  293693|Product-293693|10.54| 6.29|  brand-13|          1|          active|           1|   category-1|              4|   subcategory-4|
|  484597|Product-484597| 3.11|14.88|   brand-7|          1|          active|           1|   category-1|              4|   subcategory-4|
|  939925|Product-939925|16.12| 0.93|  brand-18|          1|          active|           5|   category-5|              3|   subcategory-3|
|  234470|Product-234470|12.04|70.

# Inventory Data

In [0]:
inventorySchema = StructType([
    StructField("CAL_DT", DateType(), True),
    StructField("STORE_KEY", IntegerType(), True),
    StructField("PROD_KEY", IntegerType(), True),
    StructField("INVENTORY_ON_HAND_QTY", FloatType(), True),
    StructField("INVENTORY_ON_ORDER_QTY", FloatType(), True),
    StructField("OUT_OF_STOCK_FLG", IntegerType(), True),
    StructField("WASTE_QTY", FloatType(), True),
    StructField("PROMOTION_FLG", BooleanType(), True),
    StructField("NEXT_DELIVERY_DT", DateType(), True),
    ]
)

#read a file from S3
#change to s3 bucket when running in EMR
inventory_df = spark.read.option("header", "true").option("delimiter", ",").schema(inventorySchema).csv("/FileStore/tables/inventory_20230723.csv")#"file:////home/ec2-user/midterm/data/sales_20230722.csv")

#total sales quantity by week, store, and product
inventory_df.createOrReplaceTempView("inventory")

inventory_df = spark.sql("select * from inventory limit 5;")
inventory_df.show() 

inventory_df.printSchema()

+----------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|    CAL_DT|STORE_KEY|PROD_KEY|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|PROMOTION_FLG|NEXT_DELIVERY_DT|
+----------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|2020-01-01|      248|  539839|                33.28|                 28.16|               1|      0.0|         true|      2009-01-14|
|2020-01-01|      248| 1064589|                 7.56|                  8.19|               0|      1.0|        false|      2009-01-06|
|2020-01-01|     1054|  539839|                 38.4|                  57.6|               0|      1.0|         true|      2009-01-10|
|2020-01-01|     1054| 1064589|                13.68|                  7.92|               1|      1.0|         true|      2009-01-16|
|2020-01-01|     1103|  539839|                 8.64|  

# Store Data

In [0]:
storeSchema = StructType([
    StructField("STORE_KEY", IntegerType(), True),
    StructField("STORE_NUM", IntegerType(), True),
    StructField("STORE_DESC", StringType(), True),
    StructField("ADDR", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("REGION", StringType(), True),
    StructField("CNTRY_CD", StringType(), True),
    StructField("CNTRY_NM", StringType(), True),
    StructField("POSTAL_ZIP_CD", StringType(), True),
    StructField("PROV_STATE_CD", StringType(), True),
    StructField("STORE_TYPE_CD", StringType(), True),
    StructField("FRNCHS_FLG", BooleanType(), True),
    StructField("STORE_SIZE", FloatType(), True),
    StructField("MARKET_NAME", StringType(), True),
    StructField("SUBMARKET_KEY", IntegerType(), True),
    StructField("SUBMARKET_NAME", StringType(), True),
    StructField("LATITUDE", FloatType(), True),
    StructField("LONGITUDE", FloatType(), True),
    ]
)

#schema(storeSchema).

#read a file from S3
#change to s3 bucket when running in EMR
store_df = spark.read.option("header", "true").option("delimiter", ",").schema(storeSchema).csv("/FileStore/tables/store_20230723.csv")#"file:////home/ec2-user/midterm/data/sales_20230722.csv")

#total sales quantity by week, store, and product
store_df.createOrReplaceTempView("store")

store_df = spark.sql("select * from store limit 5;")
#spark.sql("select distinct STORE_SIZE from store order by STORE_SIZE DESC limit 5;")
store_df.show() 

store_df.printSchema()

+---------+---------+------------+-------+-------+------+--------+--------+-------------+-------------+-------------+----------+----------+-----------+-------------+--------------+--------+---------+
|STORE_KEY|STORE_NUM|  STORE_DESC|   ADDR|   CITY|REGION|CNTRY_CD|CNTRY_NM|POSTAL_ZIP_CD|PROV_STATE_CD|STORE_TYPE_CD|FRNCHS_FLG|STORE_SIZE|MARKET_NAME|SUBMARKET_KEY|SUBMARKET_NAME|LATITUDE|LONGITUDE|
+---------+---------+------------+-------+-------+------+--------+--------+-------------+-------------+-------------+----------+----------+-----------+-------------+--------------+--------+---------+
|      248|      248|store_desc42|addr_42|city_42|   \\N|      US|      US|          \\N|           WI|           WI|      null|      null|        \\N|         null|            22|    null|      3.0|
|     1054|     1054| store_desc1| addr_1| city_1|   \\N|      US|      US|          \\N|           IN|           IN|      null|      null|        \\N|         null|            24|    null|      1.0|


In [0]:
store_df.columns

Out[34]: ['STORE_KEY',
 'STORE_NUM',
 'STORE_DESC',
 'ADDR',
 'CITY',
 'REGION',
 'CNTRY_CD',
 'CNTRY_NM',
 'POSTAL_ZIP_CD',
 'PROV_STATE_CD',
 'STORE_TYPE_CD',
 'FRNCHS_FLG',
 'STORE_SIZE',
 'MARKET_NAME',
 'SUBMARKET_KEY',
 'SUBMARKET_NAME',
 'LATITUDE',
 'LONGITUDE']

# Calendar Data

In [0]:
calendarSchema = StructType([
    StructField("CAL_DT", DateType(), True),
    StructField("CAL_TYPE_DESC", StringType(), True),
    StructField("DAY_OF_WK_NUM", IntegerType(), True),
    StructField("DAY_OF_WK_DESC", StringType(), True),
    StructField("YR_NUM", IntegerType(), True),
    StructField("WK_NUM", IntegerType(), True),
    StructField("YR_WK_NUM", IntegerType(), True),
    StructField("MNTH_NUM", IntegerType(), True),
    StructField("YR_MNTH_NUM", IntegerType(), True),
    StructField("QTR_NUM", IntegerType(), True),
    StructField("YR_QTR_NUM", IntegerType(), True),
    ]
)
# .schema(storeSchema)
#read a file from S3
#change to s3 bucket when running in EMR
calendar_df = spark.read.option("header", "true").option("delimiter", ",").schema(calendarSchema).csv("/FileStore/tables/calendar_20230723.csv")#"file:////home/ec2-user/midterm/data/sales_20230722.csv")

#total sales quantity by week, store, and product
calendar_df.createOrReplaceTempView("calendar")

calendar_df = spark.sql("select * from calendar limit 5;")
calendar_df.show() 

calendar_df.printSchema()

+----------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|    CAL_DT|CAL_TYPE_DESC|DAY_OF_WK_NUM|DAY_OF_WK_DESC|YR_NUM|WK_NUM|YR_WK_NUM|MNTH_NUM|YR_MNTH_NUM|QTR_NUM|YR_QTR_NUM|
+----------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|1998-06-04|       Fiscal|            4|      Thursday|  1998|    23|   199823|       6|      19986|      2|     19982|
|1998-04-27|       Fiscal|            1|        Monday|  1998|    18|   199818|       5|      19985|      2|     19982|
|1998-05-13|       Fiscal|            3|     Wednesday|  1998|    20|   199820|       5|      19985|      2|     19982|
|1998-02-08|       Fiscal|            0|        Sunday|  1998|     7|   199807|       2|      19982|      1|     19981|
|1998-04-08|       Fiscal|            3|     Wednesday|  1998|    15|   199815|       4|      19984|      2|     19982|
+----------+-------------+-------------+

##Q1 : stock level by then end of the week : stock_on_hand_qty by the end of the week (only the stock level at the end day of the week)

In [0]:
spark.sql("""select i.CAL_DT, i.INVENTORY_ON_HAND_QTY, c.DAY_OF_WK_NUM
          from inventory i
          join calendar c
          on i.CAL_DT = c.CAL_DT
          where c.DAY_OF_WK_NUM =5
          order by i.cal_dt desc
          limit 10;
          """).show()

+----------+---------------------+-------------+
|    CAL_DT|INVENTORY_ON_HAND_QTY|DAY_OF_WK_NUM|
+----------+---------------------+-------------+
|2023-07-21|                  5.6|            5|
|2023-07-21|                54.18|            5|
|2023-07-21|                31.68|            5|
|2023-07-21|                 5.76|            5|
|2023-07-21|                13.44|            5|
|2023-07-21|                 24.8|            5|
|2023-07-21|                 19.8|            5|
|2023-07-21|                  4.0|            5|
|2023-07-21|                 9.45|            5|
|2023-07-21|                 5.12|            5|
+----------+---------------------+-------------+



In [0]:
spark.sql("""select * 
          from inventory  
          order by cal_dt desc
          limit 5;
          """).show()

spark.sql("""select * 
          from calendar  
          order by cal_dt desc
          limit 5;
          """).show()

spark.sql("""select distinct DAY_OF_WK_NUM
          from calendar  
          limit 5;
          """).show()

+----------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|    CAL_DT|STORE_KEY|PROD_KEY|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|PROMOTION_FLG|NEXT_DELIVERY_DT|
+----------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|2023-07-23|      248|  752280|                 12.0|                  18.0|               0|      1.0|         true|      2012-08-12|
|2023-07-23|     1103|  752280|                 38.0|                  10.0|               1|      1.0|         true|      2012-08-10|
|2023-07-23|      248|  954944|                 44.8|                 42.56|               0|      1.0|        false|      2012-08-01|
|2023-07-23|     1054|  752280|                  9.8|                  25.2|               0|      1.0|        false|      2012-08-04|
|2023-07-23|     1054|  954944|                 22.4|  

In [0]:
-- join 

In [0]:
SELECT MAX(stock_on_hand_qty) AS stock_level_by_end_of_week
FROM stock_table
JOIN calendar_table ON stock_table.date = calendar_table.date
WHERE calendar_table.week_end_date = (SELECT MAX(week_end_date) FROM calendar_table)


In [0]:
-

In [0]:
# File location and type
file_location = "/FileStore/tables/sales_20230723.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_sales = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df_sales)