In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    expr,
    month,
    sum,
    to_date,
    year,
)
from pyspark.sql.functions import round as spark_round
from pyspark.sql.window import Window

# NOTE: SSH connection details (key path, user, host) were removed from this notebook; keep them in ~/.ssh/config or a secrets store.


In [2]:

# 1. Create (or reuse) the SparkSession used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("campaign_consumption")
    .getOrCreate()
)
# Bare last expression: display the session as the cell output.
spark

In [3]:
# List the current working directory (IPython shell escape).
!ls

inputs	outputs  ruta  work


In [4]:


# 2. Read every CSV under ./inputs/meta/ (first line is the header; column
#    types are inferred by Spark), then print the resulting schema.
csv_reader = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
)
df = csv_reader.csv("./inputs/meta/")
df.printSchema()


root
 |-- campaign_id: integer (nullable = true)
 |-- ad_group_id: integer (nullable = true)
 |-- ad_group_name: string (nullable = true)
 |-- ad_id: integer (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- audience: string (nullable = true)
 |-- ad_type: string (nullable = true)
 |-- impressions: integer (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- spend: double (nullable = true)
 |-- interactions: integer (nullable = true)
 |-- conversions: integer (nullable = true)
 |-- quartile_25: integer (nullable = true)
 |-- quartile_50: integer (nullable = true)
 |-- quartile_75: integer (nullable = true)
 |-- completed: integer (nullable = true)



In [5]:

# Preview the raw rows. DataFrame.show() prints the table itself and returns
# None, so it must not be wrapped in print() (that would emit a stray "None").
df.show(5)

# 3. Parse the timestamp into a date and extract the calendar dimensions
#    used later for grouping and for the output partitions.
df = (
    df.withColumn("date_parsed", to_date(col("date")))
    .withColumn("year", year("date_parsed"))
    .withColumn("month", month("date_parsed"))
)

# 4. Drop invalid records (optional but recommended): keep only rows with at
#    least one impression and non-negative spend and clicks. Rows where any of
#    these columns is NULL are dropped as well, because the predicate then
#    evaluates to NULL rather than true.
df = df.filter(
    (col("impressions") > 0) & (col("spend") >= 0) & (col("clicks") >= 0)
)
df.show(5)

+-----------+-----------+-------------+-----+-------+---------+-------------------+-------+---------+--------+--------+-----------+------+------+------------+-----------+-----------+-----------+-----------+---------+
|campaign_id|ad_group_id|ad_group_name|ad_id|ad_name| platform|               date|  marca|     tipo|audience| ad_type|impressions|clicks| spend|interactions|conversions|quartile_25|quartile_50|quartile_75|completed|
+-----------+-----------+-------------+-----+-------+---------+-------------------+-------+---------+--------+--------+-----------+------+------+------------+-----------+-----------+-----------+-----------+---------+
|       1005|       2002|      Group_2| 3001|   Ad_1| Facebook|2022-01-01 00:00:00|Brand B|Awareness|   Youth|   Video|       4638|   487|169.52|          87|          1|       2133|       2088|       1401|      919|
|       1002|       2010|     Group_10| 3002|   Ad_2| Facebook|2022-01-01 00:00:00|Brand B|   Launch|     Men|   Image|       3218| 

In [6]:

# 5. Group and summarize: one row per ad / segment / calendar month, with the
#    additive metrics summed. `sum` here is pyspark.sql.functions.sum (imported
#    in the first cell), not the Python builtin.
group_keys = [
    "campaign_id", "ad_group_name", "ad_name", "platform",
    "marca", "tipo", "audience", "ad_type", "year", "month",
]
additive_metrics = ["impressions", "clicks", "spend", "conversions", "completed"]

grouped_df = df.groupBy(*group_keys).agg(
    *[sum(metric).alias(metric) for metric in additive_metrics]
)
grouped_df.show(5)

+-----------+-------------+-------+----------------+-------+---------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+
|campaign_id|ad_group_name|ad_name|        platform|  marca|     tipo|audience| ad_type|year|month|impressions|clicks|             spend|conversions|completed|
+-----------+-------------+-------+----------------+-------+---------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+
|       1005|      Group_2|  Ad_36|Audience Network|Brand B|Awareness|   Youth|   Video|2022|    2|      76960|  8790|            2567.7|       2491|    12120|
|       1004|      Group_8|  Ad_21|       Instagram|Brand A|Promotion|   Youth|   Image|2022|    9|      71710|  4207|1081.7200000000003|       2001|     8707|
|       1003|      Group_7|  Ad_50|       Instagram|Brand A|   Launch|   Women|   Video|2022|   10|      91763|  8024|           2607.39|       4148|    12400|
|       1003|      Group_6|  Ad_49|     

In [7]:

# 6. Compute the derived metrics.
#
# The upstream filter only requires clicks >= 0 and spend >= 0, and nothing
# constrains conversions, so a monthly group can have a zero denominator.
# nullif(x, 0) turns a zero denominator into NULL, which makes the ratio NULL
# explicitly. That matches Spark's non-ANSI result and avoids the
# divide-by-zero error raised when spark.sql.ansi.enabled is true.
impressions_nz = expr("nullif(impressions, 0)")
clicks_nz = expr("nullif(clicks, 0)")
conversions_nz = expr("nullif(conversions, 0)")
spend_nz = expr("nullif(spend, 0)")

derived_df = (
    grouped_df
    # Click-through rate: clicks per impression.
    .withColumn("CTR", col("clicks") / impressions_nz)
    # Cost per click.
    .withColumn("CPC", col("spend") / clicks_nz)
    # Cost per conversion.
    .withColumn("CPA", col("spend") / conversions_nz)
    # Conversions obtained per unit of spend (named ROI in this dataset).
    .withColumn("ROI", col("conversions") / spend_nz)
    # Cost per thousand impressions.
    .withColumn("CPM", (col("spend") / impressions_nz) * 1000)
    # Share of impressions counted as completed.
    .withColumn("completion_rate", col("completed") / impressions_nz)
)
derived_df.show(5)


+-----------+-------------+-------+----------------+-------+---------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+--------------------+-------------------+-------------------+------------------+------------------+-------------------+
|campaign_id|ad_group_name|ad_name|        platform|  marca|     tipo|audience| ad_type|year|month|impressions|clicks|             spend|conversions|completed|                 CTR|                CPC|                CPA|               ROI|               CPM|    completion_rate|
+-----------+-------------+-------+----------------+-------+---------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+--------------------+-------------------+-------------------+------------------+------------------+-------------------+
|       1005|      Group_2|  Ad_36|Audience Network|Brand B|Awareness|   Youth|   Video|2022|    2|      76960|  8790|            2567.7|       2491|    12120| 0.1

In [8]:

# 7. Define the rolling three-month window.
#
# The frame is a RANGE over a month index (year * 12 + month), so it always
# covers the current calendar month and the two before it. A ROWS frame would
# instead take the previous two *rows*: each partition below holds several ads
# per month (campaign_id / ad_group_name / ad_name are not partition keys), so
# a row-based frame would pick an arbitrary, order-dependent set of rows and
# would also miscount whenever a month is missing.
# NOTE(review): if the rolling metrics are meant per ad rather than per
# segment, add "campaign_id", "ad_group_name", "ad_name" to partitionBy.
month_index = col("year") * 12 + col("month")
window_spec = (
    Window.partitionBy("platform", "marca", "tipo", "audience", "ad_type")
    .orderBy(month_index)
    .rangeBetween(-2, 0)
)

# 8. Rolling metrics, rounded as they are created. The *_3m averages are plain
#    means of the per-row ratios inside the frame; spend_3m and conversions_3m
#    are sums. `sum` is pyspark.sql.functions.sum, not the Python builtin.
final_df = (
    derived_df
    .withColumn("CTR_3m", spark_round(avg("CTR").over(window_spec), 4))
    .withColumn("CPA_3m", spark_round(avg("CPA").over(window_spec), 4))
    .withColumn("ROI_3m", spark_round(avg("ROI").over(window_spec), 4))
    .withColumn("spend_3m", spark_round(sum("spend").over(window_spec), 2))
    .withColumn(
        "conversions_3m",
        spark_round(sum("conversions").over(window_spec), 2),
    )
    # Stamp every row with yesterday's date.
    .withColumn("last_updated_date", expr("date_sub(current_date(), 1)"))
)
final_df.show(5)

+-----------+-------------+-------+----------------+-------+------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------+------+------+--------+--------------+-----------------+
|campaign_id|ad_group_name|ad_name|        platform|  marca|  tipo|audience| ad_type|year|month|impressions|clicks|             spend|conversions|completed|                CTR|                CPC|               CPA|               ROI|               CPM|    completion_rate|CTR_3m|CPA_3m|ROI_3m|spend_3m|conversions_3m|last_updated_date|
+-----------+-------------+-------+----------------+-------+------+--------+--------+----+-----+-----------+------+------------------+-----------+---------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------+------+------+--------+--------------+--------

In [9]:
# Print the column names and types of the final DataFrame before it is written.
final_df.printSchema()

root
 |-- campaign_id: integer (nullable = true)
 |-- ad_group_name: string (nullable = true)
 |-- ad_name: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- audience: string (nullable = true)
 |-- ad_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- impressions: long (nullable = true)
 |-- clicks: long (nullable = true)
 |-- spend: double (nullable = true)
 |-- conversions: long (nullable = true)
 |-- completed: long (nullable = true)
 |-- CTR: double (nullable = true)
 |-- CPC: double (nullable = true)
 |-- CPA: double (nullable = true)
 |-- ROI: double (nullable = true)
 |-- CPM: double (nullable = true)
 |-- completion_rate: double (nullable = true)
 |-- CTR_3m: double (nullable = true)
 |-- CPA_3m: double (nullable = true)
 |-- ROI_3m: double (nullable = true)
 |-- spend_3m: double (nullable = true)
 |-- conversions_3m: long (nul

In [10]:

# 9. Export to Parquet, partitioned by platform, year and month.
#    Save mode is "overwrite".
(
    final_df.write
    .mode("overwrite")
    .partitionBy("platform", "year", "month")
    .parquet("outputs/consumption_meta/parquet")
)

In [11]:
# List the current working directory again after the export (IPython shell escape).
!ls

inputs	outputs  ruta  work
