In [0]:
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *
import uuid
from delta.tables import *
from datetime import datetime
import time

from sodapy import Socrata

In [0]:
# Notebook parameters, exposed as Databricks dropdown widgets:
#   - "env": execution environment ("dev" by default).
#   - "source_data": which NYC Open Data trip dataset to ingest.
dbutils.widgets.dropdown("env", "dev", ["dev", "prod"], "Execution Environment")
dbutils.widgets.dropdown("source_data", "yellow_tripdata", ["yellow_tripdata", "green_tripdata"], "NYCOpenData")

# Read the current widget values; both are used by later cells.
env = dbutils.widgets.get('env')
source_data = dbutils.widgets.get('source_data')

In [0]:
# Criação dos Schemas

def create_schemas(catalog: str, schemas: list[str]):
    """
    Ensure that each listed schema exists inside the given catalog.

    For every schema name a `CREATE SCHEMA IF NOT EXISTS` statement is
    issued through the notebook's `spark` session, so schemas that
    already exist are left untouched.

    Args:
        - catalog(str): name of the catalog that owns the schemas.
        - schemas(list[str]): names of the schemas to create.
    """
    # Qualified names are built lazily, one per schema, in input order.
    qualified_names = (f"{catalog}.{schema_name}" for schema_name in schemas)

    for qualified_name in qualified_names:
        print(f"Garantindo que o respectivo Schema existe: {qualified_name}")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {qualified_name}")

# Schemas that must exist in the "main" catalog before the rest of the notebook runs.
schemas = ["bronze", "silver", "metadata"]
catalog = "main"

# Executed at cell run time; uses CREATE SCHEMA IF NOT EXISTS for each entry.
create_schemas(catalog, schemas)

In [0]:
# Fetch secrets from the Databricks secret scope.

scope = 'db-scope'
# AWS account id; used below to build the S3 base path.
aws_account_id = dbutils.secrets.get(scope, 'aws_account_id')
# Application token handed to the Socrata client during ingestion.
nyc_app_token = dbutils.secrets.get(scope, 'nyc_app_token')

In [0]:
# Execution settings.

# `env` is not set here: it is read from the "env" widget in an earlier cell.
aws_region = 'us-east-2'
bucket_s3 = 'ifood-architect-taxi-case'
# Base S3 location, composed as s3://<bucket>-<env>-<region>-<account id>.
s3_path = f"s3://{bucket_s3}-{env}-{aws_region}-{aws_account_id}"

In [0]:
# NYCData class: holds the attributes that describe one ingestion source.

class NYCData:
    """
    Plain attribute holder describing one NYC Open Data source.

    Attributes:
        - source(str): source name; also used as table/pipeline name by the ingestion functions.
        - dataset_id(str): dataset identifier passed to the Socrata client.
        - datetime_col(str): name of the datetime column used for filtering and ordering.
    """

    def __init__(self, source: str, dataset_id: str, datetime_col: str) -> None:
        self.source, self.dataset_id, self.datetime_col = source, dataset_id, datetime_col
    

# Socrata settings per supported source: (dataset id, pickup datetime column).
_NYC_DATASETS = {
    'yellow_tripdata': ('4b4i-vvec', 'tpep_pickup_datetime'),
    'green_tripdata': ('peyi-gg4n', 'lpep_pickup_datetime'),
}

# Fail here, with a clear message, instead of leaving `nyc_data` undefined and
# hitting a NameError much later when the ingestion cell runs.
if source_data not in _NYC_DATASETS:
    raise ValueError(
        f"Unsupported source_data {source_data!r}; expected one of: {', '.join(_NYC_DATASETS)}"
    )

nyc_data = NYCData(source_data, *_NYC_DATASETS[source_data])

In [0]:
def create_deterministic_uuid(text: str) -> str:
    """
    Build a deterministic UUID from a piece of text.

    The value is a version-5 UUID computed in the `uuid.NAMESPACE_OID`
    namespace, so the same text always maps to the same UUID.

    Args:
        - text(str): source text.

    Returns:
        str: the UUID in its canonical string form.
    """
    derived = uuid.uuid5(uuid.NAMESPACE_OID, text)
    return str(derived)

