For excecution, go to: https://drive.google.com/file/d/10l9ekdvLgXTOqEyskDkyo1lpumFWA9q5/view?usp=sharing

This code contains a more detailed explenation of the ETL creation process

In [1]:
!pip install pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark



# Import Libraries

In [13]:
from pyspark.sql.functions import col, lit, count, abs, first, round, sum, collect_list, floor,struct, sqrt, pow, array, expr,current_date,size
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql import DataFrame


## Loading Data

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!ls -l drive/'My Drive'/prueba_lla/data_sample

total 8
drwx------ 2 root root 4096 Feb 18 17:12 geo
drwx------ 2 root root 4096 Feb 18 17:12 labels


In [5]:
geo = spark.read.parquet("drive/My Drive/prueba_lla/data_sample/geo/*", header=True)
print(f"Number of Rows = {geo.count()}")
geo.show(5)

Number of Rows = 611959
+------------------+-----------------+--------------------+--------------------+
|           latitud|         longitud|              comuna|                  ID|
+------------------+-----------------+--------------------+--------------------+
|  353894.669721904|6301392.600521904|b13b671cb296c1ce5...|ab6f6062e7fac953a...|
| 297273.7487498676|6271440.347349868|d10ad8071d7270bc1...|b12fac130e9008be6...|
|172956.24831402366|5702581.978114024|20c5891e1d78fe2f3...|cf29a4e836a1c4ba4...|
| 135846.1650072791|5920764.953007279|c51ed7a673a2184f2...|46b48d3aa7694ae78...|
|351678.75214929937|6282760.015749299|8e7e23148e55a25a0...|c86eb4ca0aeb0a981...|
+------------------+-----------------+--------------------+--------------------+
only showing top 5 rows



In [6]:
labels = spark.read.parquet("drive/My Drive/prueba_lla/data_sample/labels/*", header=True)
print(f"Number of Rows = {labels.count()}")
labels.show(5)

Number of Rows = 84435
+--------------------+-----+
|                  ID|event|
+--------------------+-----+
|ea165b785d74859a9...|    2|
|558d0ed3e3cf87a3d...|    2|
|2da14424526a7d741...|    2|
|8f619aaa096c4e9da...|    2|
|cabf40eac3538e1a6...|    1|
+--------------------+-----+
only showing top 5 rows



Joining the two dfs

In [7]:
merged_df = geo.join(labels, on="ID", how="left")

cleaned_df = merged_df.groupBy("ID").agg(
    first("comuna").alias("comuna"),
    first("latitud").alias("latitud"),
    first("longitud").alias("longitud"),
    collect_list("event").alias("event")  # Guarda una lista de todos los eventos que tiene un cliente
)

print(f"Number of Rows = {cleaned_df.count()}")
cleaned_df.show(10)

Number of Rows = 601709
+--------------------+--------------------+------------------+------------------+-----+
|                  ID|              comuna|           latitud|          longitud|event|
+--------------------+--------------------+------------------+------------------+-----+
|000002d58f027143d...|7bf0aaf9628a3371b...| 326084.2961682221| 6279531.062668222|   []|
|000057bee0e812792...|a532cdd0fac620d39...| 258275.9814788573| 6341931.224259003|   []|
|00005fe9d2e7a9d12...|5fa3a58d4e35efd78...|342015.01299357676|6298414.1473696055|  [2]|
|000072d007dc1f2cb...|44e33af5f4c88c82d...| 337544.0182890504| 6307956.648998858|   []|
|0000b43cdeba529f3...|c1085325d225ba129...|341918.86591499206| 6283674.388114992|   []|
|0000bbef22650c35d...|41746729733b29ef3...|386112.16969119024| 7757299.506275459|   []|
|0000d7d338169ee5f...|44e33af5f4c88c82d...|339891.48521050595| 6306511.730410506|   []|
|00012403769c505ca...|a1ddd6c69647304a7...|184425.05939029466| 5705525.088733884|   []|
|000141f

