# 03 – Ingeniería de Variables (Feature Engineering)
A partir de los pings ya asociados a ubicaciones, se construyen variables (features) agregando los datos por ubicación (`osm_id`).

In [0]:
%sql
-- Sanity check: total number of rows in the location/ping table.
select count(1) from sv_12_2023_locations_w_pings

In [0]:
from pyspark.sql import functions as F, Window

1 - DF principal + variables básicas

In [0]:
# Columns carried over from the location/ping table.
source_columns = [
    "osm_id", "clase", "nombre", "geometry_wkt", "datetime", "hora", "fecha",
    "is_weekend", "nombre_dia", "device_id", "latitude", "longitude",
]

# Time-of-day bucket: 5-11 -> "morning", 12-17 -> "afternoon", anything else
# -> "evening". "Anything else" is the catch-all branch, so besides 18-23 and
# 0-4 it also receives rows whose hour is null.
hour_col = F.col("hour")
timeblock_expr = (
    F.when(hour_col.between(5, 11), F.lit("morning"))
    .when(hour_col.between(12, 17), F.lit("afternoon"))
    .otherwise(F.lit("evening"))
)

# Main ping-level DataFrame with the basic derived variables.
df = (
    spark.table("sv_12_2023_locations_w_pings")
    .select(*source_columns)
    .withColumn("fecha", F.to_date("fecha"))
    .withColumn("hour", F.col("hora").cast("int"))
    # dayofweek numbering: 1 = Sunday ... 7 = Saturday
    .withColumn("day_of_week", F.dayofweek("fecha").cast("int"))
    .withColumn("timeblock", timeblock_expr)
)

2 - Agregados diarios y por ubicación

In [0]:
# Daily footfall per location: distinct devices and raw ping count for every
# (osm_id, fecha). is_weekend is part of the grouping key so the flag is still
# available after the aggregation.
daily = df.groupBy("osm_id", "fecha", "is_weekend").agg(
    F.countDistinct("device_id").alias("unique_devices_day"),
    F.count("*").alias("visits_day"),
)

# Whole-period totals per location.
base = df.groupBy("osm_id").agg(
    F.count("*").alias("visits_total"),
    F.countDistinct("device_id").alias("unique_devices_count"),
    F.countDistinct("fecha").alias("days_active"),
)

# Daily distinct-device count, kept only on weekend / weekday rows (0 otherwise).
devices_day = F.col("unique_devices_day")
weekend_devices = F.when(F.col("is_weekend") == 1, devices_day).otherwise(F.lit(0))
weekday_devices = F.when(F.col("is_weekend") == 0, devices_day).otherwise(F.lit(0))

# Per-location statistics over the daily rows.
# NOTE: unique_visits_weekend / unique_visits_weekday are sums of *daily*
# distinct-device counts, so a device seen on two different days counts twice.
by_osm_day = daily.groupBy("osm_id").agg(
    F.avg("unique_devices_day").alias("footfall_avg_per_day"),
    F.stddev_samp("visits_day").alias("std_visits_per_day"),
    F.avg(
        F.when(F.col("visits_day") > 0, F.col("visits_day"))
    ).alias("avg_visits_per_active_day"),
    F.sum(weekend_devices).alias("unique_visits_weekend"),
    F.sum(weekday_devices).alias("unique_visits_weekday"),
)

# Weekend-to-weekday ratio; null when there is no weekday footfall to divide by.
wk_ratio = by_osm_day.select(
    "osm_id",
    F.when(
        F.col("unique_visits_weekday") > 0,
        F.col("unique_visits_weekend") / F.col("unique_visits_weekday"),
    ).alias("weekend_to_weekday_ratio"),
    "unique_visits_weekend",
    "unique_visits_weekday",
)

3 -  Perfiles por hora y por día (vectores + entropía + hora pico)

In [0]:
# Ping count per (location, hour of day).
by_hour = df.groupBy("osm_id", "hour").agg(F.count("*").alias("cnt_hour"))

# Wide layout: one column per hour, named "0".."23"; hours without pings -> 0.
hour_pivot = (
    by_hour
    .groupBy("osm_id")
    .pivot("hour", list(range(24)))
    .agg(F.first("cnt_hour"))
    .fillna(0)
)

# Row-wise total over the 24 hour columns.
sum_hours = sum(F.col(str(h)) for h in range(24))

# Share of pings per hour. The denominator is replaced by 1 whenever the total
# is not positive, so a row with no counted pings gets 0 for every hour.
hour_denominator = F.when(sum_hours > 0, sum_hours).otherwise(F.lit(1))
hour_props = hour_pivot.select(
    "osm_id",
    *[
        (F.col(str(h)) / hour_denominator).alias(f"visits_by_hour_{h}")
        for h in range(24)
    ],
)

