In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType

spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .enableHiveSupport() \
    .getOrCreate()

schema_campaign=StructType([
                         StructField("campaign_id",StringType(),True),
                         StructField("campaign_name",StringType(),True),
                         StructField("campaign_country",StringType(),True),
                         StructField("os_type",StringType(),True),
                         StructField("device_type",StringType(),True),
                         StructField("place_id",StringType(),True),
                         StructField("user_id",StringType(),True),
                         StructField("event_type",StringType(),True),
                         StructField("event_time",TimestampType(),True)
                          ])
df_campaign = spark.read.option("multiline", "true").schema(schema_campaign).json("/content/ad_campaign_data.json")
#df_campaign = spark.read.option("multiline", "true").json("/ad_campaign_data.json")

schema_users = StructType([
    StructField("user_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age_group", StringType(), True),
    StructField("category", ArrayType(StringType()), True)
])

df_users = spark.read.option("multiline", "true").schema(schema_users).json("/content/user_profile_data.json")

schema_stores = StructType([
    StructField("store_name", StringType(), True),
    StructField("place_ids", ArrayType(StringType()), True)
])
df_stores = spark.read.option("multiline", "true").schema(schema_stores).json("/content/store_data.json")
#df_stores = spark.read.option("multiline", "true").json("/store_data.json")

df_campaign.printSchema()
df_campaign.show(5)


#df_campaigns = spark.read.format('json').option("multiline", "true").schema(schema_campaign).load(hdfs_path1)
#df_users = spark.read.format('json').option("multiline", "true").schema(schema_users).load(hdfs_path2)
#df_stores = spark.read.format('json').option("multiline", "true").schema(schema_stores).load(hdfs_path3)

df_users.printSchema()
df_users.show(5)

df_stores.printSchema()
df_stores.show(5)


df_users.printSchema()
df_users.show(5)

df_stores.printSchema()
df_stores.show(5)

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_country: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

+-----------+--------------------+----------------+-------+-----------+---------+-------------------+----------+-------------------+
|campaign_id|       campaign_name|campaign_country|os_type|device_type| place_id|            user_id|event_type|         event_time|
+-----------+--------------------+----------------+-------+-----------+---------+-------------------+----------+-------------------+
|    ABCDFAE|Food category tar...|             USA|    ios|      apple|CASSBB-11|1264374214654454321|impression|2018-10-12 13:10:05|
|    ABCDFAE|Food category tar...|             USA|android|   MOTOROLA|CADGBD-

In [7]:
# Extract date and hour from the event_time column
df_campaigns = df_campaign.withColumn("event_time", F.col("event_time").cast("timestamp"))
df_campaigns = df_campaigns.withColumn("date", F.to_date("event_time"))
df_campaigns = df_campaigns.withColumn("hour", F.hour("event_time"))

In [9]:
# Q1. Analyze data for each campaign_id, date, hour, os_type & value to get all the events with counts
result_q1 = (
    df_campaigns.groupBy("campaign_id", "date", "hour", "os_type", "event_type")
    .agg(F.count("event_type").alias("event_count"))
    .groupBy("campaign_id", "date", "hour", "os_type")
    .pivot("event_type")
    .agg(F.first("event_count"))
    .fillna(0)
    .select(
        "campaign_id",
        "date",
        "hour",
        "os_type",
        F.struct(
            F.col("impression").alias("impression"),
            F.col("click").alias("click"),
            F.col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)
result_q1.show()


+-----------+----------+----+-------+---------+
|campaign_id|      date|hour|os_type|    event|
+-----------+----------+----+-------+---------+
|    ABCDFAE|2018-10-12|  13|android|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  13|    ios|{1, 0, 0}|
+-----------+----------+----+-------+---------+



In [22]:
# Q2. Analyze data for each campaign_id, date, hour, store_name & value to get all the events with counts

(df_campaigns.join(df_stores, F.array_contains(df_stores.place_ids, df_campaigns.place_id), "inner")
#df_stores.place_ids is an array containing multiple place_ids.df_campaigns.place_id is a single value.
#F.array_contains(df_stores.place_ids, df_campaigns.place_id) checks if place_id from df_campaigns is in the place_ids array of df_stores.

.groupBy("campaign_id", "date", "hour", "store_name","event_type")
#Counts the occurrences of each event_type for the group and stores the result in a new column, event_count

.agg(F.count("event_type").alias("event_count"))
.groupBy("campaign_id", "date", "hour", "store_name") #Groups by the same fields except for event_type.
 #Converts unique values of event_type (e.g., impression, click, video ad) into separate columns.

 #The result is a table where each event_type becomes a column with its corresponding count
.pivot("event_type")
.agg(F.first("event_count")) #Retrieves the first (or only) value of event_count for each pivoted column
.fillna(0)

 #Creates a new column, event, as a struct containing:
#impression: Count of impression events.
#click: Count of click events.
#video ad: Count of video ad events.
.select(
    "campaign_id",
    "date",
    "hour",
    "store_name",
    F.struct(
        F.col("impression").alias("impression"),
        F.col("video ad").alias("video_ad"),
        F.col("Click").alias("click")
    ).alias("event"),
)).show()




+-----------+----------+----+-------------+---------+
|campaign_id|      date|hour|   store_name|    event|
+-----------+----------+----+-------------+---------+
|    ABCDFAE|2018-10-12|  13|   BurgerKing|{1, 0, 1}|
|    ABCDFAE|2018-10-12|  13|     McDonald|{2, 0, 1}|
|    ABCDFAE|2018-10-12|  13|shoppers stop|{0, 1, 0}|
+-----------+----------+----+-------------+---------+



In [33]:
# Q3. Analyze data for each campaign_id, date, hour, gender_type & value to get all the events with counts

(
df_campaigns.join(df_users,"user_id","inner")
.groupBy("campaign_id", "date", "hour", "gender","event_type")
.agg(F.count("event_type").alias("event_count"))
.groupBy("campaign_id","date","hour","gender")
.pivot("event_type")
.agg(F.first("event_count"))
.fillna(0)
.select(
    "campaign_id",
    "date",
    "hour",
    "gender",
    F.struct(
        F.col("impression").alias("Impression"),
        F.col("video ad").alias("Video_Ad"),
        F.col("click").alias("Click"),
    ).alias("event"),
)
).show()

+-----------+----------+----+------+---------+
|campaign_id|      date|hour|gender|    event|
+-----------+----------+----+------+---------+
|    ABCDFAE|2018-10-12|  13|  male|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  13|female|{1, 0, 0}|
+-----------+----------+----+------+---------+