# Wrap the function as a Spark UDF that returns a string column.

uuid5_udf = udf(create_deterministic_uuid, StringType())

In [0]:
# # Registrando External Table

# def register_external_delta_table(
#     catalog_name: str, 
#     schema_name: str, 
#     table_name: str, 
#     s3_path: str
# ):
#     """
#     Registro das tabelas externas.

#     Args:
#         - catalog_name(str): nome do catálogo.
#         - schema_name(str): nome do schema.
#         - table_name(str): nome da tabela.
#         - s3_path(str): caminho do objeto no bucket s3.
#     """

    
#     full_table_name = f"{catalog_name}.{schema_name}.{table_name}"
    
#     sql_command = f"""
#     CREATE TABLE IF NOT EXISTS {full_table_name}
#     USING DELTA 
#     LOCATION '{s3_path}/{table_name}'
#     """
    
#     print(f"Registrando tabela: {full_table_name}...")
#     spark.sql(sql_command)
#     print(f"Tabela '{full_table_name}' registrada, apontando para: {s3_path}/{table_name}")

# # Definindo uma lista com os datasets

# datasets = ["green_tripdata", "yellow_tripdata"]

# # Iterando sobre os datasets para registro das tabelas externas com os dados NYCOpenData

# for dataset in datasets:

#     register_external_delta_table(
#         catalog_name = "main",
#         schema_name = "bronze",
#         table_name = dataset,
#         s3_path = s3_path
#     )

# # Registro da tabela de checkpoint

# register_external_delta_table(
#     catalog_name = "main"
#     , schema_name = "metadata"
#     , table_name = "socrata_checkpoint"
#     , s3_path = s3_path
# )

In [0]:
def get_last_success_datetime(source: str, initial_datetime: str = '2023-01-01T00:00:00') -> str:
    """
    Return the datetime from which ingestion of `source` should resume.

    Looks up the row for `source` in `main.metadata.socrata_checkpoint` and
    returns its `last_successful_datetime`. When the table does not exist,
    has no row for `source`, holds an empty value, or cannot be read,
    `initial_datetime` is returned instead.

    Args:
        - source(str): pipeline name stored in the `pipeline_name` column.
        - initial_datetime(str): fallback used when no checkpoint is available.

    Returns:
        str: the stored checkpoint, or `initial_datetime`.
    """

    checkpoint_table = "main.metadata.socrata_checkpoint"
    rows = []

    try:
        # Same existence check used when the checkpoint is written, so a first
        # run (no table yet) is not reported as an error.
        if spark.catalog.tableExists(checkpoint_table):
            rows = (
                spark.table(checkpoint_table)
                .filter(col("pipeline_name") == source)
                .select("last_successful_datetime")
                .limit(1)
                .collect()
            )
    except Exception as exc:
        # Still best-effort (fall back to the initial datetime), but no longer
        # silent: a failed read would otherwise look like "no checkpoint".
        print(f"Falha ao ler o checkpoint de {source} ({type(exc).__name__}: {exc}). Usando o valor inicial.")
        rows = []

    if rows and rows[0][0]:
        print(f"Checkpoint encontrado para {source}: {rows[0][0]}")
        return rows[0][0]

    print(f"Nenhum checkpoint encontrado para {source}. Iniciado em {initial_datetime}")
    return initial_datetime

In [0]:
def chunk_generator(generator, chunk_size: int):
    """
    Group the items of an iterable into lists of at most `chunk_size` items.

    Items are consumed lazily and kept in their original order. Every chunk
    has exactly `chunk_size` items except possibly the last one, which holds
    the remainder. An empty input yields nothing.

    Args:
        - generator: any iterable of items.
        - chunk_size(int): maximum number of items per chunk; must be >= 1.

    Yields:
        list: the next chunk of items.

    Raises:
        ValueError: if `chunk_size` is smaller than 1 (raised when iteration starts).
    """
    # A non-positive size would otherwise degrade silently to one-item chunks.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    chunk = []
    for item in generator:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []

    # Flush the final, partially filled chunk.
    if chunk:
        yield chunk

