In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType

# Obtain the Spark session for this notebook (an already-active session is
# reused by getOrCreate instead of a new one being started).
spark = (
    SparkSession.builder
    .appName("Spark with Hive")
    .getOrCreate()
)

# Locations of the three JSON inputs read further down in this cell.
path1 = "/Volumes/workspace/default/assignment-1/ad_campaigns_data.json"   # ad campaign events
path2 = "/Volumes/workspace/default/assignment-1/store_data.json"          # stores and their place ids
path3 = "/Volumes/workspace/default/assignment-1/user_profile_data.json"   # user profiles


# Explicit schemas for the three inputs. Every field is declared nullable.

# One row per ad event; event_time is parsed as a timestamp on read.
schema_campaigns = (
    StructType()
    .add("campaign_id", StringType(), True)
    .add("campaign_name", StringType(), True)
    .add("campaign_country", StringType(), True)
    .add("os_type", StringType(), True)
    .add("device_type", StringType(), True)
    .add("place_id", StringType(), True)
    .add("user_id", StringType(), True)
    .add("event_type", StringType(), True)
    .add("event_time", TimestampType(), True)
)

# One row per user; category is an array of strings.
schema_users = (
    StructType()
    .add("user_id", StringType(), True)
    .add("country", StringType(), True)
    .add("gender", StringType(), True)
    .add("age_group", StringType(), True)
    .add("category", ArrayType(StringType()), True)
)

# One row per store; place_ids is an array of strings.
schema_stores = (
    StructType()
    .add("store_name", StringType(), True)
    .add("place_ids", ArrayType(StringType()), True)
)


# Read each JSON input with its explicit schema (no schema inference). The
# "multiline" option is set so a JSON document may span several lines.
df_campaigns = (
    spark.read.format('json')
    .option("multiline", "true")
    .schema(schema_campaigns)
    .load(path1)
)
df_users = (
    spark.read.format('json')
    .option("multiline", "true")
    .schema(schema_users)
    .load(path3)
)
df_stores = (
    spark.read.format('json')
    .option("multiline", "true")
    .schema(schema_stores)
    .load(path2)
)





In [0]:
# Print the schema of each loaded DataFrame: campaigns, users, then stores.
for frame in (df_campaigns, df_users, df_stores):
    frame.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_country: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- store_name: string (nullable = true)
 |-- place_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
# Preview up to five rows of each DataFrame: campaigns, users, then stores.
for frame in (df_campaigns, df_users, df_stores):
    frame.show(5)

+-----------+--------------------+----------------+-------+-----------+---------+-------------------+----------+-------------------+
|campaign_id|       campaign_name|campaign_country|os_type|device_type| place_id|            user_id|event_type|         event_time|
+-----------+--------------------+----------------+-------+-----------+---------+-------------------+----------+-------------------+
|    ABCDFAE|Food category tar...|             USA|    ios|      apple|CASSBB-11|1264374214654454321|impression|2018-10-12 13:10:05|
|    ABCDFAE|Food category tar...|             USA|android|   MOTOROLA|CADGBD-13|1674374214654454321|impression|2018-10-12 13:09:04|
|    ABCDFAE|Food category tar...|             USA|android|    SAMSUNG|BADGBA-12|   5747421465445443|  video ad|2018-10-12 13:10:10|
|    ABCDFAE|Food category tar...|             USA|android|    SAMSUNG|CASSBB-11|1864374214654454132|     click|2018-10-12 13:10:12|
+-----------+--------------------+----------------+-------+----------

In [0]:
# Add the calendar date and the hour of day of each event, both derived from
# event_time (which is cast to timestamp first).
df_campaigns = (
    df_campaigns
    .withColumn("event_time", F.col("event_time").cast("timestamp"))
    .withColumn("date", F.to_date(F.col("event_time")))
    .withColumn("hour", F.hour(F.col("event_time")))
)

