In [0]:
import pandas as pd
import json
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql import functions as sf
from pyspark.sql.types import *

In [0]:
# Create (or reuse) the local Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("NikeTest")
    .getOrCreate()
)

In [0]:
def _load_csv(path):
    """Read one CSV file with a header row into a Spark DataFrame.

    No schema or inferSchema option is passed, so the reader is left on its
    default column typing.
    """
    return spark.read.csv(path, header=True)


# Source tables: calendar and product/store dimensions plus the sales facts.
calendar_raw = _load_csv("/FileStore/tables/calendar.csv")
product_raw = _load_csv('/FileStore/tables/product.csv')
store_raw = _load_csv('/FileStore/tables/store.csv')
sales_raw = _load_csv('/FileStore/tables/sales.csv')

In [0]:
#check for duplicates
# Each table is counted exactly once and the total is reused for the duplicate
# figure, instead of running count() twice per table. The printed text is the
# same as before, in the same table order.
raw_tables = {
    'calendar': calendar_raw,
    'sales': sales_raw,
    'product': product_raw,
    'store': store_raw,
}

for table_name, table in raw_tables.items():
    total_rows = table.count()
    # Rows that are exact copies of another row across all columns.
    duplicate_rows = total_rows - table.dropDuplicates().count()
    print(f'the original number of rows in {table_name} data {total_rows}')
    print(f'the number of duplicates in {table_name} = {duplicate_rows}')

In [0]:
#combine all data
# Inner-join the sales rows to each lookup table on its key column, so a sales
# row is kept only when it matches a calendar, a product and a store row.
on_date = sales_raw.dateId == calendar_raw.datekey
on_product = sales_raw.productId == product_raw.productid
on_store = sales_raw.storeId == store_raw.storeid

combined_data = (
    sales_raw
    .join(calendar_raw, on_date, 'inner')
    .join(product_raw, on_product, 'inner')
    .join(store_raw, on_store, 'inner')
)
combined_data.show(10)

In [0]:
#create unique column
# uniqueKey = 'RY' + characters 3-4 of datecalendaryear, followed by channel,
# division, gender and category, each separated by '_'.
year_suffix = combined_data.datecalendaryear.substr(3, 2)
underscore = sf.lit('_')
unique_key = sf.concat(
    sf.lit('RY'), year_suffix,
    underscore, combined_data.channel,
    underscore, combined_data.division,
    underscore, combined_data.gender,
    underscore, combined_data.category,
)
combined_data = combined_data.withColumn('uniqueKey', unique_key)

In [0]:
# Cast both measures to integers so they can be summed numerically.
# NOTE(review): an integer cast discards any fractional part of netSales —
# confirm the source values are whole numbers or that truncation is intended.
combined_data = (
    combined_data
    .withColumn('netSales', sf.col('netSales').cast('integer'))
    .withColumn('salesUnits', sf.col('salesUnits').cast('integer'))
)

In [0]:
# describe() only returns a summary DataFrame; call show() so the statistics
# themselves are printed rather than just the DataFrame's repr.
combined_data.describe().show()

In [0]:
#Aggregate at unique key, division, gender, category, channel and week number of season - sum netSales, salesUnits
group_keys = [
    'uniqueKey',
    'division',
    'gender',
    'category',
    'channel',
    'weeknumberofseason',
]
# One output row per key/week combination, carrying the two summed measures.
calc_data = combined_data.groupBy(*group_keys).sum('netSales', 'salesUnits')

In [0]:
# Inspect the column names produced by the aggregation.
calc_data.columns

In [0]:
# 'year' is the first four characters of uniqueKey (positions are 1-based).
year_prefix = sf.substring('uniqueKey', 1, 4)
calc_data = calc_data.withColumn('year', year_prefix)

In [0]:
#convert to Pandas dataframe
# toPandas() rebinds calc_data: from this cell onwards it is a pandas
# DataFrame, no longer a Spark DataFrame.
calc_data = calc_data.toPandas()

In [0]:
# Give the aggregated columns their plain measure names back.
measure_names = {
    'sum(netSales)': 'netSales',
    'sum(salesUnits)': 'salesUnits',
}
calc_data = calc_data.rename(columns=measure_names)

In [0]:
#Create the result json files: one file per uniqueKey holding every week of the season
WEEKS_PER_SEASON = 52  # every record carries the labels W1..W52


def _as_count_text(value):
    """Render an aggregated total as an integer string; a missing total becomes '0'."""
    return '0' if pd.isna(value) else str(int(value))


def build_consumption_record(key_rows, weeks=WEEKS_PER_SEASON):
    """Build the JSON payload for one uniqueKey from all of its weekly rows.

    Parameters
    ----------
    key_rows : pandas.DataFrame
        Every row of calc_data that shares one uniqueKey (one row per week).
        Must contain the columns uniqueKey, division, gender, category,
        channel, year, weeknumberofseason, netSales and salesUnits.
    weeks : int
        Number of week labels to emit (W1..W<weeks>).

    Returns
    -------
    dict
        The record that is dumped to consumption_<uniqueKey>.json. Weeks with
        no matching row stay at '0'; rows whose week falls outside 1..weeks
        are ignored.
    """
    net_sales = {'W' + str(week): '0' for week in range(1, weeks + 1)}
    sales_units = dict(net_sales)

    weekly_totals = zip(
        key_rows['weeknumberofseason'], key_rows['netSales'], key_rows['salesUnits']
    )
    for week, sales, units in weekly_totals:
        # str() makes the lookup work whether the week is stored as text or as a number.
        label = 'W' + str(week).strip()
        if label in net_sales:
            net_sales[label] = _as_count_text(sales)
            sales_units[label] = _as_count_text(units)

    # The descriptive fields are taken from the first row of the group.
    first = key_rows.iloc[0]
    return {
        'uniqueKey': first['uniqueKey'],
        'division': first['division'],
        'gender': first['gender'],
        'category': first['category'],
        'channel': first['channel'],
        'year': first['year'],
        'dataRow': [
            {'rowId': 'netSales', 'dataRow': net_sales},
            {'rowId': 'salesUnits', 'dataRow': sales_units},
        ],
    }


# calc_data has one row per (uniqueKey, week). Writing a file per row would
# overwrite consumption_<uniqueKey>.json once for every week and leave a single
# non-zero week in it, so all rows of a key are merged into one record first.
# groupby() skips rows whose uniqueKey is missing, which cannot form a file name.
for unique_key, key_rows in calc_data.groupby('uniqueKey', sort=False):
    insert = build_consumption_record(key_rows)
    jsonFile = 'consumption_' + insert['uniqueKey'] + '.json'
    with open(jsonFile, 'w') as f:
        json.dump(insert, f)
    print(f'wrote {jsonFile}')