In [5]:
import warnings
warnings.filterwarnings('ignore')

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,count,sum,avg,max,min,datediff,current_date

In [7]:
pg_url = 'jdbc:postgresql://localhost:5432/Customer_Churn_Analysis'
pg_properties = {
    'user':'postgres',
    'password':'pgroot',
    'driver':'org.postgresql.Driver'
}

In [8]:
spark=SparkSession.builder\
.appName('CustomerChurnAnalysis')\
.config('spark.jars','/home/krish/postgresql-42.7.5.jar')\
.getOrCreate()

In [9]:
customers_df=spark.read.jdbc(pg_url,'customers',properties=pg_properties)
transactions_df=spark.read.jdbc(pg_url,'transactions',properties=pg_properties)
churn_df=spark.read.jdbc(pg_url,'churn_status',properties=pg_properties)

In [10]:
# Find last Transaction date for each customer

last_transaction=transactions_df.groupBy('customer_id')\
.agg(max('transaction_date').alias('last_purchase_date'))

In [11]:
# calculate days since last transaction

churn_analysis = last_transaction.withColumn('days_since_last_purchase',
                                            datediff(current_date(),col('last_purchase_date')))

In [12]:
# merge with churn status

final_churn_df=churn_analysis.join(churn_df,'customer_id','left')

In [13]:
# identify high risk customers(no purchase in 6+ months)

churn_risk = final_churn_df.filter(col('days_since_last_purchase')>180)

In [14]:
print('Customers at High risk of churn: ')
churn_risk.show(5)

Customers at High risk of churn: 
+-----------+------------------+------------------------+----------+
|customer_id|last_purchase_date|days_since_last_purchase|is_churned|
+-----------+------------------+------------------------+----------+
|          1|        2024-01-10|                     409|         0|
|          3|        2024-02-10|                     378|         0|
|          5|        2023-12-20|                     430|         1|
|          4|        2023-11-15|                     465|         1|
|          2|        2023-10-05|                     506|         1|
+-----------+------------------+------------------------+----------+



In [15]:
churn_risk.write.jdbc(pg_url,'churn_risk_customers',mode='overwrite',properties=pg_properties)