<a href="https://colab.research.google.com/github/armahdavi/BigData_pyspark_practice/blob/main/PySpark_retail_sales_data_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Bucketizer

In [2]:
# Start a spark session
spark = SparkSession.builder.appName('PySpark Demo').getOrCreate()

In [3]:
csv_file = '/content/sample_data/retail_sales_dataset.csv'

df = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load(csv_file)

In [4]:
df.createOrReplaceTempView('u')
df.show()

+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|Transaction ID|      Date|Customer ID|Gender|Age|Product Category|Quantity|Price per Unit|Total Amount|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|             1|2023-11-24|    CUST001|  Male| 34|          Beauty|       3|            50|         150|
|             2|2023-02-27|    CUST002|Female| 26|        Clothing|       2|           500|        1000|
|             3|2023-01-13|    CUST003|  Male| 50|     Electronics|       1|            30|          30|
|             4|2023-05-21|    CUST004|  Male| 37|        Clothing|       1|           500|         500|
|             5|2023-05-06|    CUST005|  Male| 30|          Beauty|       2|            50|         100|
|             6|2023-04-25|    CUST006|Female| 45|          Beauty|       1|            30|          30|
|             7|2023-03-13|    CUST007|  Male| 46|     

In [5]:
# Find the most sold product category

sales_by_cateogry = df.groupBy('Product Category').agg(sum('Quantity').alias('Total Quantity Sold'))
sales_by_cateogry.orderBy('Total Quantity Sold', ascending = False).show()
sales_by_cateogry.orderBy(desc('Total Quantity Sold')).show() # See the results are the same despite syntax difference

+----------------+-------------------+
|Product Category|Total Quantity Sold|
+----------------+-------------------+
|        Clothing|                894|
|     Electronics|                849|
|          Beauty|                771|
+----------------+-------------------+

+----------------+-------------------+
|Product Category|Total Quantity Sold|
+----------------+-------------------+
|        Clothing|                894|
|     Electronics|                849|
|          Beauty|                771|
+----------------+-------------------+



In [6]:
# Most sold category
most_sold_category = sales_by_cateogry.orderBy('Total Quantity Sold', ascending = False).limit(1)
most_sold_category.show()

+----------------+-------------------+
|Product Category|Total Quantity Sold|
+----------------+-------------------+
|        Clothing|                894|
+----------------+-------------------+



In [7]:
# Find the most revenue generating product categories.
rev_by_category = df.groupBy('Product Category').agg(sum('Total Amount').alias('Total Amount Generated'))
rev_by_category.orderBy('Total Amount Generated', ascending = False).show()

+----------------+----------------------+
|Product Category|Total Amount Generated|
+----------------+----------------------+
|     Electronics|                156905|
|        Clothing|                155580|
|          Beauty|                143515|
+----------------+----------------------+



In [8]:
# Find the top revenue generating product
rev_by_category_top = df.groupBy('Product Category').agg(sum('Total Amount').alias('Total Amount Generated')).limit(1)
rev_by_category_top.show()

+----------------+----------------------+
|Product Category|Total Amount Generated|
+----------------+----------------------+
|     Electronics|                156905|
+----------------+----------------------+



In [9]:
# Create the top 10 list of most values customers.
revenue_by_customer = df.groupBy('Customer ID').agg(sum('Total Amount').alias('Total Revenue'))
revenue_by_customer.orderBy('Total Revenue', ascending = False).show(10)
# revenue_by_customer.show()

+-----------+-------------+
|Customer ID|Total Revenue|
+-----------+-------------+
|    CUST412|         2000|
|    CUST257|         2000|
|    CUST093|         2000|
|    CUST072|         2000|
|    CUST743|         2000|
|    CUST269|         2000|
|    CUST476|         2000|
|    CUST946|         2000|
|    CUST487|         2000|
|    CUST577|         2000|
+-----------+-------------+
only showing top 10 rows



In [10]:
# Find which gender buys which product category the most
gender_product_category = df.groupBy('Gender', 'Product Category').count()
gender_product_category.show()

+------+----------------+-----+
|Gender|Product Category|count|
+------+----------------+-----+
|  Male|        Clothing|  177|
|Female|        Clothing|  174|
|  Male|     Electronics|  172|
|Female|          Beauty|  166|
|  Male|          Beauty|  141|
|Female|     Electronics|  170|
+------+----------------+-----+



In [11]:
# Bucketize the age
splits = list(range(10, 71, 10))  # [10, 20, 30, 40, 50, 60]
# splits = [float('-inf')] + splits + [float('inf')]  # Handle out-of-range values by including -inf and inf
labels = [f'{splits[i]}-{splits[i+1]}' for i in range(len(splits) - 1)] # Adjust labels to cover all splits

# Initialize the Bucketizer
bucketizer = Bucketizer(
    splits=splits,
    inputCol='Age',
    outputCol='AgeBucket'
)

# Apply the Bucketizer to the DataFrame
bucketized_df = bucketizer.transform(df)

# Define a UDF to map bucket indices to labels
def bucket_to_label(bucket):
    return labels[int(bucket)] if 0 <= bucket < len(labels) else 'Out of Range'

# Add a new column with human-readable labels
bucket_to_label_udf = udf(bucket_to_label, StringType())
labeled_df = bucketized_df.withColumn('Age Category', bucket_to_label_udf(col('AgeBucket')))

# Show the resulting DataFrame
labeled_df.show()

+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+---------+------------+
|Transaction ID|      Date|Customer ID|Gender|Age|Product Category|Quantity|Price per Unit|Total Amount|AgeBucket|Age Category|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+---------+------------+
|             1|2023-11-24|    CUST001|  Male| 34|          Beauty|       3|            50|         150|      2.0|       30-40|
|             2|2023-02-27|    CUST002|Female| 26|        Clothing|       2|           500|        1000|      1.0|       20-30|
|             3|2023-01-13|    CUST003|  Male| 50|     Electronics|       1|            30|          30|      4.0|       50-60|
|             4|2023-05-21|    CUST004|  Male| 37|        Clothing|       1|           500|         500|      2.0|       30-40|
|             5|2023-05-06|    CUST005|  Male| 30|          Beauty|       2|            50|         100|

In [12]:
# Create a chart for age category vs. sales
sales_by_age_category = labeled_df.groupBy('Age Category').agg(sum('Total Amount').alias('Total Sales'))
sales_by_age_category.orderBy(desc('Age Category')).show()

+------------+-----------+
|Age Category|Total Sales|
+------------+-----------+
|       60-70|      44815|
|       50-60|      98340|
|       40-50|      93365|
|       30-40|      96325|
|       20-30|      97070|
|       10-20|      26085|
+------------+-----------+

