In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [26]:
spark = SparkSession.builder\
    .appName("Marketing Campaign Data Analysis Using PySpark")\
    .enableHiveSupport()\
    .getOrCreate()

In [27]:
ad_campaigns = spark.read.format("json").option("multiline","true").option("header","true").load("ad_campaigns_data.json")
ad_campaigns.printSchema()

# convert event_time from string to timestamp
ad_campaigns = ad_campaigns.withColumn("event_time", to_timestamp(ad_campaigns["event_time"]))

# convert user_id from string to int
# ad_campaigns = ad_campaigns.withColumn("user_id", ad_campaigns["user_id"].cast(IntegerType()))

ad_campaigns.printSchema()
ad_campaigns.show(5)

root
 |-- campaign_country: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)

root
 |-- campaign_country: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)

+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|         event_time|event_t

In [28]:
user_profile = spark.read.format("json").option("multiline","true").option("header", "true").load("user_profile_data.json")
user_profile.schema

# convert user_id from string to int
# user_profile = user_profile.withColumn("user_id", user_profile["user_id"].cast(IntegerType()))

user_profile.printSchema()

root
 |-- age_group: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- user_id: string (nullable = true)



In [29]:
store_data = spark.read.format("json").option("multiline","true").load("store_data.json")
store_data.schema

StructType([StructField('place_ids', ArrayType(StringType(), True), True), StructField('store_name', StringType(), True)])

# Q1. Analyse data for each campaign_id, date, hour, os_type & value to get all the events with counts

In [30]:
# extract hour and date from ad_campaigns
ad_campaigns = ad_campaigns.withColumn("date", to_date(ad_campaigns["event_time"]))
ad_campaigns = ad_campaigns.withColumn("hour", hour(ad_campaigns["event_time"]))
ad_campaigns.show(5)

+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+
|campaign_country|campaign_id|       campaign_name|device_type|         event_time|event_type|os_type| place_id|            user_id|      date|hour|
+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+-------------------+----------+----+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12 16:10:05|impression|    ios|CASSBB-11|1264374214654454321|2018-10-12|  16|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12 16:09:04|impression|android|CADGBD-13|1674374214654454321|2018-10-12|  16|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12 16:10:10|  video ad|android|BADGBA-12|   5747421465445443|2018-10-12|  16|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12 16:10:12|     click|android|CASS

In [31]:
# ad_campaigns.groupBy("campaign_id","date","hour","os_type").count().show()
Q1_result = (ad_campaigns.groupBy("campaign_id","date","hour","os_type","event_type")\
                .agg(count("event_type").alias("event_count"))
                .groupBy("campaign_id","date","hour","os_type")
                .pivot("event_type")
                .agg(first("event_count"))
                .fillna(0)
                .select
                ("campaign_id", "date","hour","os_type",
                 struct(col("impression"),col("click"),col("video ad"),).alias("event"),))

In [32]:
Q1_result.show()

+-----------+----------+----+-------+---------+
|campaign_id|      date|hour|os_type|    event|
+-----------+----------+----+-------+---------+
|    ABCDFAE|2018-10-12|  16|android|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  16|    ios|{1, 0, 0}|
+-----------+----------+----+-------+---------+



# Q2. Analyse data for each campaign_id, date, hour, store_name & value to get all the events with counts

In [33]:
Q2_result  = (ad_campaigns.join(store_data, array_contains(store_data.place_ids,  ad_campaigns.place_id), "inner").drop(ad_campaigns.place_id).groupBy("campaign_id","date","hour","store_name", "event_type")\
    .agg(count("event_type").alias("event_count"))\
    .groupBy("campaign_id","date","hour","store_name")\
    .pivot("event_type")\
    .agg(first("event_count"))\
    .fillna(0)\
    .select
    ("campaign_id", "date","hour","store_name",
    struct(col("impression"),col("click"),col("video ad"),).alias("event"),))
Q2_result.show()

+-----------+----------+----+-------------+---------+
|campaign_id|      date|hour|   store_name|    event|
+-----------+----------+----+-------------+---------+
|    ABCDFAE|2018-10-12|  16|shoppers stop|{0, 0, 1}|
|    ABCDFAE|2018-10-12|  16|     McDonald|{2, 1, 0}|
|    ABCDFAE|2018-10-12|  16|   BurgerKing|{1, 1, 0}|
+-----------+----------+----+-------------+---------+



# Q3. Analyse data for each campaign_id, date, hour, gender_type & value to get all the events with counts

In [36]:
df = ad_campaigns.join(user_profile, "user_id", "inner").drop(ad_campaigns.user_id)

+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+----------+----+---------+--------------------+-------+------+
|campaign_country|campaign_id|       campaign_name|device_type|         event_time|event_type|os_type| place_id|      date|hour|age_group|            category|country|gender|
+----------------+-----------+--------------------+-----------+-------------------+----------+-------+---------+----------+----+---------+--------------------+-------+------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12 16:10:05|impression|    ios|CASSBB-11|2018-10-12|  16|    18-25|  [shopper, student]|    USA|  male|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12 16:09:04|impression|android|CADGBD-13|2018-10-12|  16|    25-50|            [parent]|    USA|female|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12 16:10:10|  video ad|android|BADGBA-12|2018-10-12|  

In [35]:
Q3_result  = (ad_campaigns.join(user_profile, "user_id", "inner").drop(ad_campaigns.user_id).groupBy("campaign_id","date","hour","gender", "event_type")\
    .agg(count("event_type").alias("event_count"))\
    .groupBy("campaign_id","date","hour","gender")\
    .pivot("event_type")\
    .agg(first("event_count"))\
    .fillna(0)\
    .select
    ("campaign_id", "date","hour","gender",
    struct(col("impression"),col("click"),col("video ad"),).alias("event"),))
Q3_result.show()

+-----------+----------+----+------+---------+
|campaign_id|      date|hour|gender|    event|
+-----------+----------+----+------+---------+
|    ABCDFAE|2018-10-12|  16|  male|{1, 1, 1}|
|    ABCDFAE|2018-10-12|  16|female|{1, 0, 0}|
+-----------+----------+----+------+---------+



# Write to HDFS

# Create HIVE table on top of it