In [0]:
def entropy_expr(cols):
    """Build the entropy expression -sum((p + eps) * ln(p + eps)) over columns.

    A small constant eps (1e-12) is added to every proportion so the natural
    logarithm stays defined when a proportion is exactly 0.

    Args:
        cols: names of the columns holding the proportions.

    Returns:
        The negated sum of the per-column terms (a column expression when
        ``cols`` is non-empty; the plain integer 0 when it is empty).
    """
    smoothing = F.lit(1e-12)
    total = 0
    for name in cols:
        shifted = F.col(name) + smoothing
        total = total + shifted * F.log(shifted)
    return -total

# Entropy of each location's hourly distribution.
hour_cols = [f"visits_by_hour_{h}" for h in range(24)]
entropy_hour = hour_props.select(
    "osm_id",
    entropy_expr(hour_cols).alias("entropy_hourly_pattern"),
)

# Peak hour per location, computed separately for each is_weekend value and
# based on raw ping counts (not distinct devices). Within a partition the rows
# are ranked by count descending, then by hour ascending to break ties.
w_week = Window.partitionBy("osm_id", "is_weekend").orderBy(
    F.col("cnt").desc(), F.col("hour").asc()
)

hourly_by_flag = df.groupBy("osm_id", "is_weekend", "hour").agg(
    F.count("*").alias("cnt")
)
top_hour_by_flag = (
    hourly_by_flag
    .withColumn("rn", F.row_number().over(w_week))
    .where(F.col("rn") == 1)
)

# Pivot the flag into columns "0" / "1" and give them descriptive names.
peak = (
    top_hour_by_flag
    .groupBy("osm_id")
    .pivot("is_weekend", [0, 1])
    .agg(F.first("hour").alias("peak_hour"))
    .withColumnRenamed("0", "peak_hour_weekday")
    .withColumnRenamed("1", "peak_hour_weekend")
    .select("osm_id", "peak_hour_weekday", "peak_hour_weekend")
)

# Ping count per (location, day of week).
#
# day_of_week is produced by F.dayofweek, whose values run from 1 (Sunday) to
# 7 (Saturday). Pivoting on 0..6 would leave column "0" permanently empty and
# silently discard every Saturday ping, distorting all proportions and the
# entropy. The pivot therefore uses 1..7, and the output columns are re-based
# so the public names stay visits_by_day_0 (Sunday) .. visits_by_day_6 (Saturday).
dow_values = list(range(1, 8))

by_dow = df.groupBy("osm_id", "day_of_week").agg(F.count("*").alias("cnt_dow"))

# Wide layout: one column per weekday, named "1".."7"; days without pings -> 0.
dow_pivot = (
    by_dow
    .groupBy("osm_id")
    .pivot("day_of_week", dow_values)
    .agg(F.first("cnt_dow"))
    .fillna(0)
)

# Row-wise total over the seven weekday columns.
sum_dow = sum([F.col(str(d)) for d in dow_values])

# Share of pings per weekday; the denominator falls back to 1 when the total is
# not positive so the division never hits zero.
dow_props = dow_pivot.select(
    "osm_id",
    *[
        (F.col(str(d)) / F.when(sum_dow > 0, sum_dow).otherwise(F.lit(1)))
        .alias(f"visits_by_day_{d - 1}")
        for d in dow_values
    ],
)

# Entropy of each location's weekday distribution.
entropy_day = dow_props.select(
    "osm_id",
    entropy_expr([f"visits_by_day_{i}" for i in range(7)]).alias("entropy_daily_pattern"),
)

3b - Timeblock: visitas únicas por franja horaria + morning/evening ratio

In [0]:
# Distinct devices per (location, timeblock).
tb_unique = df.groupBy("osm_id", "timeblock").agg(
    F.countDistinct("device_id").alias("unique_device_by_timeblock")
)

# One column per timeblock (missing combinations -> 0), then prefix each
# column name with "unique_device_by_timeblock_".
tb_pivot = (
    tb_unique
    .groupBy("osm_id")
    .pivot("timeblock", ["morning", "afternoon", "evening"])
    .agg(F.first("unique_device_by_timeblock"))
    .fillna(0)
)
for block_name in ("morning", "afternoon", "evening"):
    tb_pivot = tb_pivot.withColumnRenamed(
        block_name, f"unique_device_by_timeblock_{block_name}"
    )