In [0]:
def save_new_checkpoint(pipeline_source: str, new_datetime: str, s3_path: str):
    """
    Store `new_datetime` as the latest checkpoint of a pipeline.

    When `main.metadata.socrata_checkpoint` is not yet in the catalog, the
    table is created as a Delta table located at `{s3_path}/socrata_checkpoint`
    and seeded with this single row. Otherwise the row is upserted with a
    Delta MERGE keyed on `pipeline_name`.

    Args:
        - pipeline_source(str): pipeline name, used as the merge key.
        - new_datetime(str): value written to `last_successful_datetime`.
        - s3_path(str): base S3 path under which the table data is stored.
    """
    table_name = "main.metadata.socrata_checkpoint"
    table_location = f"{s3_path}/socrata_checkpoint"

    # All three fields are declared non-nullable.
    row_schema = StructType([
        StructField("pipeline_name", StringType(), False),
        StructField("last_successful_datetime", StringType(), False),
        StructField("updated_at", TimestampType(), False),
    ])
    checkpoint_row = spark.createDataFrame(
        [(pipeline_source, new_datetime, datetime.now())],
        row_schema,
    )

    if spark.catalog.tableExists(table_name):
        # Update the existing row of this pipeline, or insert it when absent.
        upsert = (
            DeltaTable.forName(spark, table_name)
            .alias("target")
            .merge(
                source=checkpoint_row.alias("source"),
                condition=col("target.pipeline_name") == col("source.pipeline_name"),
            )
            .whenMatchedUpdate(set={
                "last_successful_datetime": col("source.last_successful_datetime"),
                "updated_at": col("source.updated_at"),
            })
            .whenNotMatchedInsertAll()
        )
        upsert.execute()
    else:
        # First checkpoint ever: create the table at the S3 location.
        (
            checkpoint_row.write
            .format("delta")
            .mode("overwrite")
            .option("path", table_location)
            .saveAsTable(table_name)
        )

    print(f"Checkpoint atualizado com sucesso para: {new_datetime}")