Se define un sistema cartesiano:
 * A cada punto se le asigna una región de 50m X 50m
 * Ej: Si un punto se encuentra en (20,30), este está en la grilla (1,1). Si un punto está en (70,30), esté está en la grilla (2,1).



In [8]:
def asignar_grids(df: DataFrame, grid_size: int = 50) -> DataFrame:
    """
    Asigna una cuadrícula (grid_x, grid_y) de tamaño `grid_size` a cada punto en el DataFrame de PySpark.

    :param df: DataFrame de PySpark con las columnas 'longitud' y 'latitud'
    :param grid_size: Tamaño del grid en metros (por defecto 50m)
    :return: DataFrame con la nueva columna 'grid' que contiene las coordenadas del grid (x, y)
    """
    df = df.withColumn("grid_x", floor(col("longitud") / grid_size) + 1)
    df = df.withColumn("grid_y", floor(col("latitud") / grid_size) + 1)
    df = df.withColumn("grid", struct(col("grid_x").cast("int"), col("grid_y").cast("int")))

    return df.drop("grid_x", "grid_y")  # Elimina columnas auxiliares si no son necesarias

In [9]:
grid_df = asignar_grids(cleaned_df)

In [None]:
grid_df.show(10)

+--------------------+--------------------+------------------+------------------+-----+---------------+
|                  ID|              comuna|           latitud|          longitud|event|           grid|
+--------------------+--------------------+------------------+------------------+-----+---------------+
|00005fe9d2e7a9d12...|5fa3a58d4e35efd78...|342015.01299357676|6298414.1473696055|  [2]| {125969, 6841}|
|000271a03b6ea2b4c...|4a8bc878fecae0db7...| 339439.8191546541| 6294442.820354654|  [2]| {125889, 6789}|
|0002df5d589671f89...|192579a17a7c6faf9...|344142.63802056597| 6298069.790666015|  [2]| {125962, 6883}|
|0003fdbfd1b04a1f0...|28c2b23257ff4840a...| 345955.5261761797|  6300707.17787618|  [2]| {126015, 6920}|
|000b1292f1fce506c...|1663f043b1b1201a0...| 352780.2597005477| 6299127.371464686|  [2]| {125983, 7056}|
|0015c43ceb8d6e183...|4a8bc878fecae0db7...|335728.43385461404| 6289345.378554614|  [2]| {125787, 6715}|
|0018b4d36f7a0d47d...|b13b671cb296c1ce5...| 355921.8515706076|  

Encontrar la distancia para los puntos, solo en regiones adyacentes, pues sabemos que todos los puntos más cercanos a 50m están en las regiones adyacentes.



In [10]:


