
# Spark API Python (PySpark) Analysis for Customer-Sales-Analysis-Pipeline:

##  We have  an e-commerce dataset and want to:
## # 
## 
###  1. Identify customers who made multiple purchases in a given period.
### 
###  2. Calculate each customer's average spending per transaction.
### ## 
###  3. Rank customers based on their total spending and identify the top 5 spenders.
### 4. First & Last Purchase date per customer
### 5. Monthly Revenue Summary.
### 6. Customers who purchased before April but not   in April.


In [0]:
##Pyspark code prepare the environment

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg,sum as _sum,rank,to_date,to_csv,to_json,count, min as _min,max as _max,round as _round, date_format, to_date, datediff,lag,row_number
from pyspark.sql.window import Window




In [0]:
## 1. Identify customers who made multiple purchases in a given period.

# Step 1: obtain the SparkSession used by every cell below
# (getOrCreate returns an already-active session when one exists).
spark = (
    SparkSession.builder
    .appName('CustomerRetentionAnalysis')
    .getOrCreate()
)

In [0]:
# Step 2: build the sample sales data as an in-memory DataFrame.
# Each record is (CustomerID, PurchaseDate as 'yyyy-MM-dd' text, Amount).
columns = ['CustomerID', 'PurchaseDate', 'Amount']

data = [
    ('S1', '2024-01-01', 200),
    ('S2', '2024-01-03', 150),
    ('S1', '2024-01-10', 300),
    ('S3', '2024-01-15', 100),
    ('S2', '2024-02-01', 200),
    ('S1', '2024-02-15', 400),
    ('S4', '2024-03-01', 500),
    ('S5', '2024-03-15', 600),
    ('S6', '2024-03-20', 300),
    ('S4', '2024-04-01', 300),
    ('S5', '2024-04-10', 200),
]

df = spark.createDataFrame(data, schema=columns)
# Convert the textual purchase date into a real date column so that date
# functions (datediff, date_format, comparisons) work on it.
df = df.withColumn('PurchaseDate', to_date(col('PurchaseDate'), "yyyy-MM-dd"))


In [0]:
# Render the loaded sales DataFrame in the notebook.
# NOTE(review): `.display()` is not part of the open-source PySpark DataFrame
# API; presumably this notebook runs on Databricks — confirm before running elsewhere.
df.display()

CustomerID,PurchaseDate,Amount
S1,2024-01-01,200
S2,2024-01-03,150
S1,2024-01-10,300
S3,2024-01-15,100
S2,2024-02-01,200
S1,2024-02-15,400
S4,2024-03-01,500
S5,2024-03-15,600
S6,2024-03-20,300
S4,2024-04-01,300
S5,2024-04-10,200


In [0]:
# Solution 1: identify repeat customers.
# Count purchases per customer and keep only those with more than one.
repeat_customer_df = (
    df.groupBy('CustomerID')
    .count()
    .where(col('count') > 1)
)

In [0]:
# Print the repeat customers and their purchase counts as a text table.
repeat_customer_df.show()

+----------+-----+
|CustomerID|count|
+----------+-----+
|        S1|    3|
|        S2|    2|
|        S4|    2|
|        S5|    2|
+----------+-----+



In [0]:
# Solution 2: calculate average spending.
# Group the transactions by customer and average the transaction amounts,
# giving each customer's average spending per transaction.
avg_spending_df = df.groupBy('CustomerID').agg(
    avg(col('Amount')).alias('AvgSpendingPerTransaction')
)



In [0]:
# Render the per-customer average spending in the notebook.
avg_spending_df.display()

CustomerID,AvgSpendingPerTransaction
S1,300.0
S2,175.0
S3,100.0
S4,400.0
S5,400.0
S6,300.0


In [0]:
# Solution 3: rank customers by total spending.
# Sum the spending per customer, then rank the totals with a window function.
total_spending_df = (
    df.groupBy('CustomerID')
    .agg(_sum(col('Amount')).alias('TotalSpending'))
)

# No partitionBy: the ranking runs over all customers, highest total first.
window_spec = Window.orderBy(col('TotalSpending').desc())