In [0]:
def process_and_merge_chunk(chunk_list: list, chunk_index: int, s3_path: str, nyc_data: NYCData):
    """
    Load one chunk of records into the bronze Delta table and advance the checkpoint.

    Steps:
        1. Build a DataFrame from the records and add `ride_uuid`
           (deterministic UUID over vendor, pickup datetime, distance and fare),
           `date_partition` (yyyy-MM-dd of the pickup datetime) and `updated_at`.
        2. Keep a single row per `ride_uuid`.
        3. Create `main.bronze.<source>` at `{s3_path}/<source>` when it is not
           in the catalog yet, otherwise MERGE the chunk into it on `ride_uuid`.
        4. Save the highest pickup datetime of the chunk as the new checkpoint.

    An empty chunk is ignored.

    Args:
        - chunk_list(list): records of the chunk, as accepted by `spark.createDataFrame`.
        - chunk_index(int): chunk number, used only in log messages.
        - s3_path(str): base S3 path under which the table data is stored.
        - nyc_data(NYCData): source descriptor (table name and datetime column).
    """

    if not chunk_list:
        return

    print(f"Processando Chunk #{chunk_index} com {len(chunk_list)} registros...")

    datetime_col = nyc_data.datetime_col

    # NOTE(review): the schema is inferred from each chunk's records; confirm
    # that every chunk carries the same columns as the target table.
    df = (
        spark.createDataFrame(chunk_list)
        .withColumns({
            'ride_uuid': uuid5_udf(concat_ws("||", col("vendorid"), col(datetime_col), col("trip_distance"), col("fare_amount")))
            , 'date_partition': date_format(col(datetime_col), "yyyy-MM-dd")
            , 'updated_at': now()
        })
        # Delta MERGE fails when several source rows match the same target
        # row, so the merge key must be unique within the chunk.
        .dropDuplicates(["ride_uuid"])
    )

    full_table_name = f"main.bronze.{nyc_data.source}"
    delta_path = f"{s3_path}/{nyc_data.source}"

    if not spark.catalog.tableExists(full_table_name):
        df.write.format("delta").mode("overwrite").option("path", delta_path).partitionBy("date_partition").saveAsTable(full_table_name)
    else:
        delta_table = DeltaTable.forName(spark, full_table_name)
        merge_condition = "target.ride_uuid = source.ride_uuid"

        delta_table.alias("target").merge(
            source = df.alias("source"),
            condition = merge_condition
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    max_date_in_chunk = df.selectExpr(f"MAX({datetime_col})").collect()[0][0]

    # The checkpoint column is non-nullable: never try to store a null value.
    if max_date_in_chunk is None:
        print(f"Chunk #{chunk_index} sem valor em {datetime_col}; checkpoint não atualizado.")
    else:
        save_new_checkpoint(nyc_data.source, max_date_in_chunk, s3_path)

    print(f"Chunk #{chunk_index} salvo com sucesso.")

In [0]:
def ingest_chunks(
    nyc_data: NYCData,
    s3_path: str,
    chunk_size: int = 50000,
    end_datetime: str = '2023-06-01T00:00:00',
    sleep_seconds: float = 5,
):
    """
    Ingest a Socrata dataset incrementally, chunk by chunk.

    Starts from the last saved checkpoint of `nyc_data.source` (inclusive),
    stops before `end_datetime` (exclusive), and hands every chunk of records
    to `process_and_merge_chunk`, which also advances the checkpoint.

    Args:
        - nyc_data(NYCData): source descriptor (dataset id, datetime column, name).
        - s3_path(str): base S3 path under which the table data is stored.
        - chunk_size(int): number of records per processed chunk.
        - end_datetime(str): exclusive upper bound applied to the datetime column.
        - sleep_seconds(float): pause after each processed chunk.
    """

    initial_datetime = get_last_success_datetime(nyc_data.source)
    date_filter = f"{nyc_data.datetime_col} >= '{initial_datetime}' AND {nyc_data.datetime_col} < '{end_datetime}'"

    # The application token comes from the notebook-level `nyc_app_token`.
    client = Socrata("data.cityofnewyork.us", nyc_app_token, timeout=1000)

    try:
        data_generator = client.get_all(
            nyc_data.dataset_id
            , where = date_filter
            # `:id` breaks ties between rows sharing the same datetime so the
            # row order stays stable across the paged requests.
            , order = f"{nyc_data.datetime_col} ASC, :id ASC"
        )

        chunk_index = 1
        for chunk in chunk_generator(data_generator, chunk_size):
            process_and_merge_chunk(chunk, chunk_index, s3_path, nyc_data)
            chunk_index += 1

            time.sleep(sleep_seconds)
    finally:
        # Release the HTTP session even when a chunk fails.
        client.close()

    # Printed only after every chunk has been processed (it used to run when
    # the function was defined, because it sat outside the function body).
    print("Ingestão completa de todos os chunks.")

In [0]:
# Run the ingestion for the source selected through the "source_data" widget.
ingest_chunks(nyc_data, s3_path)

In [0]:
# chunk_size = 50000
# initial_datetime = get_last_success_datetime(yellow.source)

# client = Socrata("data.cityofnewyork.us", nyc_app_token, timeout=1000)
# date_filter = f"{yellow.datetime_col} >= '{initial_datetime}' AND {yellow.datetime_col} < '2023-06-01T00:00:00'"

# data_generator = client.get_all(
#     yellow.dataset_id
#     , where = date_filter
#     , order = f"{yellow.datetime_col} ASC"
# )

# chunk_index = 1
# for chunk in chunk_generator(data_generator, chunk_size):
#     process_and_merge_chunk(chunk, chunk_index, s3_path, yellow)
#     chunk_index += 1

#     time.sleep(5)

# print("Ingestão completa de todos os chunks.")