In [13]:
# -------------------------
# 1. Create the SparkSession
# -------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    collect_list,
    count,
    explode,
    hour,
    map_from_entries,
    struct,
    to_date,
)

spark = SparkSession.builder.appName("MarketingCampaignAnalysis").getOrCreate()

# -------------------------
# 2. Load datasets from HDFS
# -------------------------
ad_campaigns = spark.read.json("hdfs:///user/spark/ass1/ad_campaigns_data.json")
user_profiles = spark.read.json("hdfs:///user/spark/ass1/user_profile_data.json")
stores = spark.read.json("hdfs:///user/spark/ass1/store_data.json")

# -------------------------
# 3. Common preprocessing
# -------------------------
# Derive the calendar date and the hour of day from event_time; every query
# below groups on these two columns.
event_ts = col("event_time")
ad_campaigns = (
    ad_campaigns
    .withColumn("date_", to_date(event_ts))
    .withColumn("hour", hour(event_ts))
)


In [14]:
def query_q1(ad_campaigns, event_types=None):
    """Count events per campaign, date, hour and OS type, one column per event type.

    Args:
        ad_campaigns: DataFrame of ad events carrying ``campaign_id``,
            ``date_``, ``hour``, ``os_type`` and ``event_type`` columns.
        event_types: Optional iterable of ``event_type`` values to pivot on.
            When given, exactly these values become output columns, which
            keeps the schema stable and lets Spark skip the pass that
            discovers distinct values. When ``None`` (the default) the columns
            are inferred from the data.

    Returns:
        DataFrame with one row per (campaign_id, date_, hour, os_type) and one
        count column per event type; combinations without events hold 0.
    """
    pivot_values = None if event_types is None else list(event_types)

    # A single groupBy/pivot/count produces the per-event-type counts directly;
    # pre-counting by event_type and then summing those counts adds an extra
    # aggregation without changing the result.
    return (
        ad_campaigns
        .groupBy("campaign_id", "date_", "hour", "os_type")
        .pivot("event_type", pivot_values)
        .count()
        # Pivot cells with no matching events come back as null.
        .fillna(0)
    )

# Run Q1 on the preprocessed events and preview the result without
# truncating long cell values.
q1_result = query_q1(ad_campaigns)
q1_result.show(truncate=False)


+-----------+----------+----+-------+-----+----------+--------+
|campaign_id|date_     |hour|os_type|click|impression|video_ad|
+-----------+----------+----+-------+-----+----------+--------+
|ABCDFAE    |2018-10-12|13  |android|1    |1         |1       |
|ABCDFAE    |2018-10-12|13  |ios    |0    |1         |0       |
+-----------+----------+----+-------+-----+----------+--------+



In [15]:
def query_q2(ad_campaigns, stores, event_types=None):
    """Count events per campaign, date, hour and store, one column per event type.

    Args:
        ad_campaigns: DataFrame of ad events carrying ``campaign_id``,
            ``date_``, ``hour``, ``place_id`` and ``event_type`` columns.
        stores: DataFrame of stores with a ``store_name`` column and a
            ``place_ids`` array column.
        event_types: Optional iterable of ``event_type`` values to pivot on.
            When given, exactly these values become output columns, which
            keeps the schema stable and lets Spark skip the pass that
            discovers distinct values. When ``None`` (the default) the columns
            are inferred from the data.

    Returns:
        DataFrame with one row per (campaign_id, date_, hour, store_name) and
        one count column per event type; combinations without events hold 0.
        Events whose ``place_id`` matches no store are dropped by the inner
        join.
    """
    pivot_values = None if event_types is None else list(event_types)

    # Flatten the place_ids array so each (store, place_id) pair is its own row
    # and can be equi-joined against the events.
    stores_by_place = (
        stores
        .withColumn("place_id", explode(col("place_ids")))
        .drop("place_ids")
    )

    events_with_store = ad_campaigns.join(stores_by_place, on="place_id", how="inner")

    # A single groupBy/pivot/count produces the per-event-type counts directly;
    # pre-counting by event_type and then summing those counts adds an extra
    # aggregation without changing the result.
    return (
        events_with_store
        .groupBy("campaign_id", "date_", "hour", "store_name")
        .pivot("event_type", pivot_values)
        .count()
        # Pivot cells with no matching events come back as null.
        .fillna(0)
    )

# Run Q2 (events joined to stores by place_id) and preview the result without
# truncating long cell values.
q2_result = query_q2(ad_campaigns, stores)
q2_result.show(truncate=False)


+-----------+----------+----+-------------+-----+----------+--------+
|campaign_id|date_     |hour|store_name   |click|impression|video_ad|
+-----------+----------+----+-------------+-----+----------+--------+
|ABCDFAE    |2018-10-12|13  |McDonald     |1    |2         |0       |
|ABCDFAE    |2018-10-12|13  |shoppers stop|0    |0         |1       |
|ABCDFAE    |2018-10-12|13  |BurgerKing   |1    |1         |0       |
+-----------+----------+----+-------------+-----+----------+--------+



In [16]:
def query_q3(ad_campaigns, user_profiles, event_types=None):
    """Count events per campaign, date, hour and gender, one column per event type.

    Args:
        ad_campaigns: DataFrame of ad events carrying ``campaign_id``,
            ``date_``, ``hour``, ``user_id`` and ``event_type`` columns.
        user_profiles: DataFrame of user profiles with ``user_id`` and
            ``gender`` columns.
        event_types: Optional iterable of ``event_type`` values to pivot on.
            When given, exactly these values become output columns, which
            keeps the schema stable and lets Spark skip the pass that
            discovers distinct values. When ``None`` (the default) the columns
            are inferred from the data.

    Returns:
        DataFrame with one row per (campaign_id, date_, hour, gender) and one
        count column per event type; combinations without events hold 0.
        Events whose ``user_id`` has no profile are dropped by the inner join.
    """
    pivot_values = None if event_types is None else list(event_types)

    events_with_profile = ad_campaigns.join(user_profiles, on="user_id", how="inner")

    # A single groupBy/pivot/count produces the per-event-type counts directly;
    # pre-counting by event_type and then summing those counts adds an extra
    # aggregation without changing the result.
    return (
        events_with_profile
        .groupBy("campaign_id", "date_", "hour", "gender")
        .pivot("event_type", pivot_values)
        .count()
        # Pivot cells with no matching events come back as null.
        .fillna(0)
    )

# Run Q3 (events joined to user profiles by user_id) and preview the result
# without truncating long cell values.
q3_result = query_q3(ad_campaigns, user_profiles)
q3_result.show(truncate=False)


+-----------+----------+----+------+-----+----------+--------+
|campaign_id|date_     |hour|gender|click|impression|video_ad|
+-----------+----------+----+------+-----+----------+--------+
|ABCDFAE    |2018-10-12|13  |male  |1    |1         |1       |
|ABCDFAE    |2018-10-12|13  |female|0    |1         |0       |
+-----------+----------+----+------+-----+----------+--------+



In [17]:
# Persist each query result as JSON on HDFS, replacing any previous output
# at the same location.
results_by_path = {
    "hdfs:///user/spark/ass1/q1": q1_result,
    "hdfs:///user/spark/ass1/q2": q2_result,
    "hdfs:///user/spark/ass1/q3": q3_result,
}
for output_path, result_df in results_by_path.items():
    result_df.write.mode("overwrite").json(output_path)