def calculate_distances(grid_df: DataFrame) -> DataFrame:
    """
    Calcula la distancia entre clientes en grid_df a clientes con eventos en grid_df en un radio de 50 metros,
    considerando solo los puntos dentro del mismo grid o en grids adyacentes.
    También elimina filas donde ID_geo == ID_event (mismo cliente).

    :param grid_df: DataFrame con las columnas ['ID', 'comuna', 'latitud', 'longitud', 'event', 'grid']
    :return: DataFrame con las columnas ['ID_geo', 'comuna', 'latitude_geo', 'longitude_geo',
                                         'ID_event', 'latitude_event', 'longitude_event', 'event', 'distance']
    """

    # Extraer las coordenadas del grid como columnas separadas con sus nombres reales
    grid_df = grid_df.withColumn("grid_x", col("grid.col1")) \
                     .withColumn("grid_y", col("grid.col2"))

    # Filtrar clientes que tienen eventos
    events_df = grid_df.filter(col("event").isNotNull() & (col("event") != array()))

    # Hacer un self-join en la misma comuna y solo en grids cercanos
    distance_df = grid_df.alias("g1").join(
        events_df.alias("g2"),
        (col("g1.comuna") == col("g2.comuna")) &  # Solo en la misma comuna
        (
            (col("g1.grid_x") == col("g2.grid_x")) & (abs(col("g1.grid_y") - col("g2.grid_y")) <= 1) |
            (col("g1.grid_y") == col("g2.grid_y")) & (abs(col("g1.grid_x") - col("g2.grid_x")) <= 1) |
            (abs(col("g1.grid_x") - col("g2.grid_x")) == 1) & (abs(col("g1.grid_y") - col("g2.grid_y")) == 1)
        ),
        "inner"
    ).filter(col("g1.ID") != col("g2.ID"))  # Eliminar filas donde ID_geo == ID_event

    # Calcular la distancia Euclidiana
    distance_df = distance_df.withColumn(
        "distance",
        sqrt(
            pow(col("g1.latitud") - col("g2.latitud"), 2) +
            pow(col("g1.longitud") - col("g2.longitud"), 2)
        )
    ).filter(col("distance") <= 50)  # Filtrar solo los clientes en un radio de 50m

    # Seleccionar las columnas finales
    distance_df = distance_df.select(
        col("g1.ID").alias("ID_geo"),
        col("g1.comuna"),
        col("g1.latitud").alias("latitude_geo"),
        col("g1.longitud").alias("longitude_geo"),
        col("g2.ID").alias("ID_event"),
        col("g2.latitud").alias("latitude_event"),
        col("g2.longitud").alias("longitude_event"),
        col("g2.event"),
        col("distance")
    )

    return distance_df.withColumn("date_processed", current_date()) #Agregar fecha para mantener track semanal



In [11]:
dist_df = calculate_distances(grid_df)
print(f"Number of Rows = {dist_df.count()}")


Number of Rows = 373063
+--------------------+--------------------+------------------+------------------+--------------------+------------------+-----------------+-----+------------------+--------------+
|              ID_geo|              comuna|      latitude_geo|     longitude_geo|            ID_event|    latitude_event|  longitude_event|event|          distance|date_processed|
+--------------------+--------------------+------------------+------------------+--------------------+------------------+-----------------+-----+------------------+--------------+
|00944c192e588e812...|f3de540ebefb09aaf...|358859.89315310353| 6307671.723495424|773d9d14c68cc25bc...| 358857.0619159535|6307696.733227695|  [2]|  25.1694777874695|    2025-02-22|
|00944c192e588e812...|f3de540ebefb09aaf...|358859.89315310353| 6307671.723495424|657b8c452fcae8946...|358857.83758169226|6307697.508893435|  [2]|25.867201712157765|    2025-02-22|
|00e7887af5140e340...|f3de540ebefb09aaf...| 358451.6623842446| 6307001.59333

In [15]:
#Hacer mas sencillas las queries -> crear columnas de numero de eventos.
def add_event_counts(df: DataFrame) -> DataFrame:
    """
    Dado un DataFrame que contiene la columna 'event' (un collect_list de 1s y 2s),
    crea dos columnas nuevas con el conteo de 1s y 2s y elimina la columna 'event'.
    """
    df_new = (
        df
        # Cuenta el número de 1s en la lista
        .withColumn("number_of_type1_events", size(expr("filter(event, x -> x == 1)")))
        # Cuenta el número de 2s en la lista
        .withColumn("number_of_type2_events", size(expr("filter(event, x -> x == 2)")))
        # Elimina la columna original event
        .drop("event")
    )
    return df_new

final_df = add_event_counts(dist_df)
final_df.show(25)


+--------------------+--------------------+------------------+------------------+--------------------+------------------+------------------+------------------+--------------+----------------------+----------------------+
|              ID_geo|              comuna|      latitude_geo|     longitude_geo|            ID_event|    latitude_event|   longitude_event|          distance|date_processed|number_of_type1_events|number_of_type2_events|
+--------------------+--------------------+------------------+------------------+--------------------+------------------+------------------+------------------+--------------+----------------------+----------------------+
|00944c192e588e812...|f3de540ebefb09aaf...|358859.89315310353| 6307671.723495424|773d9d14c68cc25bc...| 358857.0619159535| 6307696.733227695|  25.1694777874695|    2025-02-22|                     0|                     1|
|00944c192e588e812...|f3de540ebefb09aaf...|358859.89315310353| 6307671.723495424|657b8c452fcae8946...|358857.8375816