# Morning-to-evening ratio; null when there are no evening devices.
morning_devices = F.col("unique_device_by_timeblock_morning")
evening_devices = F.col("unique_device_by_timeblock_evening")
morning_to_evening = tb_pivot.select(
    "osm_id",
    F.when(evening_devices > 0, morning_devices / evening_devices)
    .alias("morning_to_evening_ratio"),
    "unique_device_by_timeblock_morning",
    "unique_device_by_timeblock_afternoon",
    "unique_device_by_timeblock_evening",
)

4 -  Métricas por dispositivo (frecuencia, medianas y ratios)

In [0]:
# Pings and distinct visit dates per (location, device).
per_dev = df.groupBy("osm_id", "device_id").agg(
    F.count("*").alias("visits_per_device"),
    F.countDistinct("fecha").alias("distinct_days_per_device"),
)

# Mean and approximate median of pings per device, per location.
visits_stats = per_dev.groupBy("osm_id").agg(
    F.avg("visits_per_device").alias("visits_per_device_mean"),
    F.expr("percentile_approx(visits_per_device, 0.5)").alias("median_visits_per_device"),
)

# avg_visits_per_device and repeat_to_unique_ratio, both derived directly from
# the totals in `base`. The derived columns are computed in a single select
# instead of joining `base` with itself (which only added a shuffle), and both
# divisions share the same guard so a location with zero counted devices
# yields null rather than a division by zero.
total_visits = F.col("visits_total")
device_count = F.col("unique_devices_count")
repeat_unique = base.select(
    "osm_id",
    F.when(device_count > 0, total_visits / device_count)
    .alias("avg_visits_per_device"),
    F.when(device_count > 0, (total_visits - device_count) / device_count)
    .alias("repeat_to_unique_ratio"),
)

5 -  Recurrencia y recencia (rate, días para retornar, revisita a 3d, 7d)

In [0]:
# Recurrence rate: share of a location's per_dev rows whose device was seen on
# at least two distinct dates.
is_recurrent = F.when(F.col("distinct_days_per_device") >= 2, F.lit(1)).otherwise(F.lit(0))
device_counts = per_dev.groupBy("osm_id").agg(
    F.sum(is_recurrent).alias("devices_recurrentes"),
    # Number of per_dev rows for the location, recomputed here rather than
    # reusing unique_devices_count.
    F.count("*").alias("devices_total_here"),
)
recurrence = device_counts.select(
    "osm_id",
    F.when(
        F.col("devices_total_here") > 0,
        F.col("devices_recurrentes") / F.col("devices_total_here"),
    ).alias("recurrence_rate"),
)

# Distinct visit dates per (location, device), with a timestamp copy of the
# date used for ordering.
visits_by_day = (
    df.select("osm_id", "device_id", "fecha")
    .distinct()
    .withColumn("fecha_ts", F.col("fecha").cast("timestamp"))
)

# Gap in days between consecutive visit dates of the same device at the same
# location; the first visit has no predecessor, so its gap is null.
w_dev = Window.partitionBy("osm_id", "device_id").orderBy("fecha_ts")
previous_visit = F.col("prev_fecha")
gaps = (
    visits_by_day
    .withColumn("prev_fecha", F.lag("fecha_ts").over(w_dev))
    .withColumn(
        "gap_days",
        F.when(previous_visit.isNotNull(), F.datediff(F.col("fecha_ts"), previous_visit)),
    )
)

# Average gap per location (null gaps are ignored by avg).
avg_days_return = gaps.groupBy("osm_id").agg(
    F.avg("gap_days").alias("avg_days_to_return")
)

# Shortest gap per (location, device).
min_gap = gaps.groupBy("osm_id", "device_id").agg(
    F.min("gap_days").alias("min_gap_days")
)

# Revisit rates: fraction of devices whose shortest gap is within 3 / 7 days.
# Devices with a single visit date have a null shortest gap and count as 0.
revisit = min_gap.groupBy("osm_id").agg(
    *[
        F.avg(F.when(F.col("min_gap_days") <= limit, 1).otherwise(0))
        .alias(f"revisit_rate_{limit}d")
        for limit in (3, 7)
    ]
)


6 - Dwell time (minutos) global / semana / fin de semana

In [0]:
# One "stay" per (location, device, date): seconds between the first and the
# last ping of that day, also expressed in minutes. A stay with a single ping
# therefore has a dwell time of 0.
first_ping = F.unix_timestamp(F.min("datetime"))
last_ping = F.unix_timestamp(F.max("datetime"))
per_stay = (
    df.groupBy("osm_id", "device_id", "fecha", "is_weekend")
    .agg((last_ping - first_ping).alias("dwell_secs"))
    .withColumn("dwell_min", F.col("dwell_secs") / 60.0)
)

