In [2]:
import findspark
# findspark.init() must run BEFORE pyspark is imported: its job is to put the
# Spark installation's pyspark onto sys.path so the imports below can find it.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime

## Dataset Description


**product_id**: This attribute represents the unique identifier for each product in the dataset. Each product is assigned a specific ID (e.g., P0001).

**store_id**: This attribute represents the unique identifier for each store where the product is sold. Each store is assigned a specific ID (e.g., S0002).

**date**: This attribute represents the date of sales data. It indicates when the sales, revenue, stock, and other information were recorded for a particular product in a specific store.

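**sales**: This attribute represents the quantity of the product sold on a given date in a particular store. It is usually a whole number of units, but it can be fractional (per-store totals such as 8212.51 appear in the aggregates below — presumably products sold by weight), and it is parsed as a float.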

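**revenue**: This attribute represents the total revenue generated from the sales of the product on a given date in a specific store. It is nominally the number of units sold (sales) multiplied by the price per unit (price), but the two do not always match exactly. In the sample rows, 1 unit at a price of 6.25 yields a revenue of 5.30, so revenue presumably reflects discounts, promotions, or taxes that `price` does not capture.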

**stock**: This attribute represents the quantity of the product available in stock at the beginning of the day on the specified date in the given store.

**price**: This attribute represents the price per unit of the product on a given date in a specific store. It indicates the amount charged to the customer for each unit of the product.

**promo_type_1**: This attribute represents the type of promotion (if any) applied to the product. It indicates the first type of promotional activity associated with the product, such as discounts, special offers, or marketing campaigns.

**promo_bin_1**: This attribute represents the specific promotional bin (if any) associated with the first type of promotion. It provides additional details about the nature or category of the promotion.

**promo_type_2**: This attribute represents the type of secondary promotion (if any) applied to the product. It indicates another type of promotional activity associated with the product, similar to promo_type_1 but potentially different in nature or timing.

These attributes collectively provide detailed information about the sales, revenue, pricing, and promotional activities associated with each product in various stores over time.


In [3]:
# Stop a SparkSession left over from an earlier run of this cell (best-effort);
# otherwise getOrCreate() below would hand back that existing session.
try:
    spark.stop()
except NameError:
    # Fresh kernel: no session has been created yet, nothing to stop.
    pass
except Exception as exc:
    # Report instead of silently swallowing, but keep the cell running.
    print(f"Could not stop previous SparkSession: {exc}")

spark = SparkSession.builder.appName("Retail").getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/08 16:16:59 WARN Utils: Your hostname, omar, resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlo1)
25/12/08 16:16:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/08 16:17:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Show the active SparkSession via its notebook repr.
spark

In [5]:
%%bash

# Make sure the local data directory exists, then download the CSV into it
# only if it is not already there.
mkdir -p data
cd data || exit 1
fileName='Retailsales.csv'

if test -f "$fileName"; then
    echo "File Exists"
else
    echo "Downloading the file"
    # wget writes into the current directory, which is already data/ after the
    # cd above, so no move is needed (a `mv ... data/` here would target data/data/).
    wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv
fi


File Exists


In [6]:
# Load raw_data as an RDD of text lines (one string per CSV line, header included).
# The path is resolved relative to the notebook's working directory, i.e. the same
# data/ folder the download cell fills, instead of a machine-specific absolute path.
import os

RETAIL_CSV_PATH = os.path.abspath(os.path.join("data", "Retailsales.csv"))
raw_data = sc.textFile(RETAIL_CSV_PATH)

In [7]:
# Peek at the first five raw lines (the header is the first one) to check the CSV layout.
raw_data.take(5)

                                                                                

['product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2',
 'P0001,S0002,1/2/2017,0,0,8,6.25,PR14,,PR03',
 'P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03',
 'P0001,S0013,1/2/2017,2,10.59,0,6.25,PR14,,PR03',
 'P0001,S0023,1/2/2017,0,0,6,6.25,PR14,,PR03']

In [8]:
# Confirm that sc.textFile returned an RDD.
print(type(raw_data))

<class 'pyspark.core.rdd.RDD'>


In [30]:
def parse_line(line):
    """Parse one data line of Retailsales.csv into a record dict.

    Columns, in header order: product_id, store_id, date, sales, revenue,
    stock, price, promo_type_1, promo_bin_1, promo_type_2.
    The numeric columns (sales, revenue, stock, price) are converted to
    float; promo_bin_1 is not carried into the record.

    Args:
        line: one raw text line from the CSV.

    Returns:
        A dict keyed by column name, or None when the line has fewer than
        10 comma-separated fields or one of the numeric fields is not a
        valid float (e.g. an empty value, or the header line itself).
        Returning None instead of raising lets the caller drop bad rows with
        ``filter(lambda r: r is not None)`` rather than failing the whole
        Spark job on a single malformed line.
    """
    fields = line.split(',')
    if len(fields) < 10:
        return None

    try:
        sales, revenue, stock, price = (float(value) for value in fields[3:7])
    except ValueError:
        return None

    return {'product_id': fields[0],
            'store_id': fields[1],
            'date': fields[2],
            'sales': sales,
            'revenue': revenue,
            'stock': stock,
            'price': price,
            'promo_type_1': fields[7],
            'promo_type_2': fields[9]}
        
# The first line of the file is the CSV header: keep it for reference and drop it from the data.
header = raw_data.first()
rawDataWithNoHeader = raw_data.filter(lambda line: line != header)

# Turn every remaining line into a record dict, then drop any row for which
# parse_line returned None (filters whole records, not columns).
parsedData = (
    rawDataWithNoHeader
    .map(parse_line)
    .filter(lambda record: record is not None)
)

print(parsedData.count())


1033434


                                                                                

In [32]:
# Keep only rows with a positive price and a positive quantity sold;
# rows with zero or negative sales/price are excluded from the analysis below.
def is_valid_sale(record):
    return record['price'] > 0 and record['sales'] > 0

cleanedData = parsedData.filter(is_valid_sale)
print(cleanedData.count())

[Stage 23:>                                                         (0 + 2) / 2]

196661


                                                                                

In [33]:
# How many partitions the cleaned RDD is split into.
partition_count = cleanedData.getNumPartitions()
print("Number of partitions of data = " + str(partition_count))

Number of partitions of data = 2


In [39]:
def count_in_partition(index, iterator):
    """Yield one (partition_index, record_count) pair for a single partition.

    Intended for RDD.mapPartitionsWithIndex: it consumes the partition's
    iterator and emits exactly one tuple.
    """
    total = 0
    for _record in iterator:
        total += 1
    yield (index, total)
    
    
# Count records in each partition to see how evenly the data is spread.
partitions_info = (
    cleanedData
    .mapPartitionsWithIndex(count_in_partition)
    .collect()
)
for partition_index, record_count in partitions_info:
    print(f"partition number {partition_index} contains {record_count} records")

[Stage 27:>                                                         (0 + 2) / 2]

partition number 0 contains 97534 records
partition number 1 contains 99127 records


                                                                                

In [40]:
# Show the CSV header line captured earlier with raw_data.first().
header

'product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2'

In [45]:
# Total quantity sold and total revenue per product: (product_id, (sales, revenue)).
sales_revenue_per_product = (
    cleanedData
    .map(lambda rec: (rec['product_id'], (rec['sales'], rec['revenue'])))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)

In [48]:
# Total quantity sold and total revenue per store: (store_id, (sales, revenue)).
store_pairs = cleanedData.map(lambda rec: (rec['store_id'], (rec['sales'], rec['revenue'])))
sales_revenue_per_store = store_pairs.reduceByKey(
    lambda left, right: tuple(l_val + r_val for l_val, r_val in zip(left, right))
)

In [49]:
# Bring the (store_id, (total_sales, total_revenue)) pairs back to the driver for display.
sales_revenue_per_store.collect()

                                                                                

[('S0012', (4023.0, 11399.660000000003)),
 ('S0056', (8212.510000000002, 30396.319999999934)),
 ('S0106', (5526.99, 13176.459999999994)),
 ('S0002', (10077.98, 29115.07999999992)),
 ('S0040', (9843.920000000002, 33434.97999999988)),
 ('S0130', (938.5699999999999, 2313.8600000000006)),
 ('S0008', (2243.0, 6547.119999999997)),
 ('S0025', (2825.0, 7058.309999999997)),
 ('S0027', (3070.0, 7987.299999999994)),
 ('S0043', (6619.65, 20601.089999999986)),
 ('S0051', (7552.509999999998, 31624.72999999991)),
 ('S0058', (5803.919999999999, 19454.640000000003)),
 ('S0066', (5332.09, 25313.229999999956)),
 ('S0078', (2151.62, 7890.179999999995)),
 ('S0087', (5114.39, 12696.860000000011)),
 ('S0123', (4216.26, 11412.520000000004)),
 ('S0125', (5556.0, 22470.77999999999)),
 ('S0139', (2690.13, 8865.159999999987)),
 ('S0033', (2520.0, 6334.499999999996)),
 ('S0038', (22120.030000000002, 51650.19999999989)),
 ('S0014', (3299.3500000000004, 10988.30999999999)),
 ('S0024', (8617.0, 26969.539999999935)),


In [50]:
# Mean price per product over its cleaned rows: (sum of prices) / (number of rows).
# Each row counts once, regardless of how many units it sold.
average_price_per_product = (
    cleanedData
    .map(lambda rec: (rec['product_id'], (rec['price'], 1)))
    .reduceByKey(lambda acc, other: (acc[0] + other[0], acc[1] + other[1]))
    .mapValues(lambda total_and_n: total_and_n[0] / total_and_n[1])
)
average_price_per_product.collect()

                                                                                

[('P0001', 6.4245283018867925),
 ('P0004', 4.5),
 ('P0015', 2.591105354058704),
 ('P0024', 1.9401595744680864),
 ('P0046', 34.5),
 ('P0051', 0.7000000000000284),
 ('P0057', 13.91843317972352),
 ('P0079', 2.2443181818181817),
 ('P0092', 1.6624203821656123),
 ('P0102', 105.70645161290324),
 ('P0109', 8.986131386861333),
 ('P0110', 2.9390438247012036),
 ('P0125', 2.942725258493282),
 ('P0129', 26.0178325123156),
 ('P0132', 6.304606741573042),
 ('P0134', 7.046788990825675),
 ('P0137', 23.899999999999988),
 ('P0140', 5.471069182389937),
 ('P0148', 5.635740072202168),
 ('P0157', 7.0915384615384776),
 ('P0162', 4.415068493150685),
 ('P0165', 1.4000000000000028),
 ('P0171', 11.666367980884106),
 ('P0174', 3.0),
 ('P0187', 21.393405405405403),
 ('P0197', 3.497004357298475),
 ('P0201', 14.900000000000052),
 ('P0204', 5.887804878048784),
 ('P0211', 8.95),
 ('P0214', 3.9),
 ('P0220', 1.75),
 ('P0225', 2.3852941176470592),
 ('P0226', 1.5981927710843344),
 ('P0229', 6.8500000000000005),
 ('P0252', 1

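#### Let's explain how the `reduceByKey` function works
- It is applied to an RDD of `(key, value)` tuples.
- It takes a function as a parameter. That function combines two values into one, and it must be associative and commutative.
- It merges all values that share the same key.
- For example, to get a sum per key:
    1. Spark first combines the values of each key locally within every partition. It then shuffles those partial results so that all partial results for a key end up in one place.
    2. The function is applied repeatedly (result = first + second, then result = result + third, and so on) until a single value remains per key. The order in which values are combined is not fixed, which is why the function must not depend on it.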

In [54]:
# Sum of the `stock` column per product over all cleaned rows.
# NOTE(review): stock is described as the level at the start of each day, so summing
# it across dates and stores gives "stock-days" rather than units on hand; confirm
# this is the intended metric.
print(header)
stock_per_product = (
    cleanedData
    .map(lambda rec: (rec['product_id'], rec['stock']))
    .reduceByKey(lambda running_total, value: running_total + value)
)


product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2


In [55]:
# Bring the (product_id, summed_stock) pairs back to the driver for display.
stock_per_product.collect()

                                                                                

[('P0001', 476.0),
 ('P0004', 621.0),
 ('P0015', 13273.0),
 ('P0024', 5716.0),
 ('P0046', 3060.0),
 ('P0051', 476816.0),
 ('P0057', 1317.0),
 ('P0079', 161374.0),
 ('P0092', 19891.0),
 ('P0102', 237.0),
 ('P0109', 2040.0),
 ('P0110', 7917.0),
 ('P0125', 244948.0),
 ('P0129', 64266.0),
 ('P0132', 2749.0),
 ('P0134', 2854.0),
 ('P0137', 205.0),
 ('P0140', 22957.0),
 ('P0148', 2842.0),
 ('P0157', 6524.0),
 ('P0162', 3159.0),
 ('P0165', 5274.0),
 ('P0171', 12724.0),
 ('P0174', 3050.0),
 ('P0187', 1262.0),
 ('P0197', 23372.0),
 ('P0201', 1973.0),
 ('P0204', 4191.0),
 ('P0211', 469.0),
 ('P0214', 24.0),
 ('P0220', 10644.0),
 ('P0225', 1095.0),
 ('P0226', 3165.0),
 ('P0229', 517.0),
 ('P0252', 785.0),
 ('P0258', 2276.0),
 ('P0260', 8226.0),
 ('P0261', 130707.0),
 ('P0263', 108.0),
 ('P0267', 879.0),
 ('P0268', 26033.0),
 ('P0277', 27091.0),
 ('P0280', 11150.0),
 ('P0282', 36177.0),
 ('P0287', 2592.0),
 ('P0294', 2570.0),
 ('P0297', 778.0),
 ('P0311', 9623.0),
 ('P0325', 50481.0),
 ('P0332', 9

In [None]:
# Save each aggregate as a directory of text part-files under data/.
# The default filesystem here appears to be local (the input CSV was read from a
# scheme-less local path), so these go to local disk, not HDFS.
# saveAsTextFile refuses to write into an existing directory, so outputs from a
# previous run are removed first to keep this cell re-runnable.
import os
import shutil

outputs_to_save = {
    'data/Sales_Revenue_Per_Product': sales_revenue_per_product,
    'data/Sales_Revenue_Per_Store': sales_revenue_per_store,
    'data/Average_Price_Per_Product': average_price_per_product,
    'data/Stock_Per_Product': stock_per_product,
}
for relative_dir, rdd in outputs_to_save.items():
    # Resolve once so the delete and the Spark write target the same directory.
    output_dir = os.path.abspath(relative_dir)
    shutil.rmtree(output_dir, ignore_errors=True)
    rdd.saveAsTextFile(output_dir)

                                                                                