In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create (or reuse) the Spark session with Hive support. getOrCreate() hands back an
# already-running session if one exists; in that case only runtime SQL configurations
# take effect (see the warning in the cell output).
spark = (
    SparkSession.builder
    .appName("marketing_data_analysis_using_spark")
    .enableHiveSupport()
    .getOrCreate()
)



23/11/07 09:21:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
# Campaign event data, read from HDFS. The "multiline" option parses the file as
# regular (possibly multi-line) JSON rather than one JSON record per line.
campains_df = (
    spark.read
    .option("multiline", "true")
    .json("/tmp/marketing_input/ad_campaigns_data.json")
)

# Inspect the inferred schema and the first five rows.
campains_df.printSchema()
campains_df.show(5)

root
 |-- campaign_country: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- os_type: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- user_id: string (nullable = true)

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
+----------------+-----------+--------------------+-----------+--------------

In [7]:
# Store data, read from HDFS as multi-line JSON. Each store lists the place_ids
# it covers (see the printed schema).
store_df = (
    spark.read
    .option("multiline", "true")
    .json("/tmp/marketing_input/store_data.json")
)

# Inspect the inferred schema and the first five rows.
store_df.printSchema()
store_df.show(5)

root
 |-- place_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- store_name: string (nullable = true)

+--------------------+----------+
|           place_ids|store_name|
+--------------------+----------+
|[CASSBB-11, CADGB...|  McDonald|
+--------------------+----------+



In [8]:
# User profile data, read from HDFS as multi-line JSON; rows are keyed by user_id.
user_df = (
    spark.read
    .option("multiline", "true")
    .json("/tmp/marketing_input/user_profile_data.json")
)

# Inspect the inferred schema and the first five rows.
user_df.printSchema()
user_df.show(5)



root
 |-- age_group: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- user_id: string (nullable = true)



[Stage 9:>                                                          (0 + 1) / 1]

+---------+------------------+-------+------+-------------------+
|age_group|          category|country|gender|            user_id|
+---------+------------------+-------+------+-------------------+
|    18-25|[shopper, student]|    USA|  male|1264374214654454321|
+---------+------------------+-------+------+-------------------+



                                                                                

In [21]:
event_time_df = campains_df.select("event_time")
# By default show() truncates each value to 20 characters, so the timestamp is
# cut off ("2018-10-12T13:10:...").
event_time_df.show()
# Passing truncate=False prints the full value.
event_time_df.show(n=5, truncate=False)


+--------------------+
|          event_time|
+--------------------+
|2018-10-12T13:10:...|
+--------------------+

+------------------------+
|event_time              |
+------------------------+
|2018-10-12T13:10:05.000Z|
+------------------------+



In [None]:
# Q1. Analyse data for each campaign_id, date, hour, os_type & value to get all the events with counts.

In [14]:
# Q1. For each campaign_id, date, hour and os_type value, collect the count of every
# event_type into one {event_type -> count} map.
#
# date and hour are sliced straight out of the ISO-8601 event_time string
# (e.g. "2018-10-12T13:10:05.000Z"): characters 1-10 are the date and 12-13 the hour.
# Spark's substring() is 1-based. Slicing the string (instead of casting to a
# timestamp) keeps the hour exactly as written and avoids any timezone conversion.
ad_campaigns = (
    campains_df
    .groupBy(
        "campaign_id",
        substring(col("event_time"), 1, 10).alias("date"),
        substring(col("event_time"), 12, 2).alias("hour"),
        col("os_type"),
        col("event_type"),
    )
    .agg(count("event_type").alias("events"))
    # Reshape to the output layout: a constant "type" label plus the os_type value,
    # and one (event_type, count) struct per row.
    .selectExpr(
        "campaign_id", "date", "hour", "'os_type' as type", "os_type as value",
        "struct(event_type, events) as event",
    )
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list("event").alias("events"))
    # Turn the collected (event_type, count) structs into a single map column.
    .selectExpr(
        "campaign_id", "date", "hour", "type", "value",
        "map_from_entries(events) as event",
    )
)

ad_campaigns.show()

# coalesce(1) writes a single JSON part file. mode("overwrite") keeps the cell
# re-runnable: the default save mode fails once the output directory exists.
ad_campaigns.coalesce(1).write.mode("overwrite").format("json").save("/tmp/output_data/ad_campaigns/")
print("Write successful")

