In [183]:
import json
import os

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [178]:
# Local Spark session; "local[*]" lets Spark use every core of this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("jupyterlab")
    .getOrCreate()
)


def _read_csv(path, columns):
    """Load a CSV file that has a header row, using an explicit schema.

    Whitespace surrounding values is ignored on both sides.

    Args:
        path: Location of the CSV file to load.
        columns: Sequence of ``(column_name, data_type)`` pairs in file order.
            Every field is declared with ``nullable=False``.

    Returns:
        The DataFrame produced by the CSV reader.
    """
    schema = StructType([StructField(name, dtype, False) for name, dtype in columns])
    reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("ignoreLeadingWhiteSpace", "true")
        .option("ignoreTrailingWhiteSpace", "true")
    )
    return reader.schema(schema).load(path)


# Sales records; each one references a store, a date and a product by id.
sdf_sales = _read_csv("resources/sales.csv", [
    ("saleId", IntegerType()),
    ("netSales", FloatType()),
    ("salesUnits", IntegerType()),
    ("storeId", IntegerType()),
    ("dateId", IntegerType()),
    ("productId", LongType()),
])

# Calendar table keyed by "datekey".
sdf_calendar = _read_csv("resources/calendar.csv", [
    ("datekey", IntegerType()),
    ("datecalendarday", IntegerType()),
    ("datecalendaryear", IntegerType()),
    ("weeknumberofseason", IntegerType()),
])

# Product table keyed by "productid".
sdf_product = _read_csv("resources/product.csv", [
    ("productid", LongType()),
    ("division", StringType()),
    ("gender", StringType()),
    ("category", StringType()),
])

# Store table keyed by "storeid".
sdf_store = _read_csv("resources/store.csv", [
    ("storeid", IntegerType()),
    ("channel", StringType()),
    ("country", StringType()),
])

In [180]:
def reduceByKeyAndSum(default):
    """Build a function that totals ``(week, value)`` pairs into weekly buckets.

    Args:
        default: Starting total for every week. Its type decides the type of
            the sums (for example ``0`` for integers, ``0.0`` for floats).

    Returns:
        A function that takes an iterable of ``(week, value)`` pairs and
        returns a dict with the keys ``"W1"`` .. ``"W52"``. Each entry is
        ``default`` plus every value seen for that week. Pairs whose week falls
        outside 1..52 are ignored, and so are pairs whose value is ``None``.
    """
    def func(row):
        # Pre-fill every week so weeks without data still appear in the result.
        res = {"W" + str(w): default for w in range(1, 53)}
        for key, value in row:
            # A missing value would make "+=" raise TypeError; skip it the way
            # a SQL SUM skips nulls.
            if value is None:
                continue
            key = "W" + str(key)
            if key in res:
                res[key] += value
        return res
    return func

# UDFs that fold a collected list of (week, value) structs into a map keyed
# "W1".."W52": one for float net sales, one for integer unit counts.
netSalesByWeekUdf = sf.udf(reduceByKeyAndSum(0.0), MapType(StringType(), FloatType()))
salesUnitsByWeekUdf = sf.udf(reduceByKeyAndSum(0), MapType(StringType(), IntegerType()))

# Join sales with the calendar, product and store tables, then aggregate per
# (year, channel, division, gender, category).
#
# The product and store key columns are renamed before joining, and the join
# conditions must compare against the renamed columns. Spark resolves column
# names case-insensitively by default, so comparing "productId" with
# "productid" (or "storeId" with "storeid") would refer to the same sales
# column on both sides and would not relate the two tables at all.
sdf = (
    sdf_sales
    .join(sdf_calendar, sf.col("dateId") == sf.col("datekey"))
    .join(
        sdf_product.withColumnRenamed("productid", "productkey"),
        sf.col("productId") == sf.col("productkey"),
    )
    .join(
        sdf_store.withColumnRenamed("storeid", "storekey"),
        sf.col("storeId") == sf.col("storekey"),
    )
    .drop('dateId', 'datekey', 'productId', 'productkey', 'storeId', 'storekey')
    .groupBy("datecalendaryear", "channel", "division", "gender", "category")
    .agg(
        netSalesByWeekUdf(sf.collect_list(sf.struct("weeknumberofseason", "netSales"))).alias("netsalesbyweek"),
        salesUnitsByWeekUdf(sf.collect_list(sf.struct("weeknumberofseason", "salesUnits"))).alias("salesunitsbyweek"),
    )
)

In [192]:
def getWeeklyDataRow(dictionary):
    """Return the ``"W1"`` .. ``"W52"`` entries of ``dictionary`` in week order.

    Args:
        dictionary: Mapping that contains every key from ``"W1"`` to ``"W52"``.

    Returns:
        A new dict holding exactly those 52 entries, inserted in ascending
        week order.

    Raises:
        KeyError: If any of the 52 week keys is absent from ``dictionary``.
    """
    weekly = {}
    for week in range(1, 53):
        label = f"W{week}"
        weekly[label] = dictionary[label]
    return weekly


# Create the output directory up front so a fresh checkout (where "results/"
# does not exist yet) does not fail on the first open().
os.makedirs("results", exist_ok=True)

# Write one JSON document per aggregated row, named after its unique key.
for row in sdf.collect():
    # "RY" followed by the year with its first two characters dropped.
    year = f"RY{str(row['datecalendaryear'])[2:]}"
    uniqueKey = f"{year}_{row['channel']}_{row['division']}_{row['gender']}_{row['category']}"
    # Build the payload before opening the file, so an error raised while
    # assembling it does not leave an empty file behind.
    res = {
        "uniqueKey": uniqueKey,
        "division": row["division"],
        "gender": row["gender"],
        "category": row["category"],
        "channel": row["channel"],
        "year": year,
        "dataRows": [
            {
                "rowId": "Net Sales",
                "dataRow": getWeeklyDataRow(row["netsalesbyweek"])
            },
            {
                "rowId": "Sales Units",
                "dataRow": getWeeklyDataRow(row["salesunitsbyweek"])
            }
        ]
    }
    with open("results/" + uniqueKey + ".json", 'w') as file:
        json.dump(res, file, indent=4)