# Overall dwell statistics per location (the median is approximate).
dwell_global = per_stay.groupBy("osm_id").agg(
    F.avg("dwell_min").alias("dwell_time_mean"),
    F.stddev_samp("dwell_min").alias("dwell_time_std"),
    F.expr("percentile_approx(dwell_min, 0.5)").alias("dwell_time_median"),
)

# Mean dwell per location and is_weekend value, pivoted into two columns.
dwell_mean_by_flag = per_stay.groupBy("osm_id", "is_weekend").agg(
    F.avg("dwell_min").alias("dwell_time_mean_flag")
)
dwell_by_weekflag = (
    dwell_mean_by_flag
    .groupBy("osm_id")
    .pivot("is_weekend", [0, 1])
    .agg(F.first("dwell_time_mean_flag"))
    .withColumnRenamed("0", "dwell_time_weekday_mean")
    .withColumnRenamed("1", "dwell_time_weekend_mean")
    .select("osm_id", "dwell_time_weekday_mean", "dwell_time_weekend_mean")
)

7 - Unión final y selección final de variables

In [0]:
# Only these by_osm_day columns are joined; its weekend/weekday sums are
# already carried by wk_ratio, so selecting a subset avoids duplicate columns.
by_osm_columns = [
    "osm_id",
    "footfall_avg_per_day",
    "avg_visits_per_active_day",
    "std_visits_per_day",
]

# Per-location feature tables, in the order they are attached to `base`.
feature_parts = [
    by_osm_day.select(by_osm_columns),
    wk_ratio,
    hour_props,
    entropy_hour,
    dow_props,
    entropy_day,
    peak,
    morning_to_evening,
    visits_stats,
    repeat_unique,
    recurrence,
    avg_days_return,
    revisit,
    dwell_global,
    dwell_by_weekflag,
]

# Left-join everything onto `base` so every location in `base` is kept.
features = base
for part in feature_parts:
    features = features.join(part, "osm_id", "left")

# Descriptive attributes per location, taken from the first row of each group.
# NOTE(review): presumably clase/nombre/geometry_wkt are constant within an
# osm_id, otherwise the chosen value is arbitrary - confirm.
meta = df.groupBy("osm_id").agg(
    F.first("clase").alias("clase"),
    F.first("nombre").alias("nombre"),
    F.first("geometry_wkt").alias("geometry_wkt"),
)

In [0]:
# Attach the descriptive metadata to the feature table.
# The metadata columns are dropped first (a no-op when they are absent) so the
# cell is idempotent: re-running it no longer appends a second copy of
# clase/nombre/geometry_wkt, which would make later selects ambiguous.
features = (
    features
    .drop("clase", "nombre", "geometry_wkt")
    .join(meta, "osm_id", "left")
)


In [0]:
# Final feature set, listed in the column order used for display and export.
column_list = (
    [
        "osm_id", "clase", "nombre", "geometry_wkt",
        "unique_visits_weekday", "footfall_avg_per_day", "unique_devices_count",
    ]
    + [f"unique_device_by_timeblock_{tb}" for tb in ("morning", "afternoon", "evening")]
    + ["recurrence_rate", "avg_days_to_return", "peak_hour_weekday", "peak_hour_weekend"]
    + [f"visits_by_hour_{h}" for h in range(24)]
    + [f"visits_by_day_{d}" for d in range(7)]
    + [
        "entropy_hourly_pattern", "entropy_daily_pattern",
        "visits_total", "visits_per_device_mean", "median_visits_per_device",
        "unique_visits_weekend",
        "revisit_rate_3d", "revisit_rate_7d",
        "dwell_time_mean", "dwell_time_std", "dwell_time_median",
        "dwell_time_weekday_mean", "dwell_time_weekend_mean",
        "days_active", "avg_visits_per_active_day", "std_visits_per_day",
        "weekend_to_weekday_ratio", "morning_to_evening_ratio",
        "repeat_to_unique_ratio", "avg_visits_per_device",
    ]
)

# Show the selected feature columns in the notebook output.
features.select(column_list).display()

Exportando a csv para modelar localmente

In [0]:
# Restrict to the final feature columns, then materialise them as a pandas
# DataFrame (toPandas collects the full result onto the driver).
df_spark = features.select(column_list)
df_pandas = df_spark.toPandas()

In [0]:
import csv

# Destination of the exported feature table (relative to the working directory).
out_path = "../datos/filtered_amss/sv_12_2023_location_features_amss.csv"

# Write without the pandas index and quote every field, so text values such as
# names or WKT geometries containing commas stay in a single CSV column.
df_pandas.to_csv(
    out_path,
    index=False,
    encoding="utf-8",
    quoting=csv.QUOTE_ALL,
)