+-----------+----------+----+-------+-----+-----------------+
|campaign_id|      date|hour|   type|value|            event|
+-----------+----------+----+-------+-----+-----------------+
|    ABCDFAE|2018-10-12|  13|os_type|  ios|{impression -> 1}|
+-----------+----------+----+-------+-----+-----------------+



In [None]:
# Q2. Analyse data for each campaign_id, date, hour, store_name & value to get all the
# events with counts.



In [None]:
# Q2. Analyse data for each campaign_id, date, hour, store_name & value to get all the
# events with counts.
# The implementation is in the next cell (this cell previously held an unfinished
# `join(` call that raised a SyntaxError and broke Run All).


In [25]:
# Q2. For each campaign_id, date, hour and store_name value, collect the count of every
# event_type into one {event_type -> count} map.
#
# Each campaign event is matched to every store whose place_ids array contains the
# event's place_id. Because this is a left join, events with no matching store are kept
# with a null store_name (value). A place_id listed under several stores counts toward
# each of them. array_contains() is not an equality predicate, so Spark will likely plan
# a nested-loop join here; that is acceptable for a small store table.
stores = (
    campains_df
    .join(store_df, array_contains(store_df.place_ids, campains_df.place_id), "left")
    .groupBy(
        "campaign_id",
        # event_time is an ISO-8601 string; substring() is 1-based, so characters
        # 1-10 are the date and 12-13 the hour.
        substring("event_time", 1, 10).alias("date"),
        substring("event_time", 12, 2).alias("hour"),
        "store_name",
        "event_type",
    )
    .agg(count("event_type").alias("events"))
    .selectExpr(
        "campaign_id",
        "date",
        "hour",
        "'store_name' as type",
        "store_name as value",
        "struct(event_type, events) as event_dict",
    )
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list("event_dict").alias("event"))
    # Turn the collected (event_type, count) structs into a single map column.
    .select(
        "campaign_id", "date", "hour", "type", "value",
        map_from_entries("event").alias("event"),
    )
)
stores.show()

# coalesce(1) writes a single JSON part file. mode("overwrite") keeps the cell
# re-runnable: the default save mode fails once the output directory exists.
stores.coalesce(1).write.mode("overwrite").format("json").save("/tmp/output_data/stores/")
print("Write successful")

+-----------+----------+----+----------+--------+-----------------+
|campaign_id|      date|hour|      type|   value|            event|
+-----------+----------+----+----------+--------+-----------------+
|    ABCDFAE|2018-10-12|  13|store_name|McDonald|{impression -> 1}|
+-----------+----------+----+----------+--------+-----------------+



                                                                                

Write successful


In [None]:
# Preview the first five rows of each input DataFrame, in the same order as before.
for preview_df in (campains_df, store_df, user_df):
    preview_df.show(5)

In [32]:
# Q3. Analyse data for each campaign_id, date, hour, gender_type & value to get all the
# events with counts.
#
# Each event is left-joined to its user profile on user_id, so events whose user has
# no profile are kept with a null gender. Counts per event_type are then folded into
# an {event_type -> count} map per (campaign_id, date, hour, gender) group.
user_profile = (
    campains_df
    .join(user_df, on="user_id", how="left")
    .select(
        col("campaign_id"),
        # event_time is an ISO-8601 string; substring() is 1-based.
        substring(col("event_time"), 1, 10).alias("date"),
        substring(col("event_time"), 12, 2).alias("hour"),
        lit("gender").alias("type"),
        col("gender").alias("value"),
        col("event_type"),
    )
    .groupBy("campaign_id", "date", "hour", "type", "value", "event_type")
    .agg(count("event_type").alias("event_count"))
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list(struct("event_type", "event_count")).alias("map_list"))
    .select(
        "campaign_id", "date", "hour", "type", "value",
        map_from_entries("map_list").alias("event"),
    )
)

user_profile.show()

+-----------+----------+----+------+-----+-----------------+
|campaign_id|      date|hour|  type|value|            event|
+-----------+----------+----+------+-----+-----------------+
|    ABCDFAE|2018-10-12|  13|gender| male|{impression -> 1}|
+-----------+----------+----+------+-----+-----------------+



In [33]:
# Persist the Q3 result as a single JSON part file (coalesce(1)). mode("overwrite")
# keeps the cell re-runnable: the default save mode fails once the output directory exists.
user_profile.coalesce(1).write.mode("overwrite").format("json").save("/tmp/output_data/user_profile")
print("Write successful")

Write successfull
