<a href="https://colab.research.google.com/github/Rohit-Singh12/Data-Analysis/blob/main/Spark/Distributed_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Reference: Coursera PySpark Course
https://www.coursera.org/learn/machine-learning-with-apache-spark/ungradedLti/nmhLJ/hands-on-lab-distributed-architecture-of-spark

In [1]:
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Import necessary spark libraries

In [4]:
# findspark simplifies the usage of pyspark
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

# Initialize spark Context
Set the name of spark context

In [28]:
from pyspark import SparkContext
from datetime import datetime

# Initialize spark context
sc = SparkContext(appName="RetailStoreSalesAnalysis")

In [29]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv

--2025-06-28 06:47:15--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47593992 (45M) [text/csv]
Saving to: ‘Retailsales.csv.1’


2025-06-28 06:47:17 (38.7 MB/s) - ‘Retailsales.csv.1’ saved [47593992/47593992]



# Visualize data

In [30]:
spark_session = SparkSession.builder.appName("RetailStoreSalesAnalysis").getOrCreate()

In [31]:
spark_data = spark_session.read.csv('Retailsales.csv', header=True, inferSchema=True)

In [32]:
spark_data.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- stock: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- promo_type_1: string (nullable = true)
 |-- promo_bin_1: string (nullable = true)
 |-- promo_type_2: string (nullable = true)



In [33]:
spark_data.head(5)

[Row(product_id='P0001', store_id='S0002', date='1/2/2017', sales=0.0, revenue=0.0, stock=8, price=6.25, promo_type_1='PR14', promo_bin_1=None, promo_type_2='PR03'),
 Row(product_id='P0001', store_id='S0012', date='1/2/2017', sales=1.0, revenue=5.3, stock=0, price=6.25, promo_type_1='PR14', promo_bin_1=None, promo_type_2='PR03'),
 Row(product_id='P0001', store_id='S0013', date='1/2/2017', sales=2.0, revenue=10.59, stock=0, price=6.25, promo_type_1='PR14', promo_bin_1=None, promo_type_2='PR03'),
 Row(product_id='P0001', store_id='S0023', date='1/2/2017', sales=0.0, revenue=0.0, stock=6, price=6.25, promo_type_1='PR14', promo_bin_1=None, promo_type_2='PR03'),
 Row(product_id='P0001', store_id='S0025', date='1/2/2017', sales=0.0, revenue=0.0, stock=1, price=6.25, promo_type_1='PR14', promo_bin_1=None, promo_type_2='PR03')]

In [34]:
# spark_session.stop()

# Loading Dataset

In [35]:
# The data is downloaded as RDD (Retail Distributed Dataset)
raw_data = sc.textFile("Retailsales.csv")

In [36]:
type(raw_data)

# Parsing and cleaning Dataset

In [39]:
def parse_line(line):
  fields = line.split(',')
  return {
      'product_id': fields[0],
      'store_id': fields[1],
      'date': fields[2],
      'sales': float(fields[3]),
      'revenue': float(fields[4]),
      'stock': float(fields[5]),
      'price': float(fields[6]),
      'promo_type_1': fields[7],
      'promo_type_2': fields[9]
  }

In [40]:
header = raw_data.first()
header

'product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2'

In [41]:
raw_data_no_header = raw_data.filter(lambda x: x != header)

In [42]:
parsed_data = raw_data_no_header.map(parse_line)

In [43]:
parsed_data = parsed_data.filter(lambda x: x is not None)
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)

# Partitioning

In [44]:
cleaned_data.getNumPartitions()

2

# Partition wise count
Number of counts in each partition

In [45]:
def count_in_partition(idx, itr):
  count = sum(1 for _ in itr)
  yield (idx, count)

In [46]:
partition_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
for idx, count in partition_info:
  print(f"Partition {idx} : count {count}")

Partition 0 : count 97534
Partition 1 : count 99127


# Aggregations

In [47]:
# Total sales and revenue per product
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b : (a[0]+b[0], a[1]+b[1]))

In [59]:
format_string = f"{{:<5}} | {{: <9}} | {{:<9}}"
for product in sales_revenue_per_product.collect():
  print(format_string.format(str(product[0]), str(round(product[1][0],2)), str(round(product[1][1],2))))

P0001 | 130.0     | 708.48   
P0004 | 41.0      | 156.25   
P0015 | 1555.0    | 3530.56  
P0024 | 288.0     | 517.23   
P0046 | 425.0     | 10310.79 
P0051 | 26381.0   | 16782.56 
P0057 | 233.0     | 2478.77  
P0079 | 5464.0    | 10949.09 
P0092 | 2567.0    | 3953.26  
P0102 | 31.0      | 2776.98  
P0109 | 308.0     | 2345.69  
P0110 | 1123.0    | 2933.74  
P0125 | 5089.0    | 13873.31 
P0129 | 13106.0   | 295934.59
P0132 | 359.0     | 1933.87  
P0134 | 260.0     | 1678.19  
P0137 | 55.0      | 1042.91  
P0140 | 3077.0    | 16841.9  
P0148 | 705.0     | 3681.21  
P0157 | 513.0     | 3366.49  
P0162 | 399.0     | 1032.92  
P0165 | 420.0     | 545.31   
P0171 | 1007.0    | 9962.67  
P0174 | 282.0     | 783.72   
P0187 | 223.0     | 3394.17  
P0197 | 1560.0    | 4895.49  
P0201 | 412.0     | 5057.87  
P0204 | 90.0      | 405.97   
P0211 | 107.0     | 811.1    
P0214 | 10.0      | 35.02    
P0220 | 797.0     | 1291.14  
P0225 | 93.0      | 163.29   
P0226 | 666.0     | 973.51   
P0229 | 97

In [60]:
# total sales and revenue per store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                      .reduceByKey(lambda a,b : (a[0]+b[0], a[1] + b[1]))
format_string = f"{{:<5}} | {{: <9}} | {{:<9}}"
for sales in sales_revenue_per_store.collect():
  print(format_string.format(str(product[0]), str(round(product[1][0],2)), str(round(product[1][1], 2))))

P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.0       | 77.51    
P0337 | 4.

In [51]:
# average price per product
total_price_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                      .reduceByKey(lambda a, b : (a[0]+b[0], a[1]+b[1]))

In [52]:
avg_price_per_product = total_price_per_product.mapValues(lambda x : x[0]/x[1])

In [53]:
avg_price_per_product

PythonRDD[30] at RDD at PythonRDD.scala:53

In [54]:
#sales revenue per promotion type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [55]:
# stock analysis per product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a,b : a + b)

# Saving the result as HDFS

In [56]:
sales_revenue_per_product.saveAsTextFile("sales_revenue_per_product")
sales_revenue_per_store.saveAsTextFile("sales_revenue_per_store")
total_price_per_product.saveAsTextFile("total_price_per_product")
avg_price_per_product.saveAsTextFile("avg_price_per_product")
sales_revenue_per_promo_1.saveAsTextFile("sales_revenue_per_promo_1")
sales_revenue_per_promo_2.saveAsTextFile("sales_revenue_per_promo_2")
stock_per_product.saveAsTextFile("stock_per_product")

In [61]:
sc.stop()