In [21]:
from ingestion import simulate_cdc_daily_parquets
import duckdb
import os
from dotenv import load_dotenv
import pandas as pd
from azure.storage.blob import BlobServiceClient
import io
import json
import pyarrow as pa
import pyarrow.parquet as pq
load_dotenv()


ADLS_CONN_STRING = os.getenv("ADLS_CONN_STRING")


In [27]:
def get_arrow_schema_from_json(tipo):
    """
    Lê o schema do arquivo schema/{tipo}.json e retorna um pyarrow.Schema correspondente.
    """
    schema_path = f"schema/{tipo}.json"
    with open(schema_path, "r", encoding="utf-8") as f:
        fields = json.load(f)
    pa_fields = []
    for field in fields:
        name = field["name"]
        typ = field["type"]
        # Mapear tipos simples
        if typ.startswith("int"):
            pa_type = pa.int32()
        elif typ.startswith("decimal"):
            # For decimal types, use float64 instead to avoid conversion issues
            pa_type = pa.float64()
        elif typ == "double":
            pa_type = pa.float64()
        elif typ == "string":
            pa_type = pa.string()
        elif typ == "timestamp":
            pa_type = pa.timestamp("ns")
        else:
            pa_type = pa.string()
        pa_fields.append(pa.field(name, pa_type))
    return pa.schema(pa_fields)

def simulate_cdc_daily_parquets(
    container_name,
    tipo="yellow",
    ano=2022,
    mes=1,
    fonte_prefix="",
    cdc_prefix="cdc",
    data_col_map=None
):
    """
    Lê um arquivo Parquet do Azure Data Lake Storage (ADLS) no container especificado, filtra por dia e salva arquivos Parquet diários no caminho cdc/{tipo}/{ano}/{mes}/{date}.parquet.
    O schema é lido de schema/{tipo}.json.
    """

    blob_service_client = BlobServiceClient.from_connection_string(ADLS_CONN_STRING)
    container_client = blob_service_client.get_container_client(container_name)

    # Descobre o arquivo Parquet de origem no ADLS
    file_name = f"{tipo}_tripdata_{ano}-{mes:02d}.parquet"
    blob_path = f"source/{tipo}/{ano}/{file_name}"
    print(blob_path)
    # Baixa o arquivo Parquet diretamente na memória
    blob_data = container_client.download_blob(blob_path).readall()
    
    # Descobre a coluna de data/hora
    if data_col_map is None:
        data_col_map = {
            "yellow": "tpep_pickup_datetime",
            "green": "lpep_pickup_datetime"
        }
    datetime_col = data_col_map[tipo]

    # Descobre todos os dias presentes no arquivo
    parquet_bytes = io.BytesIO(blob_data)
    df = pd.read_parquet(parquet_bytes)

    # Descobre todos os dias presentes no arquivo
    days = df[datetime_col].dt.date.unique()
    days = sorted(days)

    # Carrega o schema do tipo
    arrow_schema = get_arrow_schema_from_json(tipo)

    # Para cada dia, filtra e salva um parquet diário no caminho cdc/{tipo}/{ano}/{mes}/{date}.parquet
    for dia in days:
        dia_str = dia.strftime('%Y-%m-%d')
        out_blob_path = f"{cdc_prefix}/{tipo}/{ano}/{mes:02d}/{dia_str}.parquet"

        # Filtra os dados para o dia específico
        filtered_data = df[df[datetime_col].dt.date == dia]

        # Converte o DataFrame filtrado para Parquet em memória com o schema correto
        # Converte o DataFrame filtrado para Parquet em memória
        # Use schema inference instead of forcing a specific schema to avoid type conflicts
        table = pa.Table.from_pandas(filtered_data, preserve_index=False)
        output_buffer = io.BytesIO()
        pq.write_table(table, output_buffer, version="2.6")
        output_buffer.seek(0)
        # Envia para o blob no ADLS
        container_client.upload_blob(out_blob_path, output_buffer.getvalue(), overwrite=True)
        print(f"CDC diário gerado: {out_blob_path}")

simulate_cdc_daily_parquets(
    container_name="taxi",
    tipo="yellow",
    ano=2022,
    mes=3
)

source/yellow/2022/yellow_tripdata_2022-03.parquet
CDC diário gerado: cdc/yellow/2022/03/2008-12-31.parquet
CDC diário gerado: cdc/yellow/2022/03/2009-01-01.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-02-28.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-01.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-02.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-03.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-04.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-05.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-06.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-07.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-08.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-09.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-10.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-11.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-12.parquet
CDC diário gerado: cdc/yellow/2022/03/2022-03-13.parquet
CDC diário gerado: cdc/yellow/2022/03

In [28]:
simulate_cdc_daily_parquets(
    container_name="taxi",
    tipo="yellow",
    ano=2022,
    mes=4
)

source/yellow/2022/yellow_tripdata_2022-04.parquet
CDC diário gerado: cdc/yellow/2022/04/2008-12-31.parquet
CDC diário gerado: cdc/yellow/2022/04/2009-01-01.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-03-31.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-01.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-02.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-03.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-04.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-05.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-06.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-07.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-08.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-09.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-10.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-11.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-12.parquet
CDC diário gerado: cdc/yellow/2022/04/2022-04-13.parquet
CDC diário gerado: cdc/yellow/2022/04