In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, lag, lit, sum, when
from pyspark.sql.window import Window

### Initialize Spark session

In [None]:
# Obtain the notebook's Spark session: getOrCreate() reuses an active session
# if there is one, otherwise it starts a new one under this application name.
spark = (
    SparkSession.builder
    .appName("CUST_MSTR_Transformations")
    .getOrCreate()
)

### Load the data from the Data Lake

In [None]:
# Read the raw customer-master CSV: the first row supplies the column names
# and Spark infers each column's type from the data.
# NOTE(review): the path is an https:// URL containing a space ("Raw Data");
# confirm the cluster's filesystem connectors can read it as-is.
cust_mstr_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("https://ctcoe.blob.core.windows.net/week07/Raw Data/CUST_MSTR_20191112.csv")
)

## Task 1: Find Average Spending Score by Profession

In [None]:
# One row per distinct Profession, carrying the mean SpendingScore of that
# group in the AvgSpendingScore column.
avg_spending_by_profession = (
    cust_mstr_df
    .groupBy("Profession")
    .agg(avg("SpendingScore").alias("AvgSpendingScore"))
)

## Task 2: Identify Customers with High Spending Propensity and Low Income
### Assumption: "high spending propensity" means SpendingScore > 70, and "low income" means Income < 30000

In [None]:
# Keep only customers who satisfy both conditions: Income below 30000 and
# SpendingScore above 70. All source columns are retained.
high_spending_low_income = cust_mstr_df.where(
    (col("Income") < 30000) & (col("SpendingScore") > 70)
)

## Task 3: Compare Spending Score Distribution Across Age Groups
### Creating age groups

In [None]:
# Bucket each customer into a ten-year age band.
#
# The branches are evaluated top to bottom, so each `Age < N` test only sees
# rows that failed the previous one; the lower bound is therefore implicit.
#
# A null Age makes every comparison evaluate to null, which would otherwise
# fall through to the final `otherwise` and be mislabelled "60 and above".
# Missing ages are caught first and given their own "Unknown" bucket so they
# do not distort the 60+ average.
cust_mstr_df = cust_mstr_df.withColumn(
    "AgeGroup",
    when(col("Age").isNull(), "Unknown")
    .when(col("Age") < 20, "Under 20")
    .when(col("Age") < 30, "20-29")
    .when(col("Age") < 40, "30-39")
    .when(col("Age") < 50, "40-49")
    .when(col("Age") < 60, "50-59")
    .otherwise("60 and above"),
)

# Mean SpendingScore per age band (one row per AgeGroup value).
spending_score_by_age_group = (
    cust_mstr_df
    .groupBy("AgeGroup")
    .agg(avg("SpendingScore").alias("AvgSpendingScore"))
)

## Task 4: Identify Top Spending Families
### Assuming families are identified by FamilyID and top spending is based on SpendingScore

In [None]:
# Total SpendingScore per FamilyID, highest total first.
# `sum` here is pyspark.sql.functions.sum (imported at the top of the
# notebook), not the Python builtin.
top_spending_families = (
    cust_mstr_df
    .groupBy("FamilyID")
    .agg(sum("SpendingScore").alias("TotalSpendingScore"))
    .orderBy("TotalSpendingScore", ascending=False)
)

## Task 5: Finding Customers with High Spending Potential based on Age and Profession
### Assuming high spending potential is SpendingScore > 80


In [None]:
# Customers whose SpendingScore exceeds 80, reduced to the identifying and
# descriptive columns (CustomerID, Age, Profession, SpendingScore).
high_spending_potential = (
    cust_mstr_df
    .select("CustomerID", "Age", "Profession", "SpendingScore")
    .where(col("SpendingScore") > 80)
)

## Task 6: Identifying Customers with Declining Spending Score
### Assuming decline is defined as a drop in SpendingScore over time
### Creating a window specification to calculate the difference

