In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("ReadDataFrame")
    .getOrCreate()
)

# Locations of the four source CSV files (replace with your actual paths).
file_path = "/content/users.csv"
file_path2 = "/content/user_touchpoints.csv"
file_path3 = "/content/revenue_and_rewards.csv"
file_path4 = "/content/marketing_spend.csv"

# Load each file with its first row treated as the column names.
# No schema is passed and schema inference is not enabled on the reader.
df_users = (
    spark.read
    .option("header", "true")
    .csv(file_path)
)

df_user_touchpoints = (
    spark.read
    .option("header", "true")
    .csv(file_path2)
)

df_revenue_and_rewards = (
    spark.read
    .option("header", "true")
    .csv(file_path3)
)

df_marketing_spend = (
    spark.read
    .option("header", "true")
    .csv(file_path4)
)

In [3]:
from pyspark.sql.functions import sum, count, col

# Per-channel paid-media totals from marketing_spend.csv.
df_marketing_metrics = (
    df_marketing_spend
    .groupBy("channel")
    .agg(
        sum("spend").alias("total_spend"),
        sum("impressions").alias("total_impressions"),
        sum("clicks").alias("total_clicks"),
    )
)

# Per-channel count of touchpoint rows flagged as a conversion.
# `== True` is deliberate: it builds a Spark Column predicate, not a Python bool.
df_conversions = (
    df_user_touchpoints
    .filter(col("conversion") == True)
    .groupBy("channel")
    .agg(count("user_id").alias("total_conversions"))
)

# Per-channel revenue: revenue rows are matched to users on user_id, then
# summed under the channel column of the joined result.
df_revenue = (
    df_users
    .join(df_revenue_and_rewards, on="user_id", how="inner")
    .groupBy("channel")
    .agg(sum("revenue").alias("total_revenue"))
)

# Combine the three aggregates. The left joins are anchored on the marketing
# spend totals, so only channels present in marketing_spend.csv are kept;
# metrics missing for such a channel are replaced by 0.
df_channel_performance = (
    df_marketing_metrics
    .join(df_conversions, on="channel", how="left")
    .join(df_revenue, on="channel", how="left")
    .fillna(0)
)

# Display the combined table.
df_channel_performance.show()


+----------+------------------+-----------------+------------+-----------------+------------------+
|   channel|       total_spend|total_impressions|total_clicks|total_conversions|     total_revenue|
+----------+------------------+-----------------+------------+-----------------+------------------+
| instagram|23294.956006770208|        2342705.0|     57677.0|             5168|1405870.5400000694|
|   twitter|13950.928595089703|        2731093.0|     53713.0|             5179| 980673.7400000249|
|google_ads|40751.349267119636|        1728387.0|     59581.0|             5243|2284628.7300002063|
| affiliate| 9317.011117527823|         517110.0|     19806.0|             5234| 830896.4700000079|
|  facebook|29205.112395249587|        2280049.0|     62934.0|             5283|1878910.2500001364|
+----------+------------------+-----------------+------------+-----------------+------------------+



In [4]:
from pyspark.sql.functions import sum, count, col, when

# Per-campaign spend from marketing_spend.csv.
df_campaign_spend = (
    df_marketing_spend
    .groupBy("campaign")
    .agg(sum("spend").alias("total_spend"))
)

# Per-campaign count of touchpoint rows flagged as a conversion.
df_campaign_conversions = (
    df_user_touchpoints
    .filter(col("conversion") == True)
    .groupBy("campaign")
    .agg(count("user_id").alias("total_conversions"))
)

# Per-campaign revenue: revenue rows are matched to users on user_id, then
# summed under the campaign column of the joined result.
df_campaign_revenue = (
    df_users
    .join(df_revenue_and_rewards, on="user_id", how="inner")
    .groupBy("campaign")
    .agg(sum("revenue").alias("total_revenue"))
)

# Combine the aggregates (anchored on spend, missing metrics become 0) and
# append ROAS (Return on Ad Spend) = total revenue / total spend.
# ROAS is left null when a campaign has no positive spend.
df_campaign_performance = (
    df_campaign_spend
    .join(df_campaign_conversions, on="campaign", how="left")
    .join(df_campaign_revenue, on="campaign", how="left")
    .fillna(0)
    .withColumn(
        "ROAS",
        when(
            col("total_spend") > 0,
            col("total_revenue") / col("total_spend"),
        ).otherwise(None),
    )
)

# Display the combined table.
df_campaign_performance.show()


+---------------+------------------+-----------------+------------------+------------------+
|       campaign|       total_spend|total_conversions|     total_revenue|              ROAS|
+---------------+------------------+-----------------+------------------+------------------+
|brand_awareness| 23339.69835786019|             3168| 383497.6999999947| 16.43113351852051|
|holiday_special| 23312.31130874749|             6580|1699933.1000000883| 72.91997251950825|
|    summer_sale|23217.213090944046|             6654|1420579.4800000563| 61.18647722426936|
|    retargeting|23283.140160950188|            12135| 2743903.860000251|117.84938977441915|
| new_user_promo|23366.994463254894|             7771| 1133065.590000059| 48.49000121867745|
+---------------+------------------+-----------------+------------------+------------------+



