In [1]:
# Install PySpark version 3.1.2 silently
!pip install pyspark==3.1.2 -q
# Install findSpark silently
!pip install findspark -q

In [2]:
# Suppressing warnings by defining a function 'warn' that does nothing
def warn(*args, **kwargs):
    pass

# Importing the 'warnings' module to handle warnings
import warnings

# Overriding the 'warn' function in the 'warnings' module with the defined function to suppress warnings
warnings.warn = warn

# Filtering out all warnings to be ignored
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python
# Importing the 'findspark' module
import findspark

# Initializing FindSpark to locate Spark installation
findspark.init()

# Importing SparkSession from pyspark.sql module
from pyspark.sql import SparkSession


In [3]:
from pyspark import SparkContext
from datetime import datetime

# Initialize Spark context
sc = SparkContext(appName="RetailStoreSalesAnalysis")

24/11/15 10:10:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv


--2024-11-15 10:10:29--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/XXlNzqYcxqkTbllc-tL_0w/Retailsales.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47593992 (45M) [text/csv]
Saving to: ‘Retailsales.csv.2’


2024-11-15 10:10:31 (36.7 MB/s) - ‘Retailsales.csv.2’ saved [47593992/47593992]



In [5]:
raw_data = sc.textFile("Retailsales.csv")

print(raw_data.take(5))

[Stage 0:>                                                          (0 + 1) / 1]

['product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2', 'P0001,S0002,1/2/2017,0,0,8,6.25,PR14,,PR03', 'P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03', 'P0001,S0013,1/2/2017,2,10.59,0,6.25,PR14,,PR03', 'P0001,S0023,1/2/2017,0,0,6,6.25,PR14,,PR03']


                                                                                

In [6]:
# Parse and Clean Data
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    # Return a dictionary with parsed fields
    return {
        'product_id': fields[0],
        'store_id': fields[1],
        'date': fields[2],
        'sales': float(fields[3]),
        'revenue': float(fields[4]),
        'stock': float(fields[5]),
        'price': float(fields[6]),
        'promo_type_1': fields[7],
        'promo_type_2': fields[9]
    }

# Remove the header line
header = raw_data.first()

raw_data_no_header = raw_data.filter(lambda line: line != header)

# Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)
parsed_data = parsed_data.filter(lambda x: x is not None)


# Filter out records with missing or invalid data
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)



In [7]:
print(raw_data_no_header.take(5))

['P0001,S0002,1/2/2017,0,0,8,6.25,PR14,,PR03', 'P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03', 'P0001,S0013,1/2/2017,2,10.59,0,6.25,PR14,,PR03', 'P0001,S0023,1/2/2017,0,0,6,6.25,PR14,,PR03', 'P0001,S0025,1/2/2017,0,0,1,6.25,PR14,,PR03']


In [8]:
# Check the number of partitions
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

Number of partitions in cleaned_data: 2


In [9]:
# Function to count the number of records in each partition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)

# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
print("Number of records in each partition:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")


[Stage 3:>                                                          (0 + 2) / 2]

Number of records in each partition:
Partition 0: 97534 records
Partition 1: 99127 records


                                                                                

In [10]:
# Aggregation 1: Total Sales and Revenue per Product
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

Number of partitions in cleaned_data: 2


In [11]:
# Aggregation 2: Total Sales and Revenue per Store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [12]:
# Aggregation 3: Average Price per Product
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0] / x[1])

In [13]:
# Aggregation 4: Sales and Revenue per Promotion Type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [14]:
# Aggregation 5: Stock Analysis per Product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a + b)


In [15]:
# Save results to HDFS
sales_revenue_per_product.saveAsTextFile("sales_revenue_per_product")
sales_revenue_per_store.saveAsTextFile("sales_revenue_per_store")
average_price_per_product.saveAsTextFile("average_price_per_product")
sales_revenue_per_promo_1.saveAsTextFile("sales_revenue_per_promo_1")
sales_revenue_per_promo_2.saveAsTextFile("sales_revenue_per_promo_2")
stock_per_product.saveAsTextFile("stock_per_product")

Py4JJavaError: An error occurred while calling o166.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/resources/BD0231EN/labs/sales_revenue_per_product already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:287)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
# Print results
print("Total Sales and Revenue per Product:")
print("=" * 35)
for product in sales_revenue_per_product.collect():
    # Create the format string with appropriate padding
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    # Print the values using the format string
    print(format_string.format(str(product[0]), str(round(product[1][0],2)), str(round(product[1][1],2))))

print("\n\nTotal Sales and Revenue per Store:")
print("=" * 35)
for store in sales_revenue_per_store.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(store[0]), str(round(store[1][0],2)), str(round(store[1][1],2))))

print("\n\nAverage Price per Product:")
print("=" * 30)

for product in average_price_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

print("\n\nSales and Revenue per Promotion Type 1:")
print("=" * 40)
for promo in sales_revenue_per_promo_1.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nSales and Revenue per Promotion Type 2:")
print("=" * 40)
for promo in sales_revenue_per_promo_2.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nStock per Product:")
print("=" * 20)
for product in stock_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

