## Optimización de un Delta Lake

In [71]:
from datetime import datetime, timedelta
import itertools

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

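### Creando un Delta Lake con todos los viajes de taxis de un año

Se leen los doce ficheros CSV mensuales de 2021, se desplazan sus fechas al año actual y se conservan solo los viajes cuya fecha de recogida cae en el mes correspondiente; cada mes se escribe en el Delta Lake.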

In [73]:
# Year of the source CSV files; their trips are shifted to the current year.
anyo_origen = 2021
anyo_actual = datetime.now().year
numero_anyos = anyo_actual - anyo_origen
# Loop-invariant offset that moves every timestamp from anyo_origen to anyo_actual.
desplazamiento = pd.DateOffset(years=numero_anyos)

for m in range(1, 13):
    nombre_fichero_csv = f"yellow_tripdata_{anyo_origen}-{m:02d}.csv"
    taxis = pd.read_csv(
        f"./datos/taxis/{nombre_fichero_csv}",
        sep=';',
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        dtype={"store_and_fwd_flag": "str"},
    )
    taxis["tpep_pickup_datetime"] = taxis["tpep_pickup_datetime"] + desplazamiento
    taxis["tpep_dropoff_datetime"] = taxis["tpep_dropoff_datetime"] + desplazamiento
    # Midnight of the pickup day, computed vectorized instead of round-tripping
    # every row through a Python `date` object and parsing it back.
    taxis["pickup_date"] = taxis["tpep_pickup_datetime"].dt.normalize()
    # Integer key such as 202401 (year * 100 + month), used later for partitioning.
    taxis["pickup_year_month"] = (
        taxis["tpep_pickup_datetime"].dt.year * 100
        + taxis["tpep_pickup_datetime"].dt.month
    )

    # Keep only trips that, after the shift, fall in the year/month this file is
    # meant to represent (drops rows whose timestamps lie outside that month).
    taxis = taxis[
        (taxis["pickup_date"].dt.year == anyo_actual)
        & (taxis["pickup_date"].dt.month == m)
    ]

    print(f"Agregando datos del fichero {nombre_fichero_csv} al Delta Lake...")
    # Overwrite with the first month so that re-running the cell rebuilds the
    # table instead of appending a duplicate copy of the year; append the rest.
    write_deltalake(
        "./datos/taxis_delta/",
        taxis,
        mode="overwrite" if m == 1 else "append",
    )

Agregando datos del fichero yellow_tripdata_2021-01.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-02.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-03.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-04.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-05.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-06.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-07.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-08.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-09.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-10.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-11.csv al Delta Lake...
Agregando datos del fichero yellow_tripdata_2021-12.csv al Delta Lake...


In [74]:
# Load the whole year back from the Delta table into a single pandas DataFrame;
# the following sections rewrite this same frame with different physical layouts.
ruta_taxis_delta = "./datos/taxis_delta/"
dt = DeltaTable(ruta_taxis_delta)
taxis = dt.to_pandas()

### Particiones

In [75]:
# Rewrite the same data physically partitioned by `pickup_year_month`.
# mode="overwrite" makes the cell re-runnable: the default mode raises an error
# when the table already exists.
write_deltalake(
    "./datos/taxis_delta_part_month/",
    taxis,
    partition_by="pickup_year_month",
    mode="overwrite",
)

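### Z-Order

Z-Order reescribe los ficheros de datos para que las filas con valores próximos en las columnas indicadas queden agrupadas en los mismos ficheros, lo que permite descartar ficheros completos al filtrar por esas columnas.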

In [None]:
# Unpartitioned copy of the data that the next cell Z-orders. `name` is stored as
# table metadata. mode="overwrite" lets the cell be re-run; the default mode raises
# an error when the table already exists.
write_deltalake("./datos/taxis_delta_z", taxis, name="taxis", mode="overwrite")

In [None]:
# Re-open the unpartitioned copy and rewrite its files so rows are clustered by
# `pickup_year_month`; the returned metrics summarise the rewrite.
dt = DeltaTable("./datos/taxis_delta_z")
metricas_z_order = dt.optimize.z_order(columns=["pickup_year_month"])
metricas_z_order

### Compactación de ficheros pequeños

In [None]:
def record_observations(date, nrows=1000):
    """Build a synthetic batch of observations stamped with a single datetime.

    No external data is read; the values are randomly generated.

    Args:
        date: ``datetime`` every row is stamped with.
        nrows: number of rows to generate (defaults to 1000, the size that used
            to be hard-coded).

    Returns:
        A ``pyarrow.Table`` with ``nrows`` rows and the columns:
        ``date`` (the calendar day of ``date``), ``timestamp`` (``date`` itself)
        and ``value`` (random doubles from ``pyarrow.compute.random``).
    """
    return pa.table(
        {
            "date": pa.array([date.date()] * nrows),
            "timestamp": pa.array([date] * nrows),
            # Unseeded: values differ on every call.
            "value": pc.random(nrows),
        }
    )

# Every hour starting at midnight on 2023-12-14
hours_iter = (datetime(2023, 12, 14) + timedelta(hours=i) for i in itertools.count())

# Number of hourly batches to write. Each batch is a separate small write,
# which is what the compaction section below is meant to clean up.
num_horas = 100

# Write `num_horas` hours worth of data. The first batch overwrites any table left
# by a previous run, so re-executing this cell does not duplicate rows; the
# remaining batches are appended one at a time.
for n_lote, timestamp in enumerate(itertools.islice(hours_iter, num_horas)):
    write_deltalake(
        "./datos/observation_data",
        record_observations(timestamp),
        partition_by=["date"],
        mode="overwrite" if n_lote == 0 else "append",
    )


In [None]:
# Open the observations table and display its contents as a pandas DataFrame.
dt = DeltaTable('./datos/observation_data')
observaciones = dt.to_pandas()
observaciones

In [None]:
# Data files referenced by the current table version, before compaction.
# Many small files are expected here because the table was built by repeated writes.
ficheros_antes_compactar = dt.files()
ficheros_antes_compactar

In [None]:
# Merge small data files into larger ones; the returned metrics summarise
# what was rewritten.
metricas_compactacion = dt.optimize.compact()
metricas_compactacion

In [None]:
# Data files referenced by the current table version after compaction,
# to compare with the listing taken before it.
ficheros_despues_compactar = dt.files()
ficheros_despues_compactar

In [None]:
# Physically delete files no longer referenced by the current table version.
# WARNING: a zero-hour retention (which requires disabling the retention check)
# removes the files that older versions still need, so time travel to them stops
# working; it is also unsafe if other processes are reading or writing the table.
# Only acceptable in a throwaway demo like this one.
ficheros_borrados = dt.vacuum(
    retention_hours=0,
    enforce_retention_duration=False,
    dry_run=False,
)
ficheros_borrados