In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Hive support is required for the `USE a1` / CREATE TABLE / insertInto cell
# later in the notebook.
spark = (
    SparkSession.builder
    .appName("hive-practice-396920")
    .enableHiveSupport()
    .getOrCreate()
)

# Source files; each is read with multiline=true, i.e. a JSON record may span
# several lines.
hdfs_path1 = '/tmp/test1/ad_campaigns_data.json'
hdfs_path2 = '/tmp/test1/user_profile_data.json'
hdfs_path3 = '/tmp/test1/stores_data.json'


def _load_multiline_json(path):
    """Load the multi-line JSON file at `path` into a Spark DataFrame."""
    return spark.read.format('json').option('multiline', 'true').load(path)


adf = _load_multiline_json(hdfs_path1)  # ad campaign events
# NOTE(review): `udf` shadows pyspark.sql.functions.udf brought in by the star
# import; consider renaming (e.g. users_df) together with its use in the join cell.
udf = _load_multiline_json(hdfs_path2)  # user profiles
sdf = _load_multiline_json(hdfs_path3)  # stores

23/10/28 23:02:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [2]:
adf.show()

# Per (campaign, date, hour, os_type): a map of event_type -> number of events.
# event_time looks like "2018-10-12T13:10:...", so the date is characters 1-10
# and the hour characters 12-13 (Spark's substring positions are 1-based).
ad_n = (
    adf
    # map_from_entries raises on a null map key. Rows without an event_type
    # would only yield a `null -> 0` entry (count() skips nulls), so drop them
    # up front instead of letting them crash the final step.
    .where(col("event_type").isNotNull())
    .groupBy(
        "campaign_id",
        substring(col("event_time"), 1, 10).alias("date"),
        substring(col("event_time"), 12, 2).alias("hour"),
        col("os_type"),
        col("event_type"),
    )
    .agg(count("event_type").alias("events"))
    .selectExpr(
        "campaign_id",
        "date",
        "hour",
        "'os_type' as type",
        "os_type as value",
        "struct(event_type, events) as event",
    )
    # Collapse the per-event_type rows into one row per breakdown value.
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list("event").alias("events"))
    .selectExpr(
        "campaign_id",
        "date",
        "hour",
        "type",
        "value",
        "map_from_entries(events) as event",
    )
)

ad_n.show()

                                                                                

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+

[Stage 6:>                                                          (0 + 1) / 1]

+-----------+----------+----+-------+-------+--------------------+
|campaign_id|      date|hour|   type|  value|               event|
+-----------+----------+----+-------+-------+--------------------+
|    ABCDFAE|2018-10-12|  13|os_type|android|{click -> 1, impr...|
|    ABCDFAE|2018-10-12|  13|os_type|    ios|   {impression -> 1}|
+-----------+----------+----+-------+-------+--------------------+



                                                                                

In [3]:
# One row per (store_name, place_id): lets the store lookup be an equi-join on
# place_id instead of a non-equi array_contains() join, which Spark can only
# execute as a nested-loop/cartesian join. array_distinct keeps a place listed
# twice in the same store from matching twice, as array_contains would not.
store_places = sdf.select(
    "store_name",
    explode(array_distinct("place_ids")).alias("place_id"),
)

# Per (campaign, date, hour, store_name): a map of event_type -> number of events.
# Events whose place_id matches no store keep a null store_name (left join).
asjoin = (
    adf
    .join(store_places, on="place_id", how="left")
    # map_from_entries raises on a null map key; such rows would only carry a
    # `null -> 0` entry since count() skips nulls.
    .where(col("event_type").isNotNull())
    .groupBy(
        "campaign_id",
        substring("event_time", 1, 10).alias("date"),
        substring("event_time", 12, 2).alias("hour"),
        "store_name",
        "event_type",
    )
    .agg(count("event_type").alias("events"))
    .select(
        "campaign_id",
        "date",
        "hour",
        # lit() rather than a double-quoted SQL string, which becomes an
        # identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
        lit("store name").alias("type"),
        col("store_name").alias("value"),
        struct("event_type", "events").alias("event_dict"),
    )
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list("event_dict").alias("event"))
    .select(
        "campaign_id",
        "date",
        "hour",
        "type",
        "value",
        map_from_entries("event").alias("event"),
    )
)
asjoin.show()

