In [2]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

In [3]:
spark = SparkSession.builder \
    .appName("k1") \
    .config("spark.some.config.option", "some-value") \
    .enableHiveSupport() \
    .getOrCreate()

23/09/23 07:52:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Q1. Analyse data for each campaign_id, date, hour, os_type & value to get all the events with counts¶

In [4]:
df1=spark.read.option("multiline","true").json('/tmp/input_data/ad_campaigns_data.json')

df1.printSchema()
df1.show(truncate=False)

                                                                                

root
 |-- campaign_country: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)



                                                                                

+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|campaign_name                |device_type|event_time              |event_type|os_type|place_id |user_id            |
+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|
|USA             |ABCDFAE    |Food category target campaign|MOTOROLA   |2018-10-12T13:09:04.000Z|impression|android|CADGBD-13|1674374214654454321|
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:10.000Z|video ad  |android|BADGBA-12|5747421465445443   |
|USA             |ABCDFAE    |Food category target campaign|SAMSUNG    |2018-10-12T13:10:12.000Z|click     |android|CA

In [5]:
df2=df1.withColumn("date",date_format("event_time",'yyyy-mm-dd')).withColumn("hour",hour("event_time")) \
                   .withColumn("type",lit("os_type")).withColumn("value",col("os_type"))


In [6]:
df2.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+----------+----+-------+-------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|      date|hour|   type|  value|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+----------+----+-------+-------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|2018-10-12|  13|os_type|    ios|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|2018-09-12|  13|os_type|android|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|2018-10-12|  13|os_type|android|
|   

In [7]:
df3=df2.select("campaign_id","date","hour","value","type","event_type")
df3.show(truncate=False)

+-----------+----------+----+-------+-------+----------+
|campaign_id|date      |hour|value  |type   |event_type|
+-----------+----------+----+-------+-------+----------+
|ABCDFAE    |2018-10-12|13  |ios    |os_type|impression|
|ABCDFAE    |2018-09-12|13  |android|os_type|impression|
|ABCDFAE    |2018-10-12|13  |android|os_type|video ad  |
|ABCDFAE    |2018-10-12|13  |android|os_type|click     |
+-----------+----------+----+-------+-------+----------+



In [8]:
df4=df3.select("campaign_id","date","hour","value","type","event_type").groupBy("campaign_id","date","hour","value","type","event_type").count()
df4.show()

[Stage 4:>                                                          (0 + 1) / 1]

+-----------+----------+----+-------+-------+----------+-----+
|campaign_id|      date|hour|  value|   type|event_type|count|
+-----------+----------+----+-------+-------+----------+-----+
|    ABCDFAE|2018-10-12|  13|android|os_type|  video ad|    1|
|    ABCDFAE|2018-10-12|  13|android|os_type|     click|    1|
|    ABCDFAE|2018-09-12|  13|android|os_type|impression|    1|
|    ABCDFAE|2018-10-12|  13|    ios|os_type|impression|    1|
+-----------+----------+----+-------+-------+----------+-----+



                                                                                

# The agg component has to contain actual aggregation function. One way to approach this is to combine
# collect_list : Aggregate function: returns a list of objects with duplicates.
# struct : Creates a new struct column.
# map_from_entries : Collection function: Returns a map created from the given array of entries.

In [9]:
from pyspark.sql import functions as f
from pyspark.sql.functions import col, asc,desc

In [10]:
df5=df4.groupBy("campaign_id","date","hour","type","value") \
        .agg(f.map_from_entries(f.collect_list(f.struct("event_type", "count"))).alias("event"))
#df6=df5.orderBy(col("event").desc())
df5.show(truncate=False)

+-----------+----------+----+-------+-------+---------------------------+
|campaign_id|date      |hour|type   |value  |event                      |
+-----------+----------+----+-------+-------+---------------------------+
|ABCDFAE    |2018-10-12|13  |os_type|android|{video ad -> 1, click -> 1}|
|ABCDFAE    |2018-09-12|13  |os_type|android|{impression -> 1}          |
|ABCDFAE    |2018-10-12|13  |os_type|ios    |{impression -> 1}          |
+-----------+----------+----+-------+-------+---------------------------+



In [11]:
df5.coalesce(1).write.json("/tmp/output_data1/campaign/")

                                                                                

In [12]:
store_df=spark.read.option("multiline","True").json("/tmp/input_data/store_data.json")
store_df.show(truncate=False)

+---------------------------------+-------------+
|place_ids                        |store_name   |
+---------------------------------+-------------+
|[CASSBB-11, CADGBD-13, FDBEGD-14]|McDonald     |
|[CASSBB-11]                      |BurgerKing   |
|[BADGBA-13, CASSBB-15, FDBEGD-15]|Macys        |
|[BADGBA-12]                      |shoppers stop|
+---------------------------------+-------------+



In [13]:
df1.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [14]:
stores=df1.join(store_df, array_contains(store_df.place_ids,df1.place_id),'left')
stores.show(truncate=False)

+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+---------------------------------+-------------+
|campaign_country|campaign_id|campaign_name                |device_type|event_time              |event_type|os_type|place_id |user_id            |place_ids                        |store_name   |
+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+---------------------------------+-------------+
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|[CASSBB-11, CADGBD-13, FDBEGD-14]|McDonald     |
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|[CASSBB-11]                      |BurgerKing   |
|USA             |ABCDFAE

In [15]:
stores.count()

6