# rank() gives tied totals the same rank, so every customer whose rank is
# within the top 5 is kept, including ties at the cut-off.
ranked_df = (
    total_spending_df
    .select('*', rank().over(window_spec).alias('Rank'))
    .where(col('Rank') <= 5)
)

In [0]:
# Render the top spenders with their total spending and rank.
ranked_df.display()

CustomerID,TotalSpending,Rank
S1,900,1
S4,800,2
S5,800,2
S2,350,4
S6,300,5


In [0]:
# 4. First & last purchase date per customer, plus the days between them.
# NOTE(review): the output column is spelled 'days_bettween' (sic); it is kept
# as-is here because renaming it would change the result schema.
first_last_df = (
    df.groupBy('CustomerID')
    .agg(
        _min('PurchaseDate').alias('first_purchase'),
        _max('PurchaseDate').alias('last_purchase'),
    )
    # datediff(end, start): days from the first purchase to the last one.
    .withColumn("days_bettween", datediff('last_purchase', 'first_purchase'))
    .sort('CustomerID')
)


In [0]:
# Output the first/last purchase summary twice: once as an untruncated text
# table and once through the notebook's display renderer.
first_last_df.show(truncate=False)
first_last_df.display()

+----------+--------------+-------------+-------------+
|CustomerID|first_purchase|last_purchase|days_bettween|
+----------+--------------+-------------+-------------+
|S1        |2024-01-01    |2024-02-15   |45           |
|S2        |2024-01-03    |2024-02-01   |29           |
|S3        |2024-01-15    |2024-01-15   |0            |
|S4        |2024-03-01    |2024-04-01   |31           |
|S5        |2024-03-15    |2024-04-10   |26           |
|S6        |2024-03-20    |2024-03-20   |0            |
+----------+--------------+-------------+-------------+



CustomerID,first_purchase,last_purchase,days_bettween
S1,2024-01-01,2024-02-15,45
S2,2024-01-03,2024-02-01,29
S3,2024-01-15,2024-01-15,0
S4,2024-03-01,2024-04-01,31
S5,2024-03-15,2024-04-10,26
S6,2024-03-20,2024-03-20,0


In [0]:
# 5. Monthly revenue summary.
# Bucket every purchase by its 'yyyy-MM' month, then total the revenue and
# count the transactions in each bucket.
monthly_revenue_df = (
    df.groupBy(date_format('PurchaseDate', 'yyyy-MM').alias('month'))
    .agg(
        _sum('Amount').alias('revenue'),
        count('*').alias('transactions'),
    )
    .sort('month')
)

In [0]:
# Output the monthly revenue summary as a text table and through the
# notebook's display renderer.
monthly_revenue_df.show()
monthly_revenue_df.display()

+-------+-------+------------+
|  month|revenue|transactions|
+-------+-------+------------+
|2024-01|    750|           4|
|2024-02|    600|           2|
|2024-03|   1400|           3|
|2024-04|    500|           2|
+-------+-------+------------+



month,revenue,transactions
2024-01,750,4
2024-02,600,2
2024-03,1400,3
2024-04,500,2


In [0]:
# 6. Customers who purchased before April but not in April.

# Distinct customers with at least one purchase before 2024-04-01.
q1_customers_df = (
    df.where(col('PurchaseDate') < '2024-04-01')
    .select('CustomerID')
    .distinct()
)

# Distinct customers with at least one purchase on or after 2024-04-01.
april_customers_df = (
    df.where(col('PurchaseDate') >= '2024-04-01')
    .select('CustomerID')
    .distinct()
)

# A left anti join keeps only the earlier customers that have no match among
# the April customers.
q1_not_april_df = (
    q1_customers_df
    .join(april_customers_df, on="CustomerID", how="left_anti")
    .sort("CustomerID")
)

In [0]:
# Output the customers who bought before April but not in April, as a text
# table and through the notebook's display renderer.
q1_not_april_df.show()
q1_not_april_df.display()

+----------+
|CustomerID|
+----------+
|        S1|
|        S2|
|        S3|
|        S6|
+----------+



CustomerID
S1
S2
S3
S6


