In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType
from pyspark.sql.functions import col, to_date, hour, count, lit, struct, explode

In [None]:
def define_schema():
    """Build the explicit read schemas for the three input JSON datasets.

    Returns:
        Tuple ``(ad_campaigns_schema, user_profile_schema, store_schema)`` of
        ``StructType`` objects. Every field is declared nullable.
    """
    def nullable(name, data_type):
        # All input fields are declared nullable (third StructField arg = True).
        return StructField(name, data_type, True)

    ad_campaign_string_fields = (
        "campaign_id", "campaign_name", "campaign_country", "os_type",
        "device_type", "place_id", "user_id", "event_type",
    )
    ad_campaigns_schema = StructType(
        [nullable(name, StringType()) for name in ad_campaign_string_fields]
        # event_time is read as a timestamp since the source holds ISO datetimes.
        + [nullable("event_time", TimestampType())]
    )

    user_profile_schema = StructType([
        nullable("user_id", StringType()),
        nullable("country", StringType()),
        nullable("gender", StringType()),
        nullable("age_group", StringType()),
        nullable("category", ArrayType(StringType())),  # array of strings
    ])

    store_schema = StructType([
        nullable("store_name", StringType()),
        nullable("place_ids", ArrayType(StringType())),  # array of strings
    ])

    return ad_campaigns_schema, user_profile_schema, store_schema

In [None]:
def load_data(spark, ad_campaigns_schema, user_profile_schema, store_schema,
              input_dir="/projects/marketingDA/input_data"):
    """Read the three input JSON datasets with explicit schemas.

    Args:
        spark: Active SparkSession.
        ad_campaigns_schema: Schema for ``ad_campaigns_data.json``.
        user_profile_schema: Schema for ``user_profile_data.json``.
        store_schema: Schema for ``store_data.json``.
        input_dir: Directory holding the three files. The default is the
            location that was previously hard-coded. A trailing slash is
            tolerated.

    Returns:
        Tuple ``(adc_df, up_df, s_df)``: the ad-campaign, user-profile and
        store DataFrames, in that order.
    """
    base_dir = input_dir.rstrip("/")

    def _read_json(schema, file_name):
        # multiline=true lets a single JSON record span several lines
        # (e.g. pretty-printed files) instead of requiring JSON Lines.
        return (
            spark.read
            .option("multiline", "true")
            .schema(schema)
            .json(f"{base_dir}/{file_name}")
        )

    adc_df = _read_json(ad_campaigns_schema, "ad_campaigns_data.json")
    up_df = _read_json(user_profile_schema, "user_profile_data.json")
    s_df = _read_json(store_schema, "store_data.json")
    return adc_df, up_df, s_df

In [None]:
def exploding_df_q1(up_df, s_df):
    """Flatten the array columns of the user-profile and store frames.

    Each element of ``category`` (user profiles) and ``place_ids`` (stores)
    becomes its own row, replacing the array column in place. Note that
    ``explode`` drops rows whose array is null or empty.

    Returns:
        Tuple ``(exploded_user_profiles, exploded_stores)``.
    """
    exploded_profiles = up_df.withColumn("category", explode("category"))
    exploded_stores = s_df.withColumn("place_ids", explode("place_ids"))
    return exploded_profiles, exploded_stores

In [None]:
def sol_q1(adc_df, event_types=("impression", "click", "video ad")):
    """Q1: hourly per-campaign event counts broken down by OS type.

    Produces one row per (campaign_id, date, hour, os_type). The OS value is
    stored in a generic ``type``/``value`` pair (``type`` is always
    ``"os_type"``), and the per-event-type counts are nested in an ``event``
    struct.

    Args:
        adc_df: Ad-campaign events DataFrame. Must contain ``campaign_id``,
            ``os_type``, ``event_type`` and a timestamp ``event_time``.
        event_types: Event-type values to pivot into struct fields, in field
            order. Defaults to the three types the original code reported.

    Returns:
        DataFrame with columns ``campaign_id, date, hour, type, value, event``.
        ``event`` is a struct with one count field per entry of
        ``event_types``. A count is 0 when no such event occurred in the group.
    """
    from pyspark.sql.functions import coalesce

    event_types = list(event_types)
    hourly_df = adc_df.select(
        "campaign_id",
        # date/hour are derived in the Spark session time zone.
        to_date(col("event_time")).alias("date"),
        hour(col("event_time")).alias("hour"),
        lit("os_type").alias("type"),
        col("os_type").alias("value"),
        "event_type",
    )
    # Explicit pivot values avoid the extra job Spark runs to discover distinct
    # values. They also guarantee every struct field exists even if an event
    # type never occurs in the data (otherwise col(...) below would fail).
    pivoted_df = (
        hourly_df
        .groupBy("campaign_id", "date", "hour", "type", "value")
        .pivot("event_type", event_types)
        .count()
    )
    # A group with no rows for some event type pivots to null; report 0 instead.
    event_fields = [coalesce(col(name), lit(0)).alias(name) for name in event_types]
    return pivoted_df.select(
        "campaign_id", "date", "hour", "type", "value",
        struct(*event_fields).alias("event"),
    )

In [None]:
if __name__ == "__main__":
    # Entry point: build the session, load inputs, compute Q1 and write it as JSON.
    spark = (
        SparkSession.builder
        .appName("Marketing Campaign DA")
        .enableHiveSupport()
        .getOrCreate()
    )
    ad_campaigns_schema, user_profile_schema, store_schema = define_schema()
    adc_df, up_df, s_df = load_data(
        spark, ad_campaigns_schema, user_profile_schema, store_schema
    )
    # NOTE(review): the exploded profile/store frames are not consumed by Q1
    # (sol_q1 only reads adc_df); presumably kept for later questions.
    up_df, s_df = exploding_df_q1(up_df, s_df)
    final_df_q1 = sol_q1(adc_df)
    final_df_q1.write.mode("overwrite").json("/projects/marketingDA/output_data/marketing_DA/ques_1")