In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("Add Campaign")\
        .getOrCreate()

spark

In [2]:
hdfs_path="hdfs://localhost:9000/Pyspark/Marketing_Campaign/"

ad_campaigns_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"ad_campaigns_data.json")

ad_campaigns_df.show()

user_profile_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"user_profile_data.json")
user_profile_df.show()


store_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"store_data.json")

store_df.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [3]:

ad_campaigns_df.show(truncate=False)

+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|campaign_name                |device_type|event_time              |event_type|os_type|place_id |user_id            |
+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|
|USA             |ABCDFAE    |Food category target campaign|MOTOROLA   |2018-10-12T13:09:04.000Z|impression|android|CADGBD-13|1674374214654454321|
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:10.000Z|video ad  |android|BADGBA-12|5747421465445443   |
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:12.000Z|click     |android|CA

In [44]:
# Q1. Analyse data for each campaign_id, date, hour, os_type & value to get all the events with counts
from pyspark.sql.functions import *

df=ad_campaigns_df.groupBy("campaign_id",
                        substring(col("event_time"), 0, 10).alias("date"),
                        substring(col("event_time"),12, 2).alias("hour"),
                        col("os_type"),
                        col("event_type")
                       ).agg(count("event_type").alias("events"))


df.show()

selected_Exp_df = df.selectExpr(
    "campaign_id",
    "date",
    "hour",
    "os_type as type",
    "os_type as value",
    "events",
    "event_type",
    "struct(event_type, events) as event"
    
    
)
selected_Exp_df.show()


final_df = selected_Exp_df.groupBy("campaign_id", "date", "hour", "type", "value")\
            .agg(collect_list("event").alias("events"))\
            .selectExpr(
              "campaign_id",
              "date",
              "hour",
              "type",
              "value",
              "events",
              "map_from_entries(events) as event"
            )

final_df.show(truncate=False)

# load data to hdfs
# ad_campaigns.coalesce(1).write.format('json').save("hdfs://localhost:9000/Pyspark/Marketing_Campaign/results/")
# print("Write Successfull")

+-----------+----------+----+-------+----------+------+
|campaign_id|      date|hour|os_type|event_type|events|
+-----------+----------+----+-------+----------+------+
|    ABCDFAE|2018-10-12|  13|    ios|impression|     1|
|    ABCDFAE|2018-10-12|  13|android|     click|     1|
|    ABCDFAE|2018-10-12|  13|android|impression|     1|
|    ABCDFAE|2018-10-12|  13|android|  video ad|     1|
+-----------+----------+----+-------+----------+------+