# Spark SQL (Python) Analysis for Customer-Sales-Analysis-Pipeline:

##  We have  an e-commerce dataset and want to:
## # 
## 
###  1. Identify customers who made multiple purchases in a given period.
### 
###  2. Calculate each customer's average spending per transaction.
### ## 
### 3. Rank customers based on their total spending and identify the top 5 spenders.
##  4.First & Last Purchase date per customer
##  5. Monthly Revenue Summary.
##  6. Customers who purchased before April but not   in April

In [0]:
# Register the sales DataFrame as the temp view 'Customer_sales' so the
# Spark SQL queries below can refer to it by name.
df.createOrReplaceTempView('Customer_sales')

# Print the rows in purchase-date order without truncating column values.
df.sort('PurchaseDate').show(truncate=False)

+----------+------------+------+
|CustomerID|PurchaseDate|Amount|
+----------+------------+------+
|S1        |2024-01-01  |200   |
|S2        |2024-01-03  |150   |
|S1        |2024-01-10  |300   |
|S3        |2024-01-15  |100   |
|S2        |2024-02-01  |200   |
|S1        |2024-02-15  |400   |
|S4        |2024-03-01  |500   |
|S5        |2024-03-15  |600   |
|S6        |2024-03-20  |300   |
|S4        |2024-04-01  |300   |
|S5        |2024-04-10  |200   |
+----------+------------+------+



In [0]:
#--------------------------------------------------
# Spark SQL queries
#------------------------------------------------

# 1. Identify repeat customers (more than one purchase).
# Counts rows per customer in the Customer_sales view, keeps customers with
# more than one purchase, and orders by purchase count (descending) and then
# CustomerID.

query_repate_customers = """

select CustomerID,
COUNT(*) as purchase_count
from Customer_sales
Group By CustomerID 
Having COUNT(*) > 1
ORDER BY
purchase_count desc, CustomerID
"""

# Run the query; the result is a DataFrame of (CustomerID, purchase_count).
repeate_customers = spark.sql(query_repate_customers)



In [0]:
# Render the repeat customers returned by the SQL query.
repeate_customers.display()

CustomerID,purchase_count
S1,3
S2,2
S4,2
S5,2


In [0]:
# 2. Average spending per transaction per customer.
# Returns, for every customer, the average amount (rounded to 2 decimals) and
# the number of transactions, highest average first. CustomerID is used as a
# tie-breaker so that customers with equal averages come back in a
# deterministic order.

query_avarage_spend = """
select 
  CustomerID,
  Round(AVG(Amount),2) as  avg_spend,
  Count(*) as total_transactions
  from Customer_sales 
  Group By CustomerID
  Order By avg_spend desc, CustomerID

"""
avg_spend_customers = spark.sql(query_avarage_spend)

In [0]:
# Render the per-customer average spend and transaction counts.
avg_spend_customers.display()

CustomerID,avg_spend,total_transactions
S4,400.0,2
S5,400.0,2
S1,300.0,3
S6,300.0,1
S2,175.0,2
S3,100.0,1


In [0]:
# 3. Rank customers by total spend (top 5).
# The total per customer is computed first, then ranked with RANK(). The top 5
# is selected by filtering on the rank rather than with LIMIT: RANK() gives
# tied totals the same rank, and LIMIT would arbitrarily drop some of the
# customers tied at the cut-off. CustomerID breaks ties in the final ordering
# so the output is deterministic.

query_total_spend = """
WITH total_spend_cte AS (
  SELECT
    CustomerID,
    SUM(Amount) AS total_amount
  FROM Customer_sales
  GROUP BY CustomerID
),
ranked_cte AS (
  SELECT
    CustomerID,
    total_amount,
    RANK() OVER (ORDER BY total_amount DESC) AS spend_rank
  FROM total_spend_cte
)
SELECT
  CustomerID,
  total_amount,
  spend_rank
FROM ranked_cte
WHERE spend_rank <= 5
ORDER BY spend_rank, CustomerID
"""

query_spend_rank = spark.sql(query_total_spend)


In [0]:
# Render the ranked top spenders returned by the SQL query.
query_spend_rank.display()