Guardar las tablas para los diferentes equipos

1. Data scientists: Toda la información - muy granular. Archivo tipo CSV por facilidad
2. Analistas: Más agregada Archivo tipo CSV por facilidad

Se propone agregar columna de fecha para llevar registro de las semanas


In [24]:
#Para los cientificos de datos

final_df.write.mode("append").csv("/processed_data/csv/", header=True)

#Para los analistas

# Crear un nuevo DataFrame con solo las columnas requeridas para los analistas
dist_simplified_df = final_df.select(
    col("ID_geo"),
    col("comuna"),
    col("ID_event"),
    col("distance"),
    col("date_processed"),
    col("number_of_type1_events"),
    col("number_of_type2_events")
)
# Mostrar los primeros registros para verificar
dist_simplified_df.write.mode("append").csv("/processed_data_analista/csv/", header=True)



In [None]:
#Enviar los dos df a buckets en AWS

# Connect SQL Database to SQL (SQL Execution)

In [17]:
# Connection sqlite and init Database
import sqlite3

conn = sqlite3.connect('datalake_stage.db')
print("Opened database successfully");

curs = conn.cursor()


Opened database successfully


Conectar las tablas a SQL

In [18]:
labels_df_pandas = labels.toPandas()
labels_df_pandas.to_sql("labels", conn, if_exists="replace", index=False)
print("Tabla 'labels' cargada en SQLite")

geo_df_pandas = geo.toPandas()
geo_df_pandas.to_sql("geo", conn, if_exists="replace", index=False)
print("Tabla 'geo' cargada en SQLite")


dist_df_pandas = final_df.toPandas()
dist_df_pandas.to_sql("dist", conn, if_exists="replace", index=False)
print("Tabla 'dist' cargada en SQLite")


#Esta es la tabla en la que geo y labels están unidas, limpiada de duplicados
cleaned_df = cleaned_df.withColumn("event", expr("concat_ws(',', event)")) #Cambia el tipo de event
combined_df_pandas = cleaned_df.toPandas()
combined_df_pandas.to_sql("combined", conn, if_exists="replace", index=False)
print("Tabla 'combined' cargada en SQLite")

Tabla 'labels' cargada en SQLite
Tabla 'geo' cargada en SQLite
Tabla 'dist' cargada en SQLite
Tabla 'combined' cargada en SQLite


Pruebas de validación

In [19]:
curs.execute("SELECT COUNT(*) FROM labels")
print("Total de filas en labels:", curs.fetchone()[0])

curs.execute("SELECT COUNT(*) FROM geo")
print("Total de filas en geo:", curs.fetchone()[0])

curs.execute("""
    SELECT ID, COUNT(*)
    FROM combined ID
    GROUP BY ID
    HAVING COUNT(*) > 1
""")
print("Clientes en 'combined' con duplicados:", curs.fetchall())

#Revisar si hay valores NULL en los ID de la tabla creada
curs.execute("""
    SELECT *
    FROM dist
    WHERE ID_geo IS NULL OR ID_event IS NULL
""")
print("Clientes con datos nulos en distance_results:", curs.fetchall())

#Verificar si todos los labels tienen un id en GEO (podemos ver que no es el caso)
curs.execute("""
    SELECT COUNT(*)
    FROM labels l
    LEFT JOIN geo g ON l.ID = g.ID
    WHERE g.ID IS NULL;
""")
print("Clientes que están en labels y no en geo:", curs.fetchall())

Total de filas en labels: 84435
Total de filas en geo: 611959
Clientes en 'combined' con duplicados: []
Clientes con datos nulos en distance_results: []
Clientes que están en labels y no en geo: [(66954,)]


Encontrar las top 20 comunas con evento 2

