# Setup dependencies
I will be using pandas and sklearn for managing data and machine learning.
<details>
    <summary>pip install...</summary>

```python
# Allows to install a python package
pip install package-name
# or install python package with a specific version
pip install package-name==version
```
</details>


In [1]:
# Install PySpark version 3.1.2 silently
#!pip install pyspark==3.1.2 -q
# Install findSpark silently
!pip install findspark -q

In [2]:
# Used to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# Initializing Spark Context

In [3]:
import findspark

# Initializing FindSpark to locate Spark installation
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from datetime import datetime

# Initialize Spark context
sc = SparkContext(appName="RetailStoreSalesAnalysis")

# Reading Dataset
Reading dataset like a plain file.

In [4]:
raw_data = sc.textFile("data/Retailsales.csv")

### 4 Parsing and Cleaning Data:

- The `parse_line`function is defined to parse each line of the CSV file into a structured format, extracting fields like **product ID, store ID, date,** etc.
- The header line is removed from the RDD.
- The parsed data is filtered to remove records with missing or invalid data, such as zero or negative sales or price.


In [5]:
# Parse and Clean Data
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    # Return a dictionary with parsed fields
    return {
        'product_id': fields[0],
        'store_id': fields[1],
        'date': fields[2],
        'sales': float(fields[3]),
        'revenue': float(fields[4]),
        'stock': float(fields[5]),
        'price': float(fields[6]),
        'promo_type_1': fields[7],
        'promo_type_2': fields[9]
    }

# 1. Remove the header line
header = raw_data.first()
# Filtering the line that is equal to first line
raw_data_no_header = raw_data.filter(lambda line: line != header)

# 2. Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)
parsed_data = parsed_data.filter(lambda x: x is not None)


# 3 Filter out records with missing or invalid data
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)

### 5. Partitioning:
The number of partitions in the cleaned data RDD is printed


In [6]:
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

Number of partitions in cleaned_data: 2


### 6. Partition-wise Count:
`count_in_partition` function is defined to count the number of records in each partition of the RDD.
This function is applied using `mapPartitionsWithIndex` to get the count of records in each partition, and the results are printed.


In [7]:
# Function to count the number of records in each partition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)


# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()

print("Number of records in each partition:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")


Number of records in each partition:
Partition 0: 106 records
Partition 1: 180 records


### 7.Aggregations:

#### a. Total Sales and Revenue per Product:
This aggregation calculates the total sales and revenue for each product.
Steps:
- Map each record in cleaned_data to a key-value pair (product ID, (sales, revenue)).
- Use reduceByKey to aggregate the sales and revenue values for each product ID.


In [8]:
# Aggregation 1: Total Sales and Revenue per Product
# a = sales and b = revenue; where 0 refers to first row and 1 refers to second row.
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

#### b. Total Sales and Revenue per Store:
This aggregation calculates the total sales and revenue for each store.

In [9]:
# Aggregation 2: Total Sales and Revenue per Store
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

#### c. Average Price per Product:
This aggregation calculates the average price for each product.

In [10]:
# Aggregation 3: Average Price per Product
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0] / x[1])

#### d. Sales and Revenue per Promotion Type:
These aggregations calculate the total sales and revenue for each promotion type (promo_type_1 and promo_type_2).

In [11]:
# Aggregation 4: Sales and Revenue per Promotion Type
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_type_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_type_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

### 8. Stock Analysis per Product:
This aggregation calculates the total stock for each product.

In [12]:
# Aggregation 5: Stock Analysis per Product
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a + b)

### 9. Saving Results:
The results of each aggregation are saved to HDFS (Hadoop Distributed File System) using saveAsTextFile.

### 10. Printing Results:
Finally, we prints the results of each aggregation by collecting the data from the RDDs and iterating over them.

In [13]:
# Print results
print("Total Sales and Revenue per Product:")
print("=" * 35)
for product in sales_revenue_per_product.collect():
    # Create the format string with appropriate padding
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    # Print the values using the format string
    print(format_string.format(str(product[0]), str(round(product[1][0],2)), str(round(product[1][1],2))))

print("\n\nTotal Sales and Revenue per Store:")
print("=" * 35)
for store in sales_revenue_per_store.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(store[0]), str(round(store[1][0],2)), str(round(store[1][1],2))))

print("\n\nAverage Price per Product:")
print("=" * 30)

for product in average_price_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

print("\n\nSales and Revenue per Promotion Type 1:")
print("=" * 40)
for promo in sales_revenue_per_promo_1.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nSales and Revenue per Promotion Type 2:")
print("=" * 40)
for promo in sales_revenue_per_promo_2.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nStock per Product:")
print("=" * 20)
for product in stock_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

Total Sales and Revenue per Product:
P0016 | 1.0       | 1.85     
P0185 | 14.0      | 10.5     
P0196 | 9.0       | 74.16    
P0198 | 103.0     | 47.67    
P0201 | 3.0       | 37.89    
P0204 | 2.0       | 10.0     
P0241 | 3.0       | 35.97    
P0504 | 1.0       | 12.63    
P0506 | 21.0      | 102.43   
P0511 | 1.0       | 26.48    
P0514 | 7.0       | 62.3     
P0521 | 7.0       | 71.19    
P0533 | 3.0       | 21.54    
P0550 | 3.46      | 207.44   
P0551 | 22.0      | 37.66    
P0587 | 9.0       | 30.42    
P0590 | 275.0     | 124.05   
P0001 | 6.0       | 31.79    
P0004 | 1.0       | 3.81     
P0015 | 1.0       | 2.41     
P0017 | 10.0      | 13.8     
P0187 | 1.0       | 18.64    
P0195 | 7.0       | 33.92    
P0197 | 27.0      | 77.68    
P0500 | 5.0       | 32.19    
P0503 | 1.0       | 16.2     
P0520 | 20.0      | 47.23    
P0525 | 2.0       | 3.39     
P0527 | 9.0       | 16.28    
P0534 | 1.0       | 1.85     
P0536 | 72.0      | 210.03   
P0543 | 25.0      | 57.79    
P05

# Stop Spark Session

In [18]:
sc.stop()