+-----------+----------+----+-------+-------+------+----------+---------------+
|campaign_id|      date|hour|   type|  value|events|event_type|          event|
+-----------+----------+----+-------+-------+------+----------+---------------+
|    ABCDFAE|2018-10-12|  13|    ios|    ios|     1|impression|{impression, 1}|
|    ABCDFAE|2018-10-12|  13|android|android|     1|     click|     {click, 1}|
|    ABCDFAE|2018-10-12|  13|android|android|     1|impression|{impression, 1}|
|    ABCDFAE|2018-10-12|  13|android|android|     1|  video ad|  {video

In [None]:
# load data to hdfs
# coalesce will merge and save the data in only one csv file
ad_campaigns.coalesce(1).write.format('json').save("hdfs://localhost:9000/Pyspark/Marketing_Campaign/results/")
print("Write Successfull")

In [27]:
from pyspark.sql.functions import *

ad_campaigns_df.show()

ad_campaigns=ad_campaigns_df.groupBy("campaign_id",
                        substring(col("event_time"), 0, 10).alias("date"),
                        substring(col("event_time"),12, 2).alias("hour"),
                        col("os_type"),
                        col("event_type")
                       ).agg(count("event_type").alias("events"))\
                        .selectExpr(
                          "campaign_id",
                          "date",
                          "hour",
                          "'os_type' as type",
                          "os_type as value",
                          "struct(event_type, events) as event"
                          ) \
                          .groupBy("campaign_id", "date", "hour", "type", "value") \
                          .agg(collect_list("event").alias("events")) \
                          .selectExpr(
                              "campaign_id",
                              "date",
                              "hour",
                              "type",
                              "value",
                              "map_from_entries(events) as event"
                          )

ad_campaigns.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [47]:
# Q2.Analyse data for each campaign_id, date, hour, store_name & value to get all the events with counts

ad_campaigns_df.show(truncate=False)
store_df.show(truncate=False)

+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|campaign_name                |device_type|event_time              |event_type|os_type|place_id |user_id            |
+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|
|USA             |ABCDFAE    |Food category target campaign|MOTOROLA   |2018-10-12T13:09:04.000Z|impression|android|CADGBD-13|1674374214654454321|
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:10.000Z|video ad  |android|BADGBA-12|5747421465445443   |
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:12.000Z|click     |android|CA

In [28]:
from pyspark.sql.functions import *

ad_campaigns_df.show()

ad_campaigns=ad_campaigns_df.groupBy("campaign_id",
                        substring(col("event_time"), 0, 10).alias("date"),
                        substring(col("event_time"),12, 2).alias("hour"),
                        col("os_type"),
                        col("event_type")
                       ).agg(count("event_type").alias("events"))\
                        .selectExpr(
                          "campaign_id",
                          "date",
                          "hour",
                          "'os_type' as type",
                          "os_type as value",
                          "struct(event_type, events) as event"
                          ) \
                          .groupBy("campaign_id", "date", "hour", "type", "value") \
                          .agg(collect_list("event").alias("events")) \
                          .selectExpr(
                              "campaign_id",
                              "date",
                              "hour",
                              "type",
                              "value",
                              "map_from_entries(events) as event"
                          )

ad_campaigns.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [28]:
from pyspark.sql.functions import *

ad_campaigns_df.show()

ad_campaigns=ad_campaigns_df.groupBy("campaign_id",
                        substring(col("event_time"), 0, 10).alias("date"),
                        substring(col("event_time"),12, 2).alias("hour"),
                        col("os_type"),
                        col("event_type")
                       ).agg(count("event_type").alias("events"))\
                        .selectExpr(
                          "campaign_id",
                          "date",
                          "hour",
                          "'os_type' as type",
                          "os_type as value",
                          "struct(event_type, events) as event"
                          ) \
                          .groupBy("campaign_id", "date", "hour", "type", "value") \
                          .agg(collect_list("event").alias("events")) \
                          .selectExpr(
                              "campaign_id",
                              "date",
                              "hour",
                              "type",
                              "value",
                              "map_from_entries(events) as event"
                          )

ad_campaigns.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [54]:
stores=ad_campaigns_df.join(store_df, array_contains(store_df.place_ids, ad_campaigns_df.place_id),"left")\
                    .groupBy("campaign_id",
                             substring("event_time", 0, 10).alias('date'),
                             substring("event_time", 12, 2).alias('hour'),
                             "store_name",
                             "event_type"
                             ).agg(count("event_type").alias('events'))\
                    .selectExpr("campaign_id",
                            "date",
                            "hour",
                            "'store_name' as type",
                            "store_name as value",
                            "struct(event_type, events) as event_dict")\
                    .groupBy("campaign_id",
                            "date",
                            "hour",
                            "type",
                            "value"
                            ).agg(collect_list("event_dict").alias('event'))\
                    .select("campaign_id",
                            "date",
                            "hour",
                            "type",
                            "value",
                            map_from_entries("event").alias('event'))
stores.show()

+-----------+----------+----+----------+-------------+--------------------+
|campaign_id|      date|hour|      type|        value|               event|
+-----------+----------+----+----------+-------------+--------------------+
|    ABCDFAE|2018-10-12|  13|store_name|   BurgerKing|{impression -> 1,...|
|    ABCDFAE|2018-10-12|  13|store_name|     McDonald|{click -> 1, impr...|
|    ABCDFAE|2018-10-12|  13|store_name|shoppers stop|     {video ad -> 1}|
+-----------+----------+----+----------+-------------+--------------------+



In [71]:
# Q2.Analyse data for each campaign_id, date, hour, store_name & value to get all the
# events with counts

joined_df = ad_campaigns_df.join(store_df, array_contains(store_df.place_ids, ad_campaigns_df.place_id), 'left')
grouped_df = joined_df.groupBy("campaign_id",
                              substring("event_time", 0, 10).alias("date"),
                              substring("event_time", 12, 2).alias("hour"),
                              "store_name",
                              "event_type"
                              ).agg(count("event_type").alias("events"))

selectExpr_df = grouped_df.selectExpr("campaign_id",
                                   "date",
                                   "hour",
                                   "store_name as type",
                                   "store_name as value",
                                   "struct(event_type, events) as event_dict")


grouped_df2 = selectExpr_df.groupBy(
    "campaign_id",
    "date",
    "hour",
    "type",
    "value"
    )\
    .agg(collect_list("event_dict").alias("event"))

final_df = grouped_df2.select(
    "campaign_id",
    "date",
    "hour",
    "type",
    "value",
    map_from_entries("event").alias("event")
    )\
    .show(truncate=False)
# joined_df = ad_campaigns_df.join(store_df, array_contains(store_df.place_ids, ad_campaigns_df.place_id), "left")

+-----------+----------+----+-------------+-------------+-----------------------------+
|campaign_id|date      |hour|type         |value        |event                        |
+-----------+----------+----+-------------+-------------+-----------------------------+
|ABCDFAE    |2018-10-12|13  |shoppers stop|shoppers stop|{video ad -> 1}              |
|ABCDFAE    |2018-10-12|13  |BurgerKing   |BurgerKing   |{impression -> 1, click -> 1}|
|ABCDFAE    |2018-10-12|13  |McDonald     |McDonald     |{click -> 1, impression -> 2}|
+-----------+----------+----+-------------+-------------+-----------------------------+



In [64]:
grouped_df.show()
selectExpr_df.show()

+-----------+----------+----+-------------+----------+------+
|campaign_id|      date|hour|   store_name|event_type|events|
+-----------+----------+----+-------------+----------+------+
|    ABCDFAE|2018-10-12|  13|     McDonald|     click|     1|
|    ABCDFAE|2018-10-12|  13|   BurgerKing|impression|     1|
|    ABCDFAE|2018-10-12|  13|     McDonald|impression|     2|
|    ABCDFAE|2018-10-12|  13|shoppers stop|  video ad|     1|
|    ABCDFAE|2018-10-12|  13|   BurgerKing|     click|     1|
+-----------+----------+----+-------------+----------+------+

+-----------+----------+----+-------------+-------------+---------------+
|campaign_id|      date|hour|         type|        value|     event_dict|
+-----------+----------+----+-------------+-------------+---------------+
|    ABCDFAE|2018-10-12|  13|     McDonald|     McDonald|     {click, 1}|
|    ABCDFAE|2018-10-12|  13|   BurgerKing|   BurgerKing|{impression, 1}|
|    ABCDFAE|2018-10-12|  13|     McDonald|     McDonald|{impression, 2

In [None]:
user_profile=ad_campaigns_df.join(user_profile_df, ad_campaigns_df.user_id == user_profile_df.user_id, "left")\
                            .select("campaign_id",
                                    substring("event_time", 0, 10).alias("date"),
                                    substring("event_time", 12, 2).alias("hour"),
                                    lit('gender').alias("type"),
                                    col("gender").alias("value"),
                                    "event_type")\
                            .groupBy("campaign_id", "date", "hour", "type", "value", "event_type")\
                            .agg(count("event_type").alias("event_count"))\
                            .select("campaign_id", "date", "hour", "type", "value", struct("event_type", "event_count").alias("events_map"))\
                            .groupBy("campaign_id", "date", "hour", "type", "value")\
                            .agg(collect_list("events_map").alias("map_list"))\
                            .select("campaign_id", "date", "hour", "type", "value", map_from_entries("map_list").alias("event"))
                                   
user_profile.show()

In [91]:
# Q3.Analyse data for each campaign_id, date, hour, gender_type & value to get all the
# events with counts

joinded_df = ad_campaigns_df.join(user_profile_df, ad_campaigns_df.user_id==user_profile_df.user_id, 'left')

grouped_df = joinded_df.groupBy("campaign_id",
                               substring("event_time", 0, 10).alias("date"),
                               substring("event_time" , 12, 2).alias("hour"),
                               lit("gender").alias("type"),
                                col("gender").alias("value"),
                                "event_type")\
                    .agg(count("event_type").alias("event_count"))


grouped_df.show()

final_df = grouped_df.select("campaign_id", "date", "hour", "type", "value", struct("event_type", "event_count").alias("event_map"))\
        .groupBy("campaign_id", "date", "hour", "type", "value")\
        .agg(collect_list("event_map").alias("map_list"))\
        .select("campaign_id", "date", "hour", "type", "value", map_from_entries("map_list").alias("event"))

final_df.show(truncate=False)

+-----------+----------+----+------+------+----------+-----------+
|campaign_id|      date|hour|  type| value|event_type|event_count|
+-----------+----------+----+------+------+----------+-----------+
|    ABCDFAE|2018-10-12|  13|gender|female|impression|          1|
|    ABCDFAE|2018-10-12|  13|gender|  male|impression|          1|
|    ABCDFAE|2018-10-12|  13|gender|  male|     click|          1|
|    ABCDFAE|2018-10-12|  13|gender|  male|  video ad|          1|
+-----------+----------+----+------+------+----------+-----------+

+-----------+----------+----+------+------+--------------------------------------------+
|campaign_id|date      |hour|type  |value |event                                       |
+-----------+----------+----+------+------+--------------------------------------------+
|ABCDFAE    |2018-10-12|13  |gender|male  |{impression -> 1, click -> 1, video ad -> 1}|
|ABCDFAE    |2018-10-12|13  |gender|female|{impression -> 1}                           |
+-----------+-----