CustomerID,total_amount,spend_rank
S1,900,1
S4,800,2
S5,800,2
S2,350,4
S6,300,5


In [0]:
# 4. First & last purchase date per customer.
# For each customer: earliest purchase date, latest purchase date, and the
# number of days between the two (DATEDIFF(end, start)), ordered by CustomerID.

query_first_last = """
select 
  CustomerID,
  Min(PurchaseDate) as first_purchase,
  Max(PurchaseDate) as last_purchase,
  DATEDIFF(Max(PurchaseDate), Min(PurchaseDate)) As days_between
  from Customer_sales
  Group By CustomerID
  order by CustomerID


"""
# Run the query against the Customer_sales temp view.
query_first_last_purchase = spark.sql(query_first_last) 

In [0]:
# Output the first/last purchase summary as a text table and through the
# notebook's display renderer.
query_first_last_purchase.show()
query_first_last_purchase.display()

+----------+--------------+-------------+------------+
|CustomerID|first_purchase|last_purchase|days_between|
+----------+--------------+-------------+------------+
|        S1|    2024-01-01|   2024-02-15|          45|
|        S2|    2024-01-03|   2024-02-01|          29|
|        S3|    2024-01-15|   2024-01-15|           0|
|        S4|    2024-03-01|   2024-04-01|          31|
|        S5|    2024-03-15|   2024-04-10|          26|
|        S6|    2024-03-20|   2024-03-20|           0|
+----------+--------------+-------------+------------+



CustomerID,first_purchase,last_purchase,days_between
S1,2024-01-01,2024-02-15,45
S2,2024-01-03,2024-02-01,29
S3,2024-01-15,2024-01-15,0
S4,2024-03-01,2024-04-01,31
S5,2024-03-15,2024-04-10,26
S6,2024-03-20,2024-03-20,0


In [0]:
# 5. Monthly revenue summary.
# Groups purchases by their 'yyyy-MM' month and returns the summed amount and
# the transaction count per month, ordered by month.
query_monthly_revenue ="""
 select date_format(PurchaseDate, 'yyyy-MM') as month,
 sum(amount) as revenue,
 Count(*) as transactions
 from Customer_sales
 group by  date_format(PurchaseDate, 'yyyy-MM')
 order by month

"""

# Run the query against the Customer_sales temp view.
query_monthly_revenue_summary = spark.sql(query_monthly_revenue)

In [0]:
# Output the monthly revenue summary as a text table and through the
# notebook's display renderer.
query_monthly_revenue_summary.show()
query_monthly_revenue_summary.display()

+-------+-------+------------+
|  month|revenue|transactions|
+-------+-------+------------+
|2024-01|    750|           4|
|2024-02|    600|           2|
|2024-03|   1400|           3|
|2024-04|    500|           2|
+-------+-------+------------+



month,revenue,transactions
2024-01,750,4
2024-02,600,2
2024-03,1400,3
2024-04,500,2


In [0]:
# 6. Customers who purchased before April but not in April.

# Distinct customers with at least one purchase before 2024-04-01.
# ORDER BY makes the collected list, and therefore the final result,
# come back in a deterministic order.
query_q1_customer = """
 select distinct 
     CustomerID from Customer_sales 
    where PurchaseDate < '2024-04-01'
    order by CustomerID
"""

# Distinct customers with at least one purchase on or after 2024-04-01.
query_april_customers = """
select distinct 
       CustomerID  from Customer_sales
       where PurchaseDate >= '2024-04-01'
  
"""

# Both result sets are collected to the driver; the difference is then taken
# in plain Python.
q1_customers = [row.CustomerID for row in spark.sql(query_q1_customer).collect()]
# A set gives constant-time membership tests in the filter below.
q1_april_customers = {row.CustomerID for row in spark.sql(query_april_customers).collect()}
# Keep the earlier customers (in CustomerID order) that did not buy in April.
q1_not_april = [
    customer_id
    for customer_id in q1_customers
    if customer_id not in q1_april_customers
]



In [0]:
# Echo the resulting list of customer IDs as the cell's output.
q1_not_april

Out[29]: ['S1', 'S2', 'S3', 'S6']