In [5]:
# CAC
from pyspark.sql.functions import countDistinct

# Number of distinct users recorded against each channel in the users table.
df_new_customers = (
    df_users
    .groupBy("channel")
    .agg(countDistinct("user_id").alias("new_customers"))
)

# Customer acquisition cost per channel = total spend / new customers.
# Channels with spend but no matching users get a null customer count and
# therefore a null CAC.
df_cac = (
    df_marketing_spend
    .groupBy("channel")
    .agg(sum("spend").alias("total_spend"))
    .join(df_new_customers, on="channel", how="left")
    .withColumn("CAC", col("total_spend") / col("new_customers"))
)

df_cac.show()


+----------+------------------+-------------+------------------+
|   channel|       total_spend|new_customers|               CAC|
+----------+------------------+-------------+------------------+
| instagram|23294.956006770208|         6565|3.5483558273831237|
|   twitter|13950.928595089703|         4690|2.9746116407440732|
|google_ads|40751.349267119636|        10674|3.8178142464979987|
| affiliate| 9317.011117527823|         3885|2.3982010598527217|
|  facebook|29205.112395249587|         8883|3.2877532810142505|
+----------+------------------+-------------+------------------+



In [6]:
# Conversion rate per channel = converted touchpoints / ad clicks * 100.
# The left join is anchored on the conversion counts, so channels that have
# conversions but no rows in marketing_spend.csv keep a null click total and
# a null rate.
df_conversion_rate = (
    df_user_touchpoints
    .filter(col("conversion") == True)
    .groupBy("channel")
    .agg(count("user_id").alias("total_conversions"))
    .join(
        df_marketing_spend.groupBy("channel").agg(sum("clicks").alias("total_clicks")),
        "channel",
        "left",
    )
    .withColumn(
        "Conversion Rate (%)",
        # Guard the denominator (same pattern as the campaign ROAS cell):
        # a zero or null click total yields null instead of relying on the
        # session's divide-by-zero behaviour, which raises under ANSI mode.
        when(
            col("total_clicks") > 0,
            (col("total_conversions") / col("total_clicks")) * 100,
        ).otherwise(None),
    )
)

df_conversion_rate.show()


+----------+-----------------+------------+-------------------+
|   channel|total_conversions|total_clicks|Conversion Rate (%)|
+----------+-----------------+------------+-------------------+
| instagram|             5168|     57677.0|  8.960244118106004|
|   twitter|             5179|     53713.0|  9.641986111369686|
|     email|             5155|        NULL|               NULL|
|google_ads|             5243|     59581.0|   8.79978516641211|
|   organic|             5046|        NULL|               NULL|
| affiliate|             5234|     19806.0| 26.426335453902855|
|  facebook|             5283|     62934.0|  8.394508532748594|
+----------+-----------------+------------+-------------------+



In [7]:
# ROAS (Return on Ad Spend) per channel = total revenue / total spend.
df_roas = (
    df_marketing_spend
    .groupBy("channel")
    .agg(sum("spend").alias("total_spend"))
    .join(
        # Revenue rows matched to users on user_id, summed per channel.
        df_users.join(df_revenue_and_rewards, "user_id", "inner")
        .groupBy("channel")
        .agg(sum("revenue").alias("total_revenue")),
        "channel",
        "left",
    )
    .withColumn(
        "ROAS",
        # Same guard as the campaign-level ROAS: only divide when spend is
        # positive, otherwise leave ROAS null. This avoids a divide-by-zero
        # error when the session runs with ANSI mode enabled.
        when(
            col("total_spend") > 0,
            col("total_revenue") / col("total_spend"),
        ).otherwise(None),
    )
)

df_roas.show()


+----------+------------------+------------------+------------------+
|   channel|       total_spend|     total_revenue|              ROAS|
+----------+------------------+------------------+------------------+
| instagram|23294.956006770208|1405870.5400000694|60.350856193567466|
|   twitter|13950.928595089703| 980673.7400000249| 70.29451361002535|
|google_ads|40751.349267119636|2284628.7300002063| 56.06265242961088|
| affiliate| 9317.011117527823| 830896.4700000079| 89.18058157479993|
|  facebook|29205.112395249587|1878910.2500001364| 64.33497753995132|
+----------+------------------+------------------+------------------+



In [8]:
# CTR (click-through rate) per channel = total clicks / total impressions * 100.
df_ctr = (
    df_marketing_spend
    .groupBy("channel")
    .agg(
        sum("impressions").alias("total_impressions"),
        sum("clicks").alias("total_clicks"),
    )
    .withColumn(
        "CTR (%)",
        # Only divide when there is at least one impression; otherwise the
        # rate is null. This avoids a divide-by-zero error when the session
        # runs with ANSI mode enabled.
        when(
            col("total_impressions") > 0,
            (col("total_clicks") / col("total_impressions")) * 100,
        ).otherwise(None),
    )
)

df_ctr.show()