In [16]:
s_df=stores.withColumn("date",date_format("event_time",'yyyy-mm-dd')).withColumn("hour",hour("event_time")) \
                   .withColumn("type",lit("store_name")).withColumn("value",col("store_name")) \
                   .select("campaign_id","date","hour","type","value","event_type") \
                   .groupBy("campaign_id","date","hour","type","value","event_type").count()
s_df.show()

+-----------+----------+----+----------+-------------+----------+-----+
|campaign_id|      date|hour|      type|        value|event_type|count|
+-----------+----------+----+----------+-------------+----------+-----+
|    ABCDFAE|2018-10-12|  13|store_name|   BurgerKing|     click|    1|
|    ABCDFAE|2018-09-12|  13|store_name|     McDonald|impression|    1|
|    ABCDFAE|2018-10-12|  13|store_name|shoppers stop|  video ad|    1|
|    ABCDFAE|2018-10-12|  13|store_name|     McDonald|     click|    1|
|    ABCDFAE|2018-10-12|  13|store_name|     McDonald|impression|    1|
|    ABCDFAE|2018-10-12|  13|store_name|   BurgerKing|impression|    1|
+-----------+----------+----+----------+-------------+----------+-----+



In [17]:
s_df2=s_df.groupBy("campaign_id","date","hour","type","value") \
        .agg(f.map_from_entries(f.collect_list(f.struct("event_type", "count"))).alias("event"))

s_df2.show(truncate=False)

+-----------+----------+----+----------+-------------+-----------------------------+
|campaign_id|date      |hour|type      |value        |event                        |
+-----------+----------+----+----------+-------------+-----------------------------+
|ABCDFAE    |2018-10-12|13  |store_name|BurgerKing   |{click -> 1, impression -> 1}|
|ABCDFAE    |2018-09-12|13  |store_name|McDonald     |{impression -> 1}            |
|ABCDFAE    |2018-10-12|13  |store_name|McDonald     |{click -> 1, impression -> 1}|
|ABCDFAE    |2018-10-12|13  |store_name|shoppers stop|{video ad -> 1}              |
+-----------+----------+----+----------+-------------+-----------------------------+



In [18]:
s_df2.coalesce(1).write.format('json').save('/tmp/output_data1/store')

In [26]:
user_profile_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load("/tmp/input_data/user_profile_data.json")
user_profile_df.show()

+---------+--------------------+-------+------+-------------------+
|age_group|            category|country|gender|            user_id|
+---------+--------------------+-------+------+-------------------+
|    18-25|  [shopper, student]|    USA|  male|1264374214654454321|
|    25-50|            [parent]|    USA|female|1674374214654454321|
|    25-50|[shopper, parent,...|    USA|  male|   5747421465445443|
|      50+|      [professional]|    USA|  male|1864374214654454132|
|    18-25|  [shopper, student]|    USA|female|  14537421465445443|
|      50+|[shopper, profess...|    USA|female|  25547421465445443|
+---------+--------------------+-------+------+-------------------+



In [27]:
df1.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [30]:
users=df1.join(user_profile_df, df1.user_id == user_profile_df.user_id, "left")
users.show(truncate=False)

+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+---------+-------------------------------+-------+------+-------------------+
|campaign_country|campaign_id|campaign_name                |device_type|event_time              |event_type|os_type|place_id |user_id            |age_group|category                       |country|gender|user_id            |
+----------------+-----------+-----------------------------+-----------+------------------------+----------+-------+---------+-------------------+---------+-------------------------------+-------+------+-------------------+
|USA             |ABCDFAE    |Food category target campaign|apple      |2018-10-12T13:10:05.000Z|impression|ios    |CASSBB-11|1264374214654454321|18-25    |[shopper, student]             |USA    |male  |1264374214654454321|
|USA             |ABCDFAE    |Food category target campaign|MOTOROLA   |2018-10-12T13:09:04.000Z|impress

In [31]:
u_df=users.withColumn("date",date_format("event_time",'yyyy-mm-dd')).withColumn("hour",hour("event_time")) \
                   .withColumn("type",lit("gender")).withColumn("value",col("gender")) \
                   .select("campaign_id","date","hour","type","value","event_type") \
                   .groupBy("campaign_id","date","hour","type","value","event_type").count()
u_df.show()

+-----------+----------+----+------+------+----------+-----+
|campaign_id|      date|hour|  type| value|event_type|count|
+-----------+----------+----+------+------+----------+-----+
|    ABCDFAE|2018-10-12|  13|gender|  male|  video ad|    1|
|    ABCDFAE|2018-09-12|  13|gender|female|impression|    1|
|    ABCDFAE|2018-10-12|  13|gender|  male|     click|    1|
|    ABCDFAE|2018-10-12|  13|gender|  male|impression|    1|
+-----------+----------+----+------+------+----------+-----+



In [32]:
s_df_2=u_df.groupBy("campaign_id","date","hour","type","value") \
        .agg(f.map_from_entries(f.collect_list(f.struct("event_type", "count"))).alias("event"))

s_df_2.show(truncate=False)

+-----------+----------+----+------+------+--------------------------------------------+
|campaign_id|date      |hour|type  |value |event                                       |
+-----------+----------+----+------+------+--------------------------------------------+
|ABCDFAE    |2018-09-12|13  |gender|female|{impression -> 1}                           |
|ABCDFAE    |2018-10-12|13  |gender|male  |{video ad -> 1, click -> 1, impression -> 1}|
+-----------+----------+----+------+------+--------------------------------------------+



In [33]:
s_df_2.coalesce(1).write.format('json').save('/tmp/output_data/users')

In [35]:
s_df_2.coalesce(1).write.format('json').save('/home/kushwahahacker8356/users')