In [0]:
# Q1. For each campaign_id, date, hour and os_type, count the events of each type.

# Event types reported in the output struct. Passing them to pivot() explicitly
# guarantees that all three columns exist even when a type never occurs in the
# data (otherwise the select below fails on an unresolved column), and spares
# Spark the extra job it would run to discover the distinct values.
event_types = ["impression", "click", "video ad"]

result_q1 = (
    df_campaigns.groupBy("campaign_id", "date", "hour", "os_type")
    .pivot("event_type", event_types)
    .agg(F.count("event_type"))
    # Groups with no events of a given type get 0 instead of null. The fill is
    # limited to the count columns so that a null "hour" is not turned into 0.
    .fillna(0, subset=event_types)
    .select(
        "campaign_id",
        "date",
        "hour",
        "os_type",
        F.struct(
            F.col("impression"),
            F.col("click"),
            # Rename so the struct field has no space in its name.
            F.col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q1.display()

campaign_id,date,hour,os_type,event
ABCDFAE,2018-10-12,13,android,"List(1, 1, 1)"
ABCDFAE,2018-10-12,13,ios,"List(1, 0, 0)"


In [0]:
# Q2. For each campaign_id, date, hour and store_name, count the events of each type.

# Event types reported in the output struct. Passing them to pivot() explicitly
# guarantees that all three columns exist even when a type is absent after the
# join (otherwise the select below fails on an unresolved column), and spares
# Spark the extra job it would run to discover the distinct values.
event_types = ["impression", "click", "video ad"]

result_q2 = (
    # An event is attributed to every store whose place_ids contains its
    # place_id; events whose place_id matches no store are dropped (inner join).
    df_campaigns.join(df_stores, F.array_contains(df_stores.place_ids, df_campaigns.place_id), "inner")
    .groupBy("campaign_id", "date", "hour", "store_name")
    .pivot("event_type", event_types)
    .agg(F.count("event_type"))
    # Groups with no events of a given type get 0 instead of null. The fill is
    # limited to the count columns so that a null "hour" is not turned into 0.
    .fillna(0, subset=event_types)
    .select(
        "campaign_id",
        "date",
        "hour",
        "store_name",
        F.struct(
            F.col("impression"),
            F.col("click"),
            # Rename so the struct field has no space in its name.
            F.col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q2.display()


campaign_id,date,hour,store_name,event
ABCDFAE,2018-10-12,13,BurgerKing,"List(1, 1, 0)"
ABCDFAE,2018-10-12,13,McDonald,"List(2, 1, 0)"
ABCDFAE,2018-10-12,13,shoppers stop,"List(0, 0, 1)"


In [0]:
# Q3. For each campaign_id, date, hour and gender, count the events of each type.

# Event types reported in the output struct. Passing them to pivot() explicitly
# guarantees that all three columns exist even when a type is absent after the
# join (otherwise the select below fails on an unresolved column), and spares
# Spark the extra job it would run to discover the distinct values.
event_types = ["impression", "click", "video ad"]

result_q3 = (
    # Events whose user_id has no matching user profile are dropped (inner join).
    df_campaigns.join(df_users, "user_id", "inner")
    .groupBy("campaign_id", "date", "hour", "gender")
    .pivot("event_type", event_types)
    .agg(F.count("event_type"))
    # Groups with no events of a given type get 0 instead of null. The fill is
    # limited to the count columns so that a null "hour" is not turned into 0.
    .fillna(0, subset=event_types)
    .select(
        "campaign_id",
        "date",
        "hour",
        "gender",
        F.struct(
            F.col("impression"),
            F.col("click"),
            # Rename so the struct field has no space in its name.
            F.col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q3.display()


campaign_id,date,hour,gender,event
ABCDFAE,2018-10-12,13,male,"List(1, 1, 1)"
ABCDFAE,2018-10-12,13,female,"List(1, 0, 0)"