In [None]:
# Per-customer window ordered by Date, so that lag() returns the score from
# the customer's immediately preceding row.
# NOTE(review): this assumes the source has a "Date" column and more than one
# row per CustomerID — confirm against the actual file schema.
window_spec = Window.partitionBy("CustomerID").orderBy("Date")

# `lag` is pyspark.sql.functions.lag and must be imported alongside the other
# column functions at the top of the notebook.
# PrevSpendingScore is null on each customer's first row, which makes
# SpendingScoreDiff null there as well.
cust_mstr_df = (
    cust_mstr_df
    .withColumn("PrevSpendingScore", lag("SpendingScore", 1).over(window_spec))
    .withColumn("SpendingScoreDiff", col("SpendingScore") - col("PrevSpendingScore"))
)

# A negative difference means the score dropped relative to the previous row.
# Rows with a null difference do not satisfy the predicate and are excluded.
declining_spenders = (
    cust_mstr_df
    .where(col("SpendingScoreDiff") < 0)
    .select("CustomerID", "Date", "SpendingScoreDiff")
)

### Combine all the results into one dataframe for loading

In [None]:
# Assemble one output frame: every customer row, enriched with the group-level
# aggregates and a boolean flag per customer segment.
#
# Joining the task results as-is does not work: high_spending_low_income
# carries every customer column, so joining it on CustomerID duplicates
# columns such as FamilyID and makes the later join on "FamilyID" ambiguous.
# Both averages are also named AvgSpendingScore. The aggregates are therefore
# renamed, and each filtered subset is reduced to a distinct CustomerID list
# with a True flag, which also keeps the joins from multiplying rows.

# Give the two averages distinct, self-describing column names.
profession_avg = avg_spending_by_profession.withColumnRenamed(
    "AvgSpendingScore", "AvgSpendingScoreByProfession"
)
age_group_avg = spending_score_by_age_group.withColumnRenamed(
    "AvgSpendingScore", "AvgSpendingScoreByAgeGroup"
)

# One row per flagged customer; the flag is True for members of the segment.
high_spending_low_income_flag = (
    high_spending_low_income
    .select("CustomerID")
    .distinct()
    .withColumn("IsHighSpendingLowIncome", lit(True))
)
high_spending_potential_flag = (
    high_spending_potential
    .select("CustomerID")
    .distinct()
    .withColumn("IsHighSpendingPotential", lit(True))
)
declining_spender_flag = (
    declining_spenders
    .select("CustomerID")
    .distinct()
    .withColumn("HasDecliningSpendingScore", lit(True))
)

final_df = (
    cust_mstr_df
    .join(profession_avg, "Profession", "left")
    .join(age_group_avg, "AgeGroup", "left")
    .join(top_spending_families, "FamilyID", "left")
    .join(high_spending_low_income_flag, "CustomerID", "left")
    .join(high_spending_potential_flag, "CustomerID", "left")
    .join(declining_spender_flag, "CustomerID", "left")
    # Customers absent from a segment get null from the left join; make the
    # flags explicit booleans instead.
    .fillna(
        False,
        subset=[
            "IsHighSpendingLowIncome",
            "IsHighSpendingPotential",
            "HasDecliningSpendingScore",
        ],
    )
)

### Save the final transformed data back to the Data Lake (or load it directly into a SQL database)

In [None]:
# Write the combined result as CSV with a header row.
# The default save mode raises an error when the target path already exists,
# which makes a second "Restart & Run All" fail; overwrite keeps the notebook
# re-runnable.
# NOTE(review): "path_to_transformed_data" looks like a placeholder — replace
# it with the real output location, and confirm overwriting it is acceptable.
final_df.write.mode("overwrite").csv("path_to_transformed_data", header=True)

### Stop Spark session

In [None]:
# Shut down the Spark session created at the top of the notebook. Any cell
# that uses `spark` after this point needs the session to be created again.
spark.stop()