![](http://spark.apache.org/images/spark-logo.png)


# Leveraging Apache Spark for Efficient Retail Data Processing at RetailWorld


## Objectives

After completing this lab you will be able to:

 - Understand the distributed architecture of Spark in the context of a real-world problem
 - Parse and clean the raw data
 - Perform various aggregations to derive insights from the cleaned data
 - Save the aggregated results to HDFS (Hadoop Distributed File System) for further storage and processing.


## Background
RetailWorld, a prominent retail chain with numerous stores across Metropolia, faces the challenge of processing and analyzing substantial volumes of daily sales data. With real-time data streaming from multiple sources, RetailWorld needs to clean, transform, and aggregate this data to derive actionable insights such as total sales and revenue per product, total sales and revenue per store, sales and revenue per promotion type, and stock analysis per product.

This dataset is a modified <a href="https://www.kaggle.com/datasets/berkayalan/retail-sales-data?select=sales.csv">sales</a> dataset taken from the Kaggle website. The data was collected from a Turkish retail company and covers the period from the beginning of 2017 to the end of 2019.
It currently consists of 1,033,435 records.


## Dataset Description


**product_id**: This attribute represents the unique identifier for each product in the dataset. Each product is assigned a specific ID (e.g., P0001).

**store_id**: This attribute represents the unique identifier for each store where the product is sold. Each store is assigned a specific ID (e.g., S0002).

**date**: This attribute represents the date of sales data. It indicates when the sales, revenue, stock, and other information were recorded for a particular product in a specific store.

**sales**: This attribute represents the number of units of the product sold on a given date in a particular store. It indicates the quantity of the product that was purchased.

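**revenue**: This attribute represents the total revenue generated from the sales of the product on a given date in a specific store. Nominally it is the number of units sold (sales) multiplied by the price per unit (price), but in the sample records shown in step 4 revenue is lower than sales × price (for example, 1 unit at a price of 6.25 with a revenue of 5.3), so the relationship is only approximate in this dataset.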

**stock**: This attribute represents the quantity of the product available in stock at the beginning of the day on the specified date in the given store.

**price**: This attribute represents the price per unit of the product on a given date in a specific store. It indicates the amount charged to the customer for each unit of the product.

**promo_type_1**: This attribute represents the type of promotion (if any) applied to the product. It indicates the first type of promotional activity associated with the product, such as discounts, special offers, or marketing campaigns.

**promo_bin_1**: This attribute represents the specific promotional bin (if any) associated with the first type of promotion. It provides additional details about the nature or category of the promotion.

**promo_type_2**: This attribute represents the type of secondary promotion (if any) applied to the product. It indicates another type of promotional activity associated with the product, similar to promo_type_1 but potentially different in nature or timing.

These attributes collectively provide detailed information about the sales, revenue, pricing, and promotional activities associated with each product in various stores over time.


## Challenges
Traditional data processing tools are inadequate for handling the velocity and volume of incoming sales data, leading to delays in analysis and decision-making. These delays hinder RetailWorld's ability to respond swiftly to market demands and optimize inventory and sales strategies.

## Solution: Apache Spark
To address these challenges, RetailWorld requires a scalable and efficient solution. Apache Spark, with its distributed computing architecture and robust processing capabilities, is well suited to RetailWorld's data analytics needs. Spark parallelizes data processing tasks across a cluster of nodes, which enables rapid aggregation and analysis of large datasets. Additionally, its fault-tolerant design (lost partitions of an RDD can be recomputed from their lineage) makes it resilient to node failures and a dependable choice for RetailWorld's critical data processing tasks.

**To learn more about the distributed architecture of Spark, <a  href="https://cf-courses-data.static.labs.skills.network/BmowmNpC6oPodsERIBziYg/DistributedArchApacheSpark-v1.md.html">click here</a>**.



### 1. Install and import the necessary Spark libraries


In [1]:
# Optional: pin specific versions instead (-q installs quietly)
#!pip install pyspark==3.1.2 -q
#!pip install findspark -q

# Install the latest PySpark, findspark and wget
!pip install pyspark findspark wget

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark, wget
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c29d6d4a1011f0faa3244f41e4e089af8f5895eb051e779189d99222ef8568b7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
  Building wheel for wget (setup.py) ... done
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=4b651b630b3664bf2c59a8ad22557cee65fae52b0df31f560df

In [2]:
# Suppressing warnings by defining a function 'warn' that does nothing
def warn(*args, **kwargs):
    pass

# Importing the 'warnings' module to handle warnings
import warnings

# Overriding the 'warn' function in the 'warnings' module with the defined function to suppress warnings
warnings.warn = warn

# Filtering out all warnings to be ignored
warnings.filterwarnings('ignore')

In [3]:
# FindSpark simplifies the process of using Apache Spark with Python
# Importing the 'findspark' module
import findspark

# Initializing FindSpark to locate Spark installation
findspark.init()
findspark.find()

# Importing SparkSession from pyspark.sql module
from pyspark.sql import SparkSession


### 2. Initializing the SparkContext

The driver program initializes the SparkContext, the entry point for creating RDDs, and sets the name of the Spark application to **"RetailStoreSalesAnalysis"**.


In [4]:
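from pyspark import SparkConf, SparkContext

# Initialize the Spark context; getOrCreate reuses an existing context
# instead of failing if this cell is run a second time
conf = SparkConf().setAppName("RetailStoreSalesAnalysis")
sc = SparkContext.getOrCreate(conf)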

### 3. Loading Data:
The notebook first downloads the CSV file `sales.csv` with `wget` and then loads it with SparkContext's `textFile` method. The result is an **RDD (Resilient Distributed Dataset)** named `raw_data`, in which each element is one line of the file as a string.


In [5]:
# Load Data

!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/JjdiVyBOwapVxvIQ66NImw/sales.csv

raw_data = sc.textFile("sales.csv")

--2024-06-17 14:14:10--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/JjdiVyBOwapVxvIQ66NImw/sales.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47593992 (45M) [text/csv]
Saving to: ‘sales.csv’


2024-06-17 14:14:12 (39.8 MB/s) - ‘sales.csv’ saved [47593992/47593992]



### 4. Parsing and Cleaning Data:

The `parse_line` function parses each line of the CSV file into a dictionary, extracting fields such as **product ID, store ID, date, sales, revenue,** and so on.
The header line is removed from the RDD before parsing.
The parsed data is then filtered to remove invalid records: entries for which parsing failed (`None`) and records with zero or negative sales or price.
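Splitting on a bare comma assumes that no field ever contains a comma. The plain-Python sketch below shows a more defensive parser that uses the standard `csv` module (which respects quoted fields) and returns `None` for lines with missing columns or non-numeric values, so that a later `filter(lambda x: x is not None)` can drop them. The name `parse_line_csv` and the sample lines are hypothetical, and the column order (with `promo_bin_1` at index 8) is assumed from the dataset description above.

```python
import csv

def parse_line_csv(line):
    """Hypothetical csv-based variant of parse_line.

    Returns a dict for a well-formed line, or None if the line has too few
    columns or a non-numeric value where a number is expected.
    """
    try:
        # csv.reader honours quoting, so "a,b" inside quotes stays one field
        fields = next(csv.reader([line]))
        return {
            'product_id': fields[0],
            'store_id': fields[1],
            'date': fields[2],
            'sales': float(fields[3]),
            'revenue': float(fields[4]),
            'stock': float(fields[5]),
            'price': float(fields[6]),
            'promo_type_1': fields[7],
            # fields[8] is assumed to be promo_bin_1, which is not used here
            'promo_type_2': fields[9],
        }
    except (StopIteration, IndexError, ValueError):
        return None

# Made-up sample lines: one well-formed, one with an empty sales field
good = parse_line_csv('P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03')
bad = parse_line_csv('P0001,S0012,1/2/2017,,5.3')
print(good['sales'], good['promo_type_2'])  # 1.0 PR03
print(bad)                                  # None
```

In Spark, such a function would be used the same way as `parse_line`, for example `raw_data_no_header.map(parse_line_csv).filter(lambda x: x is not None)`.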


In [6]:
# Parse and Clean Data
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    try:
        # Return a dictionary with parsed fields
        # (fields[8], promo_bin_1 per the column order above, is not used)
        return {
            'product_id': fields[0],
            'store_id': fields[1],
            'date': fields[2],
            'sales': float(fields[3]),
            'revenue': float(fields[4]),
            'stock': float(fields[5]),
            'price': float(fields[6]),
            'promo_type_1': fields[7],
            'promo_type_2': fields[9]
        }
    except (IndexError, ValueError):
        # Too few columns or a non-numeric value: mark the record as invalid
        return None

# Remove the header line
header = raw_data.first()

raw_data_no_header = raw_data.filter(lambda line: line != header)

# Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)


# Drop records that failed to parse, then records with invalid sales or price
parsed_data = parsed_data.filter(lambda x: x is not None)
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)
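The `filter(lambda line: line != header)` step compares every line of the file with the header. When the RDD is read from a single CSV file, the header is the first line of partition 0, so an alternative is to skip it only there with `mapPartitionsWithIndex`. The sketch below models the partitions as plain Python lists (an illustration only, not Spark itself); `drop_header` is a hypothetical helper name.

```python
from itertools import islice

def drop_header(index, lines):
    """Skip the first line of partition 0 and pass other partitions through.

    Assumes the RDD was read from a single file, so only partition 0
    starts with the CSV header.
    """
    if index == 0:
        return islice(lines, 1, None)
    return lines

# Plain-Python stand-in for an RDD with two partitions (made-up data)
partitions = [
    ["product_id,store_id,date", "P0001,S0012,1/2/2017"],
    ["P0001,S0013,1/2/2017"],
]
rows = [row for i, part in enumerate(partitions) for row in drop_header(i, iter(part))]
print(rows)  # ['P0001,S0012,1/2/2017', 'P0001,S0013,1/2/2017']
```

Under the single-file assumption, the Spark call would be `raw_data.mapPartitionsWithIndex(drop_header)`; with several input files each file would contribute its own header, and the equality filter used in the notebook is the safer choice.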




In [7]:
cleaned_data.take(2)

[{'product_id': 'P0001',
  'store_id': 'S0012',
  'date': '1/2/2017',
  'sales': 1.0,
  'revenue': 5.3,
  'stock': 0.0,
  'price': 6.25,
  'promo_type_1': 'PR14',
  'promo_type_2': 'PR03'},
 {'product_id': 'P0001',
  'store_id': 'S0013',
  'date': '1/2/2017',
  'sales': 2.0,
  'revenue': 10.59,
  'stock': 0.0,
  'price': 6.25,
  'promo_type_1': 'PR14',
  'promo_type_2': 'PR03'}]

In [9]:
cleaned_data.count()

196661

### 5. Partitioning:

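The number of partitions in the `cleaned_data` RDD is checked and printed. The count comes from `textFile`, which splits the input file into at least a suggested minimum number of partitions (by default `min(sc.defaultParallelism, 2)`); `map` and `filter` keep the partitioning of their parent RDD.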


In [8]:
# Check the number of partitions
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

Number of partitions in cleaned_data: 2


#### Partition-wise Count:

Here a function `count_in_partition` is defined to count the number of records in each partition of the RDD.
This function is applied using `mapPartitionsWithIndex` to get the count of records in each partition, and the results are printed.


In [10]:
# Function to count the number of records in each partition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)

# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
print("Number of records in each partition:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")


Number of records in each partition:
Partition 0: 97534 records
Partition 1: 99127 records


### 6. Aggregations:

Several aggregations are performed on the cleaned data RDD:
- Total sales and revenue per product.
- Total sales and revenue per store.
- Average price per product.
- Sales and revenue per promotion type 1 and promotion type 2.
- Stock analysis per product.

Each aggregation uses `map` to transform the records into key-value pairs and `reduceByKey` to aggregate the values for each key.
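To make the `map` + `reduceByKey` pattern concrete, here is a plain-Python model (no Spark required) of what it computes for the sales and revenue aggregations: each record becomes a `(key, (sales, revenue))` pair, and the values for each key are folded with the same combiner function the notebook uses. Spark applies that function first within each partition (a map-side combine) and again across partitions after the shuffle, which is why it must be associative and commutative. The helper `reduce_by_key` is hypothetical.

```python
def add_pairs(a, b):
    """Combiner used in the notebook: element-wise sum of (sales, revenue) tuples."""
    return (a[0] + b[0], a[1] + b[1])

def reduce_by_key(pairs, func):
    """Plain-Python model of RDD.reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# The two cleaned records shown in step 4 (product P0001 in stores S0012 and S0013)
records = [
    {'product_id': 'P0001', 'store_id': 'S0012', 'sales': 1.0, 'revenue': 5.3},
    {'product_id': 'P0001', 'store_id': 'S0013', 'sales': 2.0, 'revenue': 10.59},
]
pairs = [(r['product_id'], (r['sales'], r['revenue'])) for r in records]
totals = reduce_by_key(pairs, add_pairs)
print(totals['P0001'][0], round(totals['P0001'][1], 2))  # 3.0 15.89
```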


#### a. Total Sales and Revenue per Product:
This aggregation calculates the total sales and revenue for each product.
It first maps each record in cleaned_data to a key-value pair, where the key is the product ID and the value is a tuple containing sales and revenue.
Then, it uses reduceByKey to aggregate the sales and revenue values for each product ID.


In [13]:
# Aggregation 1: Total Sales and Revenue per Product
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(f"Number of partitions in sales_revenue_per_product: {sales_revenue_per_product.getNumPartitions()}")

Number of partitions in sales_revenue_per_product: 2
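`sales_revenue_per_product` also has 2 partitions: when `reduceByKey` is called without `numPartitions`, PySpark uses `spark.default.parallelism` if that setting is configured and otherwise the parent RDD's partition count. During the shuffle each key is routed to a partition by hashing it (PySpark's default `partitionFunc` is `portable_hash`, taken modulo the number of partitions), so all values for a key meet in one partition. The sketch below models that routing in plain Python, using `zlib.crc32` as a deterministic stand-in for `portable_hash`; this is an assumption for illustration, so the actual partition numbers in Spark will differ.

```python
import zlib

def assign_partition(key, num_partitions):
    """Route a string key to a partition: stable hash modulo partition count.

    zlib.crc32 stands in for PySpark's portable_hash here.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(pairs, num_partitions):
    """Group (key, value) pairs into partitions so that equal keys end up together."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[assign_partition(key, num_partitions)].append((key, value))
    return partitions

# Made-up (sales, revenue) values for two products
pairs = [("P0016", (1.0, 2.0)), ("P0051", (3.0, 4.0)), ("P0016", (5.0, 6.0))]
parts = shuffle(pairs, 2)
for i, part in enumerate(parts):
    print(i, part)
```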


#### b. Total Sales and Revenue per Store:
This aggregation calculates the total sales and revenue for each store.
Similar to the first aggregation, it maps each record to a key-value pair with the store ID as the key and a tuple containing sales and revenue as the value.
It then uses reduceByKey to aggregate the sales and revenue values for each store ID.


In [12]:
# Aggregation 2: Total Sales and Revenue per Store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

#### c. Average Price per Product:
This aggregation calculates the average price for each product.
It first maps each record to a key-value pair with the product ID as the key and a tuple containing the price and a count of 1.
Then, it uses reduceByKey to aggregate the total price and count of prices for each product.
Finally, it calculates the average price by dividing the total price by the count.
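Carrying `(sum, count)` pairs through `reduceByKey` matters because averages cannot be merged directly: averaging per-partition averages gives the wrong result whenever partitions hold different numbers of records. A plain-Python check with made-up prices; `add_sum_count` mirrors the lambda used in the cell below.

```python
def add_sum_count(a, b):
    """Combiner for (price_sum, count) pairs, as used for total_price_count_per_product."""
    return (a[0] + b[0], a[1] + b[1])

# Made-up prices for one product, split across two partitions of unequal size
partition_a = [2.0, 2.0, 2.0]
partition_b = [5.0]

# Correct: merge (sum, count) pairs, then divide once at the end
pair_a = (sum(partition_a), len(partition_a))   # (6.0, 3)
pair_b = (sum(partition_b), len(partition_b))   # (5.0, 1)
total, count = add_sum_count(pair_a, pair_b)    # (11.0, 4)
correct_avg = total / count

# Incorrect: the average of per-partition averages ignores the record counts
wrong_avg = (pair_a[0] / pair_a[1] + pair_b[0] / pair_b[1]) / 2

print(correct_avg, wrong_avg)  # 2.75 3.5
```

The same final division applied to the printed pair for P0016, `(72.0, 36)`, gives 72.0 / 36 = 2.0, matching its entry in `average_price_per_product`.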


In [14]:
# Aggregation 3: Average Price per Product
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0] / x[1])

In [16]:
total_price_count_per_product.take(2)

[('P0016', (72.0, 36)), ('P0051', (3217.9000000001306, 4597))]

In [17]:
average_price_per_product.take(2)

[('P0016', 2.0), ('P0051', 0.7000000000000284)]

#### d. Sales and Revenue per Promotion Type:
These aggregations calculate the total sales and revenue for each promotion type (promo_type_1 and promo_type_2).
Each record is mapped to a key-value pair with the promotion type as the key and a tuple containing sales and revenue as the value.
Then, reduceByKey is used to aggregate the sales and revenue values for each promotion type.
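The two promotion aggregations each traverse `cleaned_data`, which Spark recomputes from the CSV for every action unless it is persisted (for example with `cleaned_data.cache()`). One possible alternative, assuming a single combined result is acceptable, is to emit two pairs per record keyed by `(column, promo code)`; in Spark this would be a `flatMap` followed by one `reduceByKey`. The plain-Python model below uses the two sample records from step 4; `promo_pairs` is a hypothetical helper.

```python
from collections import defaultdict

def promo_pairs(record):
    """Emit one ((column, promo code), (sales, revenue)) pair per promotion column."""
    value = (record['sales'], record['revenue'])
    return [(('promo_type_1', record['promo_type_1']), value),
            (('promo_type_2', record['promo_type_2']), value)]

records = [
    {'sales': 1.0, 'revenue': 5.3, 'promo_type_1': 'PR14', 'promo_type_2': 'PR03'},
    {'sales': 2.0, 'revenue': 10.59, 'promo_type_1': 'PR14', 'promo_type_2': 'PR03'},
]

# Plain-Python model of cleaned_data.flatMap(promo_pairs).reduceByKey(...)
totals = defaultdict(lambda: (0.0, 0.0))
for record in records:
    for key, (sales, revenue) in promo_pairs(record):
        s, r = totals[key]
        totals[key] = (s + sales, r + revenue)

for key in sorted(totals):
    print(key, totals[key])
```

The Spark version would be `cleaned_data.flatMap(promo_pairs).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, with the promotion column kept in the key so the two types remain distinguishable.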


In [15]:
# Aggregation 4: Sales and Revenue per Promotion Type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

### 7. Stock Analysis per Product:
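This aggregation calculates the total stock for each product.
Each record is mapped to a key-value pair with the product ID as the key and the stock as the value.
Then, reduceByKey is used to sum the stock values for each product. Because `stock` is a start-of-day level recorded for every store and date, this total is a cumulative measure (stock-days) rather than the quantity currently on hand.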
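If an average stock level per product is wanted instead of a total, `RDD.aggregateByKey` can build `(stock_sum, count)` accumulators: it takes a zero value, a `seqOp` that folds values into an accumulator inside each partition, and a `combOp` that merges accumulators across partitions. The plain-Python model below, with made-up stock values and the hypothetical helper `aggregate_by_key`, shows those semantics.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Plain-Python model of RDD.aggregateByKey.

    seq_op folds values into an accumulator inside each partition;
    comb_op merges the per-partition accumulators for the same key.
    """
    merged = {}
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = seq_op(local.get(key, zero), value)
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

# (product_id, stock) pairs split over two partitions (made-up values)
partitions = [
    [("P0016", 30.0), ("P0016", 20.0)],
    [("P0016", 10.0), ("P0051", 100.0)],
]
acc = aggregate_by_key(
    partitions,
    (0.0, 0),                                   # zero value: (stock_sum, count)
    lambda a, stock: (a[0] + stock, a[1] + 1),  # seqOp
    lambda a, b: (a[0] + b[0], a[1] + b[1]),    # combOp
)
avg_stock = {k: s / n for k, (s, n) in acc.items()}
print(avg_stock)  # {'P0016': 20.0, 'P0051': 100.0}
```

In Spark the equivalent would be `cleaned_data.map(lambda x: (x['product_id'], x['stock'])).aggregateByKey((0.0, 0), seq_op, comb_op).mapValues(lambda a: a[0] / a[1])`. As a cross-check from outputs already shown, P0016 has a stock total of 1032.0 over 36 cleaned records, roughly 28.7 units per record.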


In [18]:
# Aggregation 5: Stock Analysis per Product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a + b)


In [19]:
stock_per_product.take(2)

[('P0016', 1032.0), ('P0051', 476816.0)]

### 8. Saving Results:

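The results of each aggregation are saved with `saveAsTextFile`, which writes each RDD as a directory containing one `part-NNNNN` file per partition. The relative paths used here are resolved against Spark's default filesystem: on a cluster configured with HDFS (Hadoop Distributed File System) they are written to HDFS, while in a local setup they land in the current working directory. `saveAsTextFile` fails if the target directory already exists, so remove old output before re-running the cell.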


In [20]:
# Save results (to HDFS when it is the default filesystem); each call creates a directory
sales_revenue_per_product.saveAsTextFile("sales_revenue_per_product")
sales_revenue_per_store.saveAsTextFile("sales_revenue_per_store")
average_price_per_product.saveAsTextFile("average_price_per_product")
sales_revenue_per_promo_1.saveAsTextFile("sales_revenue_per_promo_1")
sales_revenue_per_promo_2.saveAsTextFile("sales_revenue_per_promo_2")
stock_per_product.saveAsTextFile("stock_per_product")
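PySpark's `saveAsTextFile` writes each element as `str(element)` on its own line, so the saved tuples come back as text such as `('P0016', (60.0, 111.08000000000001))`. A plain-Python sketch for turning such lines back into tuples with `ast.literal_eval`; the sample lines are copied from the printed results in step 9, and `parse_saved_line` is a hypothetical helper.

```python
import ast

def parse_saved_line(line):
    """Turn one line written by saveAsTextFile back into a Python tuple."""
    # literal_eval only evaluates literals (tuples, numbers, strings), not arbitrary code
    return ast.literal_eval(line)

lines = [
    "('P0016', (60.0, 111.08000000000001))",
    "('P0051', (26381.0, 16782.55999999981))",
]
records = dict(parse_saved_line(line) for line in lines)
print(records['P0016'])  # (60.0, 111.08000000000001)
```

Reading a whole result directory back into Spark could then look like `sc.textFile("sales_revenue_per_product").map(ast.literal_eval)`, since `textFile` accepts a directory and reads all of its part files.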


### 9. Printing Results:

Finally, we print the results of each aggregation by collecting the data from the RDDs to the driver and iterating over them.
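`collect()` ships every key to the driver, which is fine for these small per-product and per-store results. When only the leading entries are needed, `RDD.takeOrdered(n, key=...)` returns just those, for example `sales_revenue_per_product.takeOrdered(10, key=lambda kv: -kv[1][1])`. The plain-Python sketch below mimics that selection with `heapq.nsmallest` on a few rows copied from the printed output; `top_by_revenue` is a hypothetical helper.

```python
import heapq

def top_by_revenue(rows, n):
    """Return the n (product_id, (sales, revenue)) rows with the highest revenue.

    Mirrors rdd.takeOrdered(n, key=lambda kv: -kv[1][1]) in plain Python.
    """
    return heapq.nsmallest(n, rows, key=lambda kv: -kv[1][1])

rows = [
    ('P0016', (60.0, 111.08000000000001)),
    ('P0051', (26381.0, 16782.55999999981)),
    ('P0103', (122472.0, 254597.93999999814)),
    ('P0140', (3077.0, 16841.899999999994)),
]
for product_id, (sales, revenue) in top_by_revenue(rows, 2):
    print(product_id, round(revenue, 2))
# P0103 254597.94
# P0140 16841.9
```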


In [21]:
# Print results
print("Total Sales and Revenue per Product:")
for product in sales_revenue_per_product.collect():
    print(product)

print("Total Sales and Revenue per Store:")
for store in sales_revenue_per_store.collect():
    print(store)

print("Average Price per Product:")
for product in average_price_per_product.collect():
    print(product)

print("Sales and Revenue per Promotion Type 1:")
for promo in sales_revenue_per_promo_1.collect():
    print(promo)

print("Sales and Revenue per Promotion Type 2:")
for promo in sales_revenue_per_promo_2.collect():
    print(promo)

print("Stock per Product:")
for product in stock_per_product.collect():
    print(product)



Total Sales and Revenue per Product:
('P0016', (60.0, 111.08000000000001))
('P0051', (26381.0, 16782.55999999981))
('P0055', (1136.0, 3650.8600000000083))
('P0057', (233.0, 2478.7700000000013))
('P0067', (286.0, 3799.190000000001))
('P0068', (255.0, 1188.1799999999998))
('P0070', (1366.0, 6895.109999999976))
('P0071', (63.0, 484.8499999999999))
('P0079', (5464.0, 10949.089999999924))
('P0097', (138.0, 1506.6800000000003))
('P0103', (122472.0, 254597.93999999814))
('P0109', (308.0, 2345.6899999999982))
('P0114', (6447.0, 2430.8400000000006))
('P0116', (6745.0, 4963.969999999994))
('P0125', (5089.0, 13873.30999999974))
('P0140', (3077.0, 16841.899999999994))
('P0147', (535.0, 1574.6299999999997))
('P0157', (513.0, 3366.4900000000007))
('P0169', (593.0, 2257.8500000000035))
('P0171', (1007.0, 9962.669999999973))
('P0174', (282.0, 783.7199999999991))
('P0183', (241.0, 1108.4900000000007))
('P0185', (4135.0, 3101.25))
('P0196', (854.0, 6182.179999999975))
('P0201', (412.0, 5057.870000000012

### 10. Cleanup:

The Spark context is stopped to release the resources.


In [22]:
# Stop the Spark context
sc.stop()
