In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType

# Create Spark session
spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .enableHiveSupport() \
    .getOrCreate()

In [22]:
# # Load the JSON data
ad_hdfs_path = '/tmp/input_files/ad_campaigns_data.json'
user_hdfs_path = '/tmp/input_files/user_profile_data.json'
store_hdfs_path = '/tmp/input_files/store_data.json'


# Define the schema
schema_campaigns = StructType([
    StructField("campaign_id", StringType(), True),
    StructField("campaign_name", StringType(), True),
    StructField("campaign_country", StringType(), True),
    StructField("os_type", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("place_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_time", TimestampType(), True)
])


schema_users = StructType([
    StructField("user_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age_group", StringType(), True),
    StructField("category", ArrayType(StringType()), True)
])


schema_stores = StructType([
    StructField("store_name", StringType(), True),
    StructField("place_ids", ArrayType(StringType()), True)
])


df_campaign = spark.read.option("multiline","True").json(ad_hdfs_path)
df_user = spark.read.option("multiline","True").json(user_hdfs_path)
df_store = spark.read.option("multiline","True").json(store_hdfs_path)

#print schema and show data
df_campaign.printSchema()
df_campaign.show()

df_user.printSchema()
df_user.show()

df_store.printSchema()
df_store.show()

root
 |-- campaign_country: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:

In [25]:
# Extract date and hour from the event_time column and adding the new columns to dataframe
df_campaign = df_campaign.withColumn("event_time", col("event_time").cast("timestamp"))
df_campaign = df_campaign.withColumn("date", to_date("event_time"))
df_campaign = df_campaign.withColumn("hour", hour("event_time"))

In [26]:
# Define the output path
hdfs_output_path1 = '/tmp/input_files/output/'



# Q1. Analyze data for each campaign_id, date, hour, os_type & value to get all the events with counts
result_q1 = (
    df_campaign.groupBy("campaign_id", "date", "hour", "os_type", "event_type")
    .agg(count("event_type").alias("event_count"))
    .groupBy("campaign_id", "date", "hour", "os_type")
    .pivot("event_type")
    .agg(first("event_count"))
    .fillna(0)
    .select(
        "campaign_id",
        "date",
        "hour",
        "os_type",
        struct(
            col("impression").alias("impression"),
            col("click").alias("click"),
            col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q1.show()



[Stage 43:>                                                         (0 + 1) / 1]

+-----------+----------+----+-------+---------+
|campaign_id|      date|hour|os_type|    event|
+-----------+----------+----+-------+---------+
|    ABCDFAE|2018-10-12|  13|android|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  13|    ios|{1, 0, 0}|
+-----------+----------+----+-------+---------+



                                                                                

In [51]:
# Save the result to HDFS
result_q1.write.json(hdfs_output_path1 + "q1_output", mode="overwrite")
print ("Write Successful for Question 1")


Write Successful for Question 1


In [36]:
# Define the output path for Question 2
hdfs_output_path2 = '/tmp/input_files/output2/'

# Q2. Analyze data for each campaign_id, date, hour, store_name & value to get all the events with counts
q2_join = df_campaign.join(df_store, array_contains(df_store.place_ids, df_campaign.place_id), 'inner').drop(df_store.place_ids)

q2_join.show()




+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+-------------+
|campaign_country|campaign_id|       campaign_name|device_type|         event_time|event_type|os_type| place_id|            user_id|      date|hour|   store_name|
+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+-------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12 13:10:05|impression|    ios|CASSBB-11|1264374214654454321|2018-10-12|  13|     McDonald|
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12 13:10:05|impression|    ios|CASSBB-11|1264374214654454321|2018-10-12|  13|   BurgerKing|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12 13:09:04|impression|android|CADGBD-13|1674374214654454321|2018-10-12|  13|     McDonald|
|             USA|    

In [38]:
result_q2 = (
    q2_join.groupBy("campaign_id", "date", "hour", "store_name", "event_type")
    .agg(count("event_type").alias("event_count"))
    .groupBy("campaign_id", "date", "hour", "store_name")
    .pivot("event_type")
    .agg(first("event_count"))
    .fillna(0)
    .select(
        "campaign_id",
        "date",
        "hour",
        "store_name",
        struct(
            col("impression").alias("impression"),
            col("click").alias("click"),
            col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q2.show()

+-----------+----------+----+-------------+---------+
|campaign_id|      date|hour|   store_name|    event|
+-----------+----------+----+-------------+---------+
|    ABCDFAE|2018-10-12|  13|     McDonald|{2, 1, 0}|
|    ABCDFAE|2018-10-12|  13|shoppers stop|{0, 0, 1}|
|    ABCDFAE|2018-10-12|  13|   BurgerKing|{1, 1, 0}|
+-----------+----------+----+-------------+---------+



In [50]:
# Save the result to HDFS
result_q2.write.json(hdfs_output_path2 + "q2_output", mode="overwrite")
print ("Write Successful for Question 2")


Write Successful for Question 2


In [45]:
# Define the output path for Question 2
hdfs_output_path3 = '/tmp/input_files/output3/'

# Q3. Analyse data for each campaign_id, date, hour, gender_type & value to get all the events with counts

q3_join = df_campaign.join(df_user, df_user.user_id == df_campaign.user_id, 'inner').drop(df_user.user_id)

q3_join.show()


+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+---------+--------------------+-------+------+
|campaign_country|campaign_id|       campaign_name|device_type|         event_time|event_type|os_type| place_id|            user_id|      date|hour|age_group|            category|country|gender|
+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+---------+--------------------+-------+------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12 13:10:05|impression|    ios|CASSBB-11|1264374214654454321|2018-10-12|  13|    18-25|  [shopper, student]|    USA|  male|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12 13:09:04|impression|android|CADGBD-13|1674374214654454321|2018-10-12|  13|    25-50|            [parent]|    USA|female|
|             USA|    ABC

In [46]:
result_q3 = (
    q3_join.groupBy("campaign_id", "date", "hour", "gender", "event_type")
    .agg(count("event_type").alias("event_count"))
    .groupBy("campaign_id", "date", "hour", "gender")
    .pivot("event_type")
    .agg(first("event_count"))
    .fillna(0)
    .select(
        "campaign_id",
        "date",
        "hour",
        "gender",
        struct(
            col("impression").alias("impression"),
            col("click").alias("click"),
            col("video ad").alias("video_ad"),
        ).alias("event"),
    )
)

result_q3.show()

+-----------+----------+----+------+---------+
|campaign_id|      date|hour|gender|    event|
+-----------+----------+----+------+---------+
|    ABCDFAE|2018-10-12|  13|  male|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  13|female|{1, 0, 0}|
+-----------+----------+----+------+---------+



In [49]:
# Save the result to HDFS
result_q3.write.json(hdfs_output_path3 + "q3_output", mode="overwrite")
print ("Write Successful for Question 3")

Write Successful for Question 3