In [20]:
query_top_20_communes = """
  SELECT
    comuna,
    SUM(number_of_type2_events) AS total_eventos
  FROM dist
  GROUP BY comuna
  ORDER BY total_eventos DESC
  LIMIT 20;
"""

curs.execute(query_top_20_communes)
top_20_communes = curs.fetchall()

print("Top 20 comunas con más eventos tipo 2:")
for row in top_20_communes:
    print(row)


Top 20 comunas con más eventos tipo 2:
('192579a17a7c6faf9fa70fcff6bc208aa9187dbe4ea70f69ffd991d180c1f823', 57780)
('d6914cd246f4202bf3291abeba40824e6148f7d4042302bcdb449ddf7f5183e1', 28667)
('b13b671cb296c1ce5eb94117f308118364cd258b322f61872cc7364dfcf5f2ad', 26748)
('630d8cf93464322637f0c27c04d65fd7565a04c43decc27f0f5685a689c23cb2', 22737)
('1663f043b1b1201a010d6965765c283b6e068ff0cd071fb7aa6ed1a7fd120172', 18831)
('b8df8fb0f19ee92b800af6f7bd277b2c6c0f660ff2ea13ae5bdaba762f92abf2', 15113)
('5029af19eafc8e0cf42b8bb6b2d05e7fb8c106e5926a2a03e541916e3f8812aa', 10363)
('0bd00d09bd2a382f3f8967d5dd3189af9d2573f6d49a0ba165ffa864cd4fb8bd', 9988)
('8e7e23148e55a25a0a788a413727bcf5079c21bc5f7310187fd4132c15404052', 8904)
('28c2b23257ff4840a5fb733bda4ffa4757d57a4e923ea8b7cc35556e76e80f0f', 8011)
('cc80409f27e2a1b5e3a0feeb3f976bdb13e93ce8144e686c8abdf6762ba6a1a7', 7749)
('4a8bc878fecae0db731883c790c3fdfc623220d388a990cf3be6429f214048ad', 7577)
('5fa3a58d4e35efd789c577a6f721358b0923c32a6a48f87ddae3

Calcular datos de eventos tipo 1

In [23]:
import pandas as pd

# Ejecuta la consulta para crear la tabla
query_event1_communes = """
    CREATE TABLE eventos_tipo_1 AS
    SELECT
        comuna,
        SUM(number_of_type1_events) AS total_events,
        SUM(latitude_event * number_of_type1_events)
          / NULLIF(SUM(number_of_type1_events), 0) AS avg_latitude,
        MAX(latitude_event) AS max_latitude,
        MIN(latitude_event) AS min_latitude,
        SUM(longitude_event * number_of_type1_events)
          / NULLIF(SUM(number_of_type1_events), 0) AS avg_longitude,
        MAX(longitude_event) AS max_longitude,
        MIN(longitude_event) AS min_longitude
    FROM dist
    WHERE number_of_type1_events > 0
    GROUP BY comuna;
"""

curs.execute(query_event1_communes)

# Cargar los datos en un DataFrame de Pandas
df = pd.read_sql_query("SELECT * FROM eventos_tipo_1", conn)

# Renombrar las columnas para mayor legibilidad
df.columns = [
    "Comuna",
    "num_eventos_tipo1",
    "avg_latitude",
    "max_latitude",
    "min_latitude",
    "avg_longitude",
    "max_longitude",
    "min_longitude"
]

print(df.head())


OperationalError: table eventos_tipo_1 already exists

In [None]:
# Close connection to save the db in file datalake_stage.db
conn.close()

In [None]:
# We will first load an sql extension into our environment
# This extension will allow us to work with sql on Colaboratory
#
%load_ext sql

# We will then connect to our in memory sqlite database
# NB: This database will cease to exist as soon as the database connection is closed.
# We will learn more about how databases are created later in prep.
#
%sql sqlite:///datalake_stage.db
# Nothing to do here

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [None]:
 %reload_ext sql