+----------+-----------------+------------+-----------------+
|   channel|total_impressions|total_clicks|          CTR (%)|
+----------+-----------------+------------+-----------------+
| instagram|        2342705.0|     57677.0|2.461983049509008|
|   twitter|        2731093.0|     53713.0|1.966721748398901|
|google_ads|        1728387.0|     59581.0|3.447202507308838|
| affiliate|         517110.0|     19806.0|3.830132853744851|
|  facebook|        2280049.0|     62934.0| 2.76020383772454|
+----------+-----------------+------------+-----------------+



In [9]:
# CRR
from pyspark.sql.functions import date_trunc

# Bucket users by the calendar month of `created_at` and count the distinct
# user_ids in each bucket.
# NOTE(review): the result is a per-month count of users by creation date,
# exposed under the "active_customers" alias; it is not itself a retention
# rate. Confirm whether a start/end-of-period comparison was intended.
df_retention = (
    df_users
    .withColumn("month", date_trunc("month", col("created_at")))
    .groupBy("month")
    .agg(countDistinct("user_id").alias("active_customers"))
)

# Show the most recent month first.
df_retention.orderBy(col("month").desc()).show()

+-------------------+----------------+
|              month|active_customers|
+-------------------+----------------+
|2024-01-01 00:00:00|               4|
|2023-12-01 00:00:00|            3752|
|2023-11-01 00:00:00|            3648|
|2023-10-01 00:00:00|            2634|
|2023-09-01 00:00:00|            2495|
|2023-08-01 00:00:00|            3259|
|2023-07-01 00:00:00|            3130|
|2023-06-01 00:00:00|            3115|
|2023-05-01 00:00:00|            2639|
|2023-04-01 00:00:00|            2529|
|2023-03-01 00:00:00|            2624|
|2023-02-01 00:00:00|            2299|
|2023-01-01 00:00:00|            2569|
+-------------------+----------------+



In [23]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col

# Where the denormalized table is written. Set the DENORMALIZED_USERS_OUTPUT
# environment variable to redirect the output; the fallback is the original
# machine-specific location, kept only so existing runs behave the same.
DENORMALIZED_USERS_OUTPUT = os.environ.get(
    "DENORMALIZED_USERS_OUTPUT",
    "file:///C:/Users/adity/Downloads/parquet_output",
)

# Initialize Spark Session. getOrCreate() hands back the already-running
# session when one exists, so this cell also works on a fresh kernel.
spark = SparkSession.builder.appName("DenormalizedUsers").getOrCreate()

# Load DataFrames (first row of each CSV is used as the header)
df_users = spark.read.option("header", "true").csv("/content/users.csv")
df_revenue_rewards = spark.read.option("header", "true").csv("/content/revenue_and_rewards.csv")
df_user_touchpoints = spark.read.option("header", "true").csv("/content/user_touchpoints.csv")

# Convert the monetary columns to double before aggregating them
df_revenue_rewards = df_revenue_rewards.withColumn("revenue", col("revenue").cast("double")) \
                                       .withColumn("reward_to_user", col("reward_to_user").cast("double"))

# Aggregate revenue & rewards per user
df_user_revenue = df_revenue_rewards.groupBy("user_id").agg(
    sum("revenue").alias("total_revenue"),
    sum("reward_to_user").alias("total_rewards")
)

# Count number of touchpoints per user (rows with a non-null touchpoint_date)
df_touchpoints = df_user_touchpoints.groupBy("user_id").agg(
    count("touchpoint_date").alias("num_touchpoints")
)

# Join all datasets to create denormalized user table. The left joins keep
# every user; users without revenue or touchpoints get nulls, which fillna(0)
# replaces with 0 in the numeric columns.
df_denormalized_users = df_users.join(df_user_revenue, "user_id", "left") \
    .join(df_touchpoints, "user_id", "left") \
    .fillna(0)

# Show final denormalized table
df_denormalized_users.show()

# Persist the table as Parquet, replacing any previous output at that path
df_denormalized_users.write.mode("overwrite").parquet(DENORMALIZED_USERS_OUTPUT)





+-------+-------------------+-------------------+-------------------+----------+---------------+------------------+------------------+---------------+
|user_id|       installed_at|         created_at|         updated_at|   channel|       campaign|     total_revenue|     total_rewards|num_touchpoints|
+-------+-------------------+-------------------+-------------------+----------+---------------+------------------+------------------+---------------+
|u000015|2023-01-01 07:31:00|2023-01-01 07:59:00|2023-01-01 07:59:00|google_ads|    retargeting|             163.8|1.6400000000000001|              3|
|u000002|2023-01-01 00:51:00|2023-01-01 01:38:00|2023-01-01 01:38:00|google_ads| new_user_promo| 91.86000000000001|              0.89|              4|
|u000009|2023-01-01 03:11:00|2023-01-01 03:24:00|2023-01-01 03:24:00|google_ads|holiday_special|144.25000000000003|1.4400000000000004|              4|
|u000011|2023-01-01 20:32:00|2023-01-01 20:33:00|2023-01-01 20:33:00|google_ads|holiday_specia