<a href="https://colab.research.google.com/github/DevD000/hexaware_training_2/blob/main/Exercise_2_and__3_on_03_09_24.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
### **Exercise: Analyzing a Sample Sales Dataset Using PySpark**
### **Part 1: Dataset Preparation**

#### **Step 1: Generate the Sample Sales Dataset**

import pandas as pd
from datetime import datetime

# Sample sales data
data = {
    "TransactionID": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "CustomerID": [101, 102, 103, 101, 104, 102, 103, 104, 101, 105],
    "ProductID": [501, 502, 501, 503, 504, 502, 503, 504, 501, 505],
    "Quantity": [2, 1, 4, 3, 1, 2, 5, 1, 2, 1],
    "Price": [150.0, 250.0, 150.0, 300.0, 450.0, 250.0, 300.0, 450.0, 150.0, 550.0],
    "Date": [
        datetime(2024, 9, 1),
        datetime(2024, 9, 1),
        datetime(2024, 9, 2),
        datetime(2024, 9, 2),
        datetime(2024, 9, 3),
        datetime(2024, 9, 3),
        datetime(2024, 9, 4),
        datetime(2024, 9, 4),
        datetime(2024, 9, 5),
        datetime(2024, 9, 5)
    ]
}

# Create a DataFrame
df = pd.DataFrame(data)

# Save the DataFrame to a CSV file
df.to_csv('sales_data.csv', index=False)

print("Sample sales dataset has been created and saved as 'sales_data.csv'.")

Sample sales dataset has been created and saved as 'sales_data.csv'.


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=3ed001f0c0b1e98bf3e171230c79349c0ad546fae7d517a386ebbdad9b9ed0b2
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [4]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Sales Dataset Analysis") \
    .getOrCreate()

    # Load the sales dataset
sales_df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

# Show the first few rows of the DataFrame
sales_df.show()

# Display the schema of the DataFrame
sales_df.printSchema()

# Display the first 5 rows of the DataFrame
sales_df.show(5)

# Get summary statistics for numeric columns (Quantity and Price)
sales_df.describe(["Quantity", "Price"]).show()


+-------------+----------+---------+--------+-----+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|
+-------------+----------+---------+--------+-----+----------+
|            1|       101|      501|       2|150.0|2024-09-01|
|            2|       102|      502|       1|250.0|2024-09-01|
|            3|       103|      501|       4|150.0|2024-09-02|
|            4|       101|      503|       3|300.0|2024-09-02|
|            5|       104|      504|       1|450.0|2024-09-03|
|            6|       102|      502|       2|250.0|2024-09-03|
|            7|       103|      503|       5|300.0|2024-09-04|
|            8|       104|      504|       1|450.0|2024-09-04|
|            9|       101|      501|       2|150.0|2024-09-05|
|           10|       105|      505|       1|550.0|2024-09-05|
+-------------+----------+---------+--------+-----+----------+

root
 |-- TransactionID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: integer

In [5]:
from pyspark.sql.functions import col

# Add a new column for TotalSales (Quantity * Price)
sales_df = sales_df.withColumn("TotalSales", col("Quantity") * col("Price"))

# Show the updated DataFrame
sales_df.show()

+-------------+----------+---------+--------+-----+----------+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|TotalSales|
+-------------+----------+---------+--------+-----+----------+----------+
|            1|       101|      501|       2|150.0|2024-09-01|     300.0|
|            2|       102|      502|       1|250.0|2024-09-01|     250.0|
|            3|       103|      501|       4|150.0|2024-09-02|     600.0|
|            4|       101|      503|       3|300.0|2024-09-02|     900.0|
|            5|       104|      504|       1|450.0|2024-09-03|     450.0|
|            6|       102|      502|       2|250.0|2024-09-03|     500.0|
|            7|       103|      503|       5|300.0|2024-09-04|    1500.0|
|            8|       104|      504|       1|450.0|2024-09-04|     450.0|
|            9|       101|      501|       2|150.0|2024-09-05|     300.0|
|           10|       105|      505|       1|550.0|2024-09-05|     550.0|
+-------------+----------+---------+--

In [6]:

# Group by ProductID and calculate total sales for each product
product_sales_df = sales_df.groupBy("ProductID").sum("TotalSales").withColumnRenamed("sum(TotalSales)","TotalProductSales")

# Show the result
product_sales_df.show()


+---------+-----------------+
|ProductID|TotalProductSales|
+---------+-----------------+
|      501|           1200.0|
|      504|            900.0|
|      502|            750.0|
|      505|            550.0|
|      503|           2400.0|
+---------+-----------------+



In [9]:
# Find the product with the highest total sales
top_selling_product_df = product_sales_df.orderBy(col("TotalProductSales").desc())

# Show the top-selling product
top_selling_product_df.show(1)


+---------+-----------------+
|ProductID|TotalProductSales|
+---------+-----------------+
|      503|           2400.0|
+---------+-----------------+
only showing top 1 row



In [11]:
# Group by Date and calculate total sales for each day
daily_sales_df = sales_df.groupBy("Date").sum("TotalSales").withColumnRenamed("sum(TotalSales)","TotalDailySales")

# Show the result
daily_sales_df.show()


+----------+---------------+
|      Date|TotalDailySales|
+----------+---------------+
|2024-09-03|          950.0|
|2024-09-01|          550.0|
|2024-09-02|         1500.0|
|2024-09-05|          850.0|
|2024-09-04|         1950.0|
+----------+---------------+



In [12]:
# Filter transactions with TotalSales greater than ₹500
high_value_sales_df = sales_df.filter(col("TotalSales") > 500)

# Show the filtered transactions
high_value_sales_df.show()


+-------------+----------+---------+--------+-----+----------+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|TotalSales|
+-------------+----------+---------+--------+-----+----------+----------+
|            3|       103|      501|       4|150.0|2024-09-02|     600.0|
|            4|       101|      503|       3|300.0|2024-09-02|     900.0|
|            7|       103|      503|       5|300.0|2024-09-04|    1500.0|
|           10|       105|      505|       1|550.0|2024-09-05|     550.0|
+-------------+----------+---------+--------+-----+----------+----------+



In [16]:
# Count the number of purchases made by each customer
repeat_customers_df = sales_df.groupBy("CustomerID").count().filter(col("count") > 1)

# Show repeat customers
repeat_customers_df.show()

# Calculate the average price per product
avg_price_per_product_df = sales_df.groupBy("ProductID").agg(sum("TotalSales") / sum("Quantity"))

# Show the average price per product
avg_price_per_product_df.show()



+----------+-----+
|CustomerID|count|
+----------+-----+
|       101|    3|
|       103|    2|
|       102|    2|
|       104|    2|
+----------+-----+



TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [3]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Key-Value Pair RDDs Exercise") \
    .getOrCreate()

# Get the SparkContext from SparkSession
sc = spark.sparkContext

#Task 1: Create an RDD from the Sales Data
sales_data = [
    ("ProductA", 100),
    ("ProductB", 150),
    ("ProductA", 200),
    ("ProductC", 300),
    ("ProductB", 250),
    ("ProductC", 100)
]

# Create an RDD from sales_data
sales_rdd = sc.parallelize(sales_data)


print("Sales RDD:")
print(sales_rdd.take(5))

#Task 2: Group Data by Product Name

grouped_rdd = sales_rdd.groupByKey()


print("Grouped Data:")
for product, sales in grouped_rdd.collect():
    print(product, list(sales))


#Task 3: Calculate Total Sales by Product
# Calculate total sales for each product using reduceByKey
total_sales_rdd = sales_rdd.reduceByKey(lambda x, y: x + y)


print("Total Sales by Product:")
total_sales_rdd.collect()


#Task 4: Sort Products by Total Sales
sorted_sales_rdd = total_sales_rdd.sortBy(lambda x: x[1], ascending=False)


print("Sorted Products by Total Sales:")
sorted_sales_rdd.collect()


#Task 5: Filter Products with High Sales
# Filter products with total sales greater than 200
high_sales_rdd = total_sales_rdd.filter(lambda x: x[1] > 200)


print("Products with Sales Greater Than 200:")
high_sales_rdd.collect()


#Task 6: Combine Regional Sales Data

# Regional sales data
regional_sales_data = [
    ("ProductA", 50),
    ("ProductC", 150)
]

# Create an RDD from regional_sales_data
regional_sales_rdd = sc.parallelize(regional_sales_data)

# Combine the sales RDDs using union
combined_rdd = sales_rdd.union(regional_sales_rdd)

# Calculate the new total sales for each product after combining the datasets
combined_total_sales_rdd = combined_rdd.reduceByKey(lambda x, y: x + y)


print("Combined Sales Data:")
combined_total_sales_rdd.collect()

#Task 7: Count the Number of Distinct Products


distinct_products_count = combined_total_sales_rdd.keys().distinct().count()

print("Number of Distinct Products:", distinct_products_count)


#Task 8: Identify the Product with Maximum Sales
# Find the product with the maximum total sales using reduce
max_sales_product = combined_total_sales_rdd.reduce(lambda x, y: x if x[1] > y[1] else y)


print("Product with Maximum Sales:", max_sales_product)

#Challenge Task

# Calculate the total quantity sold for each product
total_quantity_rdd = sales_rdd.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y)

total_sales_quantity_rdd = total_sales_rdd.join(total_quantity_rdd)


avg_sales_rdd = total_sales_quantity_rdd.mapValues(lambda x: x[0] / x[1])


print("Average Sales per Product:")
avg_sales_rdd.collect()



Sales RDD:
[('ProductA', 100), ('ProductB', 150), ('ProductA', 200), ('ProductC', 300), ('ProductB', 250)]
Grouped Data:
ProductA [100, 200]
ProductB [150, 250]
ProductC [300, 100]
Total Sales by Product:
Sorted Products by Total Sales:
Products with Sales Greater Than 200:
Combined Sales Data:
Number of Distinct Products: 3
Product with Maximum Sales: ('ProductC', 550)
Average Sales per Product:


[('ProductA', 150.0), ('ProductB', 200.0), ('ProductC', 200.0)]