+-----------+----------+----+----------+-------------+--------------------+
|campaign_id|      date|hour|      type|        value|               event|
+-----------+----------+----+----------+-------------+--------------------+
|    ABCDFAE|2018-10-12|  13|store name|     McDonald|{click -> 1, impr...|
|    ABCDFAE|2018-10-12|  13|store name|   BurgerKing|{impression -> 1,...|
|    ABCDFAE|2018-10-12|  13|store name|shoppers stop|     {video ad -> 1}|
+-----------+----------+----+----------+-------------+--------------------+



In [4]:
# Only the join key and the breakdown attribute are taken from the user
# profiles, so no other profile column can collide with an ad-event column.
user_genders = udf.select("user_id", "gender")

# Per (campaign, date, hour, gender): a map of event_type -> number of events.
# Events whose user_id has no profile keep a null gender (left join).
aujoin = (
    adf
    # Using-join on the column name: matching is the same as
    # adf.user_id == udf.user_id, but only one user_id column survives.
    .join(user_genders, on="user_id", how="left")
    # map_from_entries raises on a null map key; such rows would only carry a
    # `null -> 0` entry since count() skips nulls.
    .where(col("event_type").isNotNull())
    .select(
        "campaign_id",
        substring("event_time", 1, 10).alias("date"),
        substring("event_time", 12, 2).alias("hour"),
        lit("gender").alias("type"),
        col("gender").alias("value"),
        "event_type",
    )
    .groupBy("campaign_id", "date", "hour", "type", "value", "event_type")
    .agg(count("event_type").alias("event_count"))
    .select(
        "campaign_id",
        "date",
        "hour",
        "type",
        "value",
        struct("event_type", "event_count").alias("events_map"),
    )
    .groupBy("campaign_id", "date", "hour", "type", "value")
    .agg(collect_list("events_map").alias("map_list"))
    .select(
        "campaign_id",
        "date",
        "hour",
        "type",
        "value",
        map_from_entries("map_list").alias("event"),
    )
)
aujoin.show()

+-----------+----------+----+------+------+--------------------+
|campaign_id|      date|hour|  type| value|               event|
+-----------+----------+----+------+------+--------------------+
|    ABCDFAE|2018-10-12|  13|gender|  male|{impression -> 1,...|
|    ABCDFAE|2018-10-12|  13|gender|female|   {impression -> 1}|
+-----------+----------+----+------+------+--------------------+



In [6]:
# Single source of truth for the target table. The original cell created
# `ad2_n` but inserted into `ad1_n`, a table not defined in this notebook.
# insertInto() matches columns by position, so it must target the table whose
# column order is declared below.
AD_EVENTS_TABLE = "ad2_n"

# Let every `date` partition value come from the data (dynamic partitioning
# with no static partition spec). Same setting as
# `SET hive.exec.dynamic.partition.mode=nonstrict`.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.sql("USE a1")
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {AD_EVENTS_TABLE} (
        campaign_id STRING,
        hour STRING,
        type STRING,
        value STRING,
        event MAP<STRING, BIGINT>
    )
    PARTITIONED BY (date STRING)
""")

# Column order mirrors the DDL, with the partition column `date` last.
(
    ad_n
    .select("campaign_id", "hour", "type", "value", "event", "date")
    .write
    .mode("append")
    .insertInto(AD_EVENTS_TABLE)
)

23/10/28 21:41:57 WARN SetCommand: 'SET hive.exec.dynamic.partition.mode=nonstrict' might not work, since Spark doesn't support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
23/10/28 21:41:57 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
23/10/28 21:41:57 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                