In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, when
from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import to_date
from pyspark.sql.functions import regexp_replace, col, lower, trim, to_date, when

# Use the same Spark session as Extract notebook
spark = SparkSession.builder.appName("GoogleAdsETL").getOrCreate()

# Use df_ads from Extract notebook
# If running sequentially in the same kernel, df_ads is already in memory
# Otherwise, uncomment this to read from Parquet saved in Extract step:
df_ads = spark.read.parquet("../data/google_ads_extracted.parquet")

# Standardize column names
df_ads = df_ads.toDF(*[c.strip().lower().replace(' ', '_') for c in df_ads.columns])

# Handle nulls
important_cols = ['ad_id', 'campaign_name']
df_ads = df_ads.dropna(subset=important_cols)

num_cols = ['clicks', 'impressions', 'leads', 'conversions', 'conversion_rate']
for c in num_cols:
    df_ads = df_ads.fillna({c: 0})

text_cols = ['cost', 'sale_amount', 'location', 'device', 'keyword']
for c in text_cols:
    df_ads = df_ads.fillna({c: ''})

# Remove duplicates
df_ads = df_ads.dropDuplicates()

# Convert numeric columns
df_ads = df_ads.withColumn("clicks", col("clicks").cast(IntegerType()))
df_ads = df_ads.withColumn("impressions", col("impressions").cast(IntegerType()))
df_ads = df_ads.withColumn("leads", col("leads").cast(IntegerType()))
df_ads = df_ads.withColumn("conversions", col("conversions").cast(IntegerType()))
df_ads = df_ads.withColumn("conversion_rate", col("conversion_rate").cast(FloatType()))

# Clean currency columns
df_ads = df_ads.withColumn(
    "cost",
    when(col("cost") != "", regexp_replace(col("cost"), "[$,]", "").cast(FloatType())).otherwise(None)
)
df_ads = df_ads.withColumn(
    "sale_amount",
    when(col("sale_amount") != "", regexp_replace(col("sale_amount"), "[$,]", "").cast(FloatType())).otherwise(None)
)

# Standardize text columns
for c in ['campaign_name', 'location', 'device', 'keyword']:
    df_ads = df_ads.withColumn(c, lower(trim(col(c))))

# Standardize date column
df_ads = df_ads.withColumn("ad_date", 
    when(col("ad_date").rlike(r'\d{4}-\d{2}-\d{2}'), to_date(col("ad_date"), "yyyy-MM-dd"))
    .when(col("ad_date").rlike(r'\d{2}-\d{2}-\d{4}'), to_date(col("ad_date"), "dd-MM-yyyy"))
    .when(col("ad_date").rlike(r'\d{4}/\d{2}/\d{2}'), to_date(col("ad_date"), "yyyy/MM/dd"))
    .otherwise(None)
)

# ------------------------------
# Staging Layer
# ------------------------------
df_ads_staging = df_ads.withColumn(
    "campaign_name",
    when(col("campaign_name").rlike("dataanalyticscourse|data anlytics corse|data analitics online"), "data_analytics_course")
    .otherwise(col("campaign_name"))
).withColumn(
    "location",
    when(col("location").rlike("hyderabad|hyderabad "), "hyderabad")
    .otherwise(col("location"))
).withColumn(
    "device",
    when(col("device").rlike("desktop|mobile"), col("device"))
    .otherwise("other")
)

# Optionally, add CTR only
df_ads_staging = df_ads_staging.withColumn(
    "ctr", when(col("impressions") > 0, col("clicks") / col("impressions")).otherwise(0)
)

# ------------------------------
# Warehouse Layer
# ------------------------------
df_ads_warehouse = df_ads_staging.withColumn(
    "cpc", when(col("clicks") > 0, col("cost") / col("clicks")).otherwise(0)
).withColumn(
    "cpa", when(col("conversions") > 0, col("cost") / col("conversions")).otherwise(0)
).withColumn(
    "revenue_per_conversion", when(col("conversions") > 0, col("sale_amount") / col("conversions")).otherwise(0)
)

# Preview
print("Staging sample:")
df_ads_staging.show(5, truncate=False)

print("Warehouse sample:")
df_ads_warehouse.show(5, truncate=False)

# Save staging data
df_ads_staging.write.mode("overwrite").parquet("../data/ads_staging.parquet")

# Save warehouse data
df_ads_warehouse.write.mode("overwrite").parquet("../data/ads_warehouse.parquet")

Staging sample:
+-----+---------------------+------+-----------+------+-----+-----------+---------------+-----------+----------+---------+-------+----------------------+--------------------+
|ad_id|campaign_name        |clicks|impressions|cost  |leads|conversions|conversion_rate|sale_amount|ad_date   |location |device |keyword               |ctr                 |
+-----+---------------------+------+-----------+------+-----+-----------+---------------+-----------+----------+---------+-------+----------------------+--------------------+
|A2057|data analytcis course|172   |4850       |242.33|27   |4          |0.023          |1391.0     |2024-11-21|hyderbad |other  |data analitics online |0.03546391752577319 |
|A2398|data analytcis course|198   |4035       |215.94|22   |8          |0.0            |NULL       |2024-11-10|hydrebad |mobile |learn data analytics  |0.04907063197026022 |
|A2804|data_analytics_course|110   |3371       |211.89|12   |6          |0.055          |1743.0     |2024-11-