In [16]:
import duckdb
import json
from datetime import date, timedelta
import pandas as pd

In [17]:
with open("credentials.json") as f:
    d = json.load(f)

pg = d["postgres"]
aws = d["aws"]
bucket = "s3://carlos-s3-bdet-ducklake" 

def secreto(con):
    print("Iniciando")
    #con = duckdb.connect( )
    con.sql("INSTALL ducklake; LOAD ducklake;")
    con.sql("INSTALL spatial; LOAD spatial;")
    con.sql("INSTALL httpfs; LOAD httpfs;")
    con.sql("INSTALL postgres; LOAD postgres;")

    print("Librerias cargadas")

    con.sql(f"""
        CREATE OR REPLACE SECRET secreto_s3 (
        TYPE s3,
        KEY_ID '{aws["login"]}',
        SECRET '{aws["password"]}',
        REGION 'eu-central-1'
    )
    """)

    print("Secreto s3 creado")

    con.sql(f"""
        CREATE OR REPLACE SECRET secreto_postgres (
        TYPE postgres,
        HOST '{pg["host"]}',
        PORT {pg["port"]},
        DATABASE '{pg["schema"]}',
        USER '{pg["login"]}',
        PASSWORD '{pg["password"]}'
        )
    """)

    print("Secreto postgres creado")

    con.sql("""
        CREATE OR REPLACE SECRET secreto_ducklake (
            TYPE ducklake,
            METADATA_PATH '',
            METADATA_PARAMETERS MAP {'TYPE': 'postgres', 'SECRET': 'secreto_postgres'}
        );
        """)
    
    print("secreto ducklake creado")

    con.sql(f"""
        ATTACH 'ducklake:secreto_ducklake' AS mobility_ducklake (DATA_PATH '{bucket}', OVERRIDE_DATA_PATH TRUE) """)
    con.sql("""
        USE mobility_ducklake """)
    
    print("fin")

In [18]:
con = duckdb.connect()
secreto(con)

Iniciando
Librerias cargadas
Secreto s3 creado
Secreto postgres creado
secreto ducklake creado
fin


In [20]:
con.sql("SELECT * FROM gold_infrastructure_gaps LIMIT 10")

┌───────────┬────────────────┬────────────────────┬─────────────────┬──────────────────┬───────────┐
│ origin_id │ destination_id │    actual_trips    │ distance_meters │ potential_demand │ gap_ratio │
│  varchar  │    varchar     │       double       │      int32      │      int32       │  double   │
├───────────┼────────────────┼────────────────────┼─────────────────┼──────────────────┼───────────┤
│ 0200301   │ 0200304        │  45724.53656050141 │            4050 │           171483 │    0.2666 │
│ 0206902   │ 0200303        │            131.607 │           36962 │              492 │    0.2673 │
│ 0200304   │ 0202501        │ 39.565000000000005 │           81589 │              147 │     0.269 │
│ 0203002   │ 0206901        │              2.929 │           82145 │               11 │     0.276 │
│ 0200305   │ 0206902        │ 104.24000000000001 │           37871 │              376 │     0.277 │
│ 0208104   │ 0206902        │ 145.88299999999998 │           21036 │              526 │   

In [13]:
date_start = '2023-05-08'
date_end = '2023-01-08'

region = "POLYGON((536923.45 4188168.32, 535422.18 4515582.45, 729358.76 4518970.12, 739506.89 4191478.56, 536923.45 4188168.32))"

query = f"""--sql
CREATE OR REPLACE TABLE gold_gravity_model AS

WITH daily_trips AS (
    SELECT 
        origin_id,
        destination_id,
        SUM(travels) as actual_trips
    FROM silver_trips
    WHERE 
        -- Filtro ÚNICO de Fecha
        date BETWEEN '{date_start}' AND '{date_end}'
    GROUP BY 1, 2
)

SELECT 
    t.origin_id,
    t.destination_id,
    t.actual_trips,
    
    -- Distancia (Evitamos división por cero)
    GREATEST(d.distance_meters, 500.0) as dist_meters,
    
    -- Datos Demográficos
    CAST(pop.poblacion AS DOUBLE) as P_i,
    CAST(econ.renta_total_euros AS DOUBLE) as E_j,
    
    -- FÓRMULA DE GRAVEDAD
    -- G = (Pob_Origen * Renta_Destino) / Distancia^2
    (CAST(pop.poblacion AS DOUBLE) * CAST(econ.renta_total_euros AS DOUBLE)) 
        / POWER(GREATEST(d.distance_meters, 500.0), 2) as gravity_raw

FROM daily_trips t

-- Joins
JOIN silver_distances d 
    ON t.origin_id = d.origin_id 
    AND t.destination_id = d.destination_id

JOIN silver_demographics pop 
    ON t.origin_id = pop.distrito_id 
    AND pop.year = year(CAST('{date_start}' AS DATE))

JOIN silver_demographics econ 
    ON t.destination_id = econ.distrito_id 
    AND econ.year = year(CAST('{date_start}' AS DATE));
"""

In [7]:
con.sql(f"SELECT CAST('2023-05-08' AS DATE)")

┌────────────────────────────┐
│ CAST('2023-05-08' AS DATE) │
│            date            │
├────────────────────────────┤
│ 2023-05-08                 │
└────────────────────────────┘

In [14]:
con.sql(query)

In [15]:
con.sql("SELECT * FROM gold_gravity_model LIMIT 10")

┌───────────┬────────────────┬──────────────┬─────────────┬────────┬────────┬─────────────┐
│ origin_id │ destination_id │ actual_trips │ dist_meters │  P_i   │  E_j   │ gravity_raw │
│  varchar  │    varchar     │    double    │   double    │ double │ double │   double    │
├───────────┴────────────────┴──────────────┴─────────────┴────────┴────────┴─────────────┤
│                                         0 rows                                          │
└─────────────────────────────────────────────────────────────────────────────────────────┘

In [11]:
con.sql("SELECT * FROM gold_date_patterns")

┌────────────┬──────────────┐
│    date    │ pattern_type │
│    date    │    int32     │
├────────────┼──────────────┤
│ 2023-01-01 │            4 │
│ 2023-01-02 │            2 │
│ 2023-01-03 │            2 │
│ 2023-01-04 │            2 │
│ 2023-01-05 │            2 │
│ 2023-01-06 │            2 │
│ 2023-01-07 │            2 │
│ 2023-01-08 │            2 │
│ 2023-01-09 │            0 │
│ 2023-01-10 │            0 │
│     ·      │            · │
│     ·      │            · │
│     ·      │            · │
│ 2023-12-22 │            0 │
│ 2023-12-23 │            2 │
│ 2023-12-24 │            2 │
│ 2023-12-25 │            1 │
│ 2023-12-26 │            2 │
│ 2023-12-27 │            2 │
│ 2023-12-28 │            2 │
│ 2023-12-29 │            2 │
│ 2023-12-30 │            2 │
│ 2023-12-31 │            2 │
├────────────┴──────────────┤
│ 358 rows        2 columns │
│ (20 shown)                │
└───────────────────────────┘

In [None]:
date_start = date(2023, 1, 1)
date_end = date(2023, 12, 31)
current_date = pd.to_datetime(date_start)
final_date = pd.to_datetime(date_end)

In [17]:
date_start = date(2023, 1, 1)
date_end = date(2023, 12, 31)
current_date = pd.to_datetime(date_start)
final_date = pd.to_datetime(date_end)
configs = []

# con = duckdb.connect()
# secreto(con)
# con.sql("""
#             CREATE OR REPLACE TABLE temp_trips_batch_agg (
#                 origin_id VARCHAR, 
#                 destination_id VARCHAR, 
#                 partial_trips BIGINT
#             )""")
i = 1
while current_date <= final_date:
    # A. Calcular el final del mes actual
    # Truco: Ir al día 1 del mes siguiente y restar un día
    next_week = (current_date + pd.DateOffset(days=2))
    end_of_current_week = next_week - pd.Timedelta(days=1)
    
    # B. Definir el final del lote (Batch End)
    # Si el final del mes está ANTES que la fecha final global, cortamos en fin de mes.
    # Si no, cortamos en la fecha final global.
    batch_end_date = min(end_of_current_week, final_date)
    
    # C. Formatear para SQL
    s_str = current_date.strftime('%Y-%m-%d')
    e_str = batch_end_date.strftime('%Y-%m-%d')
    
    #print(f" >> Procesando lote: {s_str} al {e_str}")
    
    # D. Ejecutar Agregación Parcial
    query = f"""
    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '{s_str}' AND '{e_str}'
    GROUP BY 1, 2
    """
    #con.sql(query_insert)
    print(query)
    print(i)
    i += 1
    current_date = batch_end_date + pd.Timedelta(days=1)


    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '2023-01-01' AND '2023-01-02'
    GROUP BY 1, 2
    
1

    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '2023-01-03' AND '2023-01-04'
    GROUP BY 1, 2
    
2

    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '2023-01-05' AND '2023-01-06'
    GROUP BY 1, 2
    
3

    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '2023-01-07' AND '2023-01-08'
    GROUP BY 1, 2
    
4

    INSERT INTO temp_trips_batch_agg
    SELECT 
        origin_id,

In [14]:
con = duckdb.connect()
secreto(con)
con.sql("""SELECT 
        origin_id, 
        destination_id, 
        SUM(travels) as partial_trips
    FROM silver_trips
    WHERE date BETWEEN '2023-01-01' AND '2023-01-02'
    GROUP BY 1, 2""")

Iniciando
Librerias cargadas
Secreto s3 creado
Secreto postgres creado
secreto ducklake creado
fin


┌───────────┬────────────────┬────────────────────┐
│ origin_id │ destination_id │   partial_trips    │
│  varchar  │    varchar     │       double       │
├───────────┼────────────────┼────────────────────┤
│ 0101701   │ 3101301        │            153.395 │
│ 0105501   │ 0905701        │  383.5989999999999 │
│ 0314001   │ 4613105        │              2.793 │
│ 0314006   │ 0311601        │ 2412.2443651794797 │
│ 0810205   │ 0825001        │ 29210.058784820536 │
│ 0819601   │ 0820801        │  777.1475442473998 │
│ 0819406   │ 0801909        │ 13643.552725735703 │
│ 0830701   │ 0801902        │ 456.35200000000003 │
│ 1103106   │ 1101505        │ 11662.593475582004 │
│ 0901801   │ 0916401        │           1450.788 │
│    ·      │    ·           │               ·    │
│    ·      │    ·           │               ·    │
│    ·      │    ·           │               ·    │
│ 3123601   │ 3190101        │ 398.41833333334006 │
│ 4625801   │ 1207801        │              9.416 │
│ 5017501   

In [20]:
con.sql("DROP TABLE bronze_mitma_viajes_distritos")

In [18]:
con.sql("SELECT * FROM gold_date_patterns LIMIT 10")

┌────────────┬──────────────┐
│    date    │ pattern_type │
│    date    │    int32     │
├────────────┼──────────────┤
│ 2023-01-01 │            2 │
│ 2023-01-02 │            0 │
│ 2023-01-03 │            0 │
│ 2023-01-04 │            0 │
│ 2023-01-05 │            0 │
│ 2023-01-06 │            1 │
│ 2023-01-07 │            1 │
└────────────┴──────────────┘