In [0]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Application").getOrCreate()


In [0]:
data3 = spark.read.format("csv").option('InferSchema', 'True').option("header", "true").load("dbfs:/FileStore/FileStore/dataset.csv")

data3.display()

transaction_id,customer_id,product,quantity,price,date,region
1,101,Notebook,2,500,2024-01-15,North
2,102,Pencil,10,20,2024-01-16,South
3,103,Notebook,1,500,2024-01-17,East
4,104,Pen,5,100,2024-01-18,West
5,105,Notebook,3,500,2024-01-19,North
6,101,Pencil,4,20,2024-01-20,South
7,103,Pen,2,100,2024-01-21,East
8,102,Notebook,1,500,2024-01-22,West
9,104,Pencil,8,20,2024-01-23,North
10,105,Notebook,2,500,2024-01-24,South


In [0]:
data3.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- region: string (nullable = true)



**Question 1. Find the total sales for each region**

In [0]:


data3 = data3.withColumn("total_sales", col("quantity") * col("price"))
data3.display()

transaction_id,customer_id,product,quantity,price,date,region,total_sales
1,101,Notebook,2,500,2024-01-15,North,1000
2,102,Pencil,10,20,2024-01-16,South,200
3,103,Notebook,1,500,2024-01-17,East,500
4,104,Pen,5,100,2024-01-18,West,500
5,105,Notebook,3,500,2024-01-19,North,1500
6,101,Pencil,4,20,2024-01-20,South,80
7,103,Pen,2,100,2024-01-21,East,200
8,102,Notebook,1,500,2024-01-22,West,500
9,104,Pencil,8,20,2024-01-23,North,160
10,105,Notebook,2,500,2024-01-24,South,1000


In [0]:

sales_by_region = data3.groupBy("region").agg(
    sum("total_sales").alias("total_sales_by_region")
)

sales_by_region.display()

region,total_sales_by_region
South,1280
East,700
West,1000
North,2660


**Question 2. Identify the most purchased product by each customer.**

In [0]:
from pyspark.sql.window import *



In [0]:
df_aggregated = data3.groupBy("customer_id", "product").agg(
    sum("quantity").alias("total_quantity")
)
 
window = Window.partitionBy('customer_id').orderBy(col('total_quantity').desc())

df_window = df_aggregated.withColumn('rank', row_number().over(window))

df_top_product= df_window.filter(col('rank')==1)
 
df_top_product.display()
 

customer_id,product,total_quantity,rank
101,Pencil,4,1
102,Pencil,10,1
103,Pen,2,1
104,Pencil,8,1
105,Notebook,5,1


**OR sql query**

In [0]:
# Step 1: Register data3 as a temporary SQL view
data3.createOrReplaceTempView("data3")

df_top_product_sql = spark.sql('''
    WITH cte AS (
        SELECT 
            customer_id,
            product,
            SUM(quantity) AS total_quantity,
            RANK() OVER (PARTITION BY customer_id ORDER BY SUM(quantity) DESC) AS rank
        FROM data3
        GROUP BY customer_id, product
    )
    SELECT 
        customer_id, 
        product, 
        total_quantity
    FROM cte
    WHERE rank = 1
''')



df_top_product_sql.display()

customer_id,product,total_quantity
101,Pencil,4
102,Pencil,10
103,Pen,2
104,Pencil,8
105,Notebook,5


**Question 3. Calculate the total revenue generated by each product.**

In [0]:
revenue = data3.groupby('product').agg(sum('total_sales').alias('total_revenue'))

revenue.display()

product,total_revenue
Pen,700
Notebook,4500
Pencil,440


**Question 4. Find the region with the highest total sales.**

In [0]:
highest_sales = data3.groupBy('region').agg(sum('total_sales').alias('Highest_Total_sales_By_region')).orderBy(col('Highest_Total_sales_By_region').desc())

highest_sales.display()

region,Highest_Total_sales_By_region
North,2660
South,1280
West,1000
East,700


**Question 5. Find customers who purchased more than or equals to 10 units of any product.**

In [0]:
more_than_10_units = data3.filter(col('quantity')>= 10).display()

transaction_id,customer_id,product,quantity,price,date,region,total_sales
2,102,Pencil,10,20,2024-01-16,South,200


**Question 6. Calculate average price per product across all regions.**

In [0]:
avg_price_per_product = data3.groupBy('product').agg(avg('price')).display()

product,avg(price)
Pen,100.0
Notebook,500.0
Pencil,20.0


**Question 7. Rank the products by total sales in descending order.**

In [0]:
product_sales = data3.groupBy('product').agg(sum(col('quantity') * col('price')).alias('total_Sales'))

window_spec = Window.orderBy(col('total_Sales').desc())

ranked_products = product_sales.withColumn('rank', rank().over(window_spec))

ranked_products.display()

product,total_Sales,rank
Notebook,4500,1
Pen,700,2
Pencil,440,3


**Question 8. Count the number of distinct customers in each region.**
 


In [0]:
customer_by_region = data3.groupBy('region').agg(countDistinct('customer_id')).display()

region,count(customer_id)
South,3
East,1
West,2
North,3


**Question 9. Determine which region has the highest average quantity sold per transaction.**
 


In [0]:
avg_quantity_by_region = data3.groupBy('region').agg(avg(col('quantity')).alias('Avg_quantity'))

highest_avg_region = avg_quantity_by_region.orderBy(col('Avg_quantity').desc())

highest_avg_region.display()


region,Avg_quantity
South,5.333333333333333
North,4.333333333333333
West,3.0
East,1.5


**Question 10. Find transactions that contributed to the top 10% of total revenue.**

In [0]:
# Step 1: Calculate total revenue for each transaction
data3 = data3.withColumn('total_revenue', col('quantity') * col('price'))

# Step 2: Calculate the total revenue of all transactions
total_revenue = data3.agg(sum('total_revenue')).collect()[0][0]

# Step 3: Calculate the threshold for the top 10% revenue
threshold = total_revenue * 0.10

# Step 4: Filter transactions with total revenue >= threshold
top_10_percent_transactions = data3.filter(col('total_revenue') >= threshold)

# Show the results
top_10_percent_transactions.display()

transaction_id,customer_id,product,quantity,price,date,region,total_sales,total_revenue
1,101,Notebook,2,500,2024-01-15,North,1000,1000
5,105,Notebook,3,500,2024-01-19,North,1500,1500
10,105,Notebook,2,500,2024-01-24,South,1000,1000
