In [0]:
# coding=utf-8
import psycopg2
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, DoubleType, DecimalType, BooleanType, DateType, TimestampType
from datetime import datetime, timedelta


# PostgreSQL credentials, read from the Databricks secret scope below.
_SECRET_SCOPE = "#####-secrets"
url = dbutils.secrets.get(_SECRET_SCOPE, "#####-url")
user = dbutils.secrets.get(_SECRET_SCOPE, "#####-user")
password = dbutils.secrets.get(_SECRET_SCOPE, "#####-password")

# Target PostgreSQL database.
PORT = 5432
DATABASE_NAME = '#####'

# Ingestion window defaults. The start date string is parsed with
# '%Y-%m-%dT%H:%M:%S.%f' by the ingestion function.
INITIAL_START_DATE_STR = '2025-07-25T00:00:00.00'
NUM_DAYS_TO_LOAD = 30
DEFAULT_INCREMENT_INTERVAL_HOURS = 24

def map_postgres_to_spark_type(pg_type_code, pg_type_name):
    """
    Map a PostgreSQL type name (``pg_type.typname``) to a Spark SQL type.

    Args:
        pg_type_code (int): The type OID reported by the cursor description.
            Used only in the warning printed for unmapped types.
        pg_type_name (str | None): The type name resolved for that OID.

    Returns:
        A Spark ``DataType`` instance. Any unmapped type (including ``None``)
        falls back to ``StringType`` after a warning is printed.
    """
    # Aliases such as 'bigint' or 'boolean' are accepted in case callers pass
    # SQL-level names instead of pg_type names.
    if pg_type_name in ('int2', 'int4'):
        return IntegerType()
    if pg_type_name in ('int8', 'bigint'):
        return LongType()
    if pg_type_name in ('float4', 'real'):
        return FloatType()
    if pg_type_name in ('float8', 'double precision'):
        return DoubleType()
    if pg_type_name in ('numeric', 'decimal'):
        # NOTE(review): fixed precision/scale leaves 20 integer digits; confirm
        # no source numeric column exceeds that.
        return DecimalType(38, 18)
    if pg_type_name in ('bool', 'boolean'):
        return BooleanType()
    # Equality, not `in ('date')`: that is a plain string, so `in` would do a
    # substring test and match '', 'a', 'at', ... as dates.
    if pg_type_name == 'date':
        return DateType()
    if pg_type_name in ('timestamp', 'timestamptz'):
        return TimestampType()
    # 'bpchar' is the pg_type name of char(n); 'name' is PostgreSQL's identifier type.
    if pg_type_name in ('jsonb', 'json', 'text', 'varchar', 'char', 'bpchar', 'name',
                        'uuid', 'cidr', 'inet', 'macaddr'):
        return StringType()

    print(f"Aviso: Tipo PostgreSQL '{pg_type_name}' (OID: {pg_type_code}) não tem mapeamento direto para Spark. Usando StringType.")
    return StringType()

def get_postgres_connection(db_url, db_user, db_password, db_port, db_name):
    """
    Open and return a new psycopg2 connection to the PostgreSQL database.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection_params = {
        "host": db_url,
        "port": db_port,
        "dbname": db_name,
        "user": db_user,
        "password": db_password,
    }
    return psycopg2.connect(**connection_params)

def infer_and_get_schema(conn, sql_query_template):
    """
    Infer a PySpark schema from the result columns of a SQL query template.

    The template is executed with ``LIMIT 0``, so PostgreSQL reports the result
    column names and type OIDs without returning any rows. The OIDs are then
    resolved to type names through ``pg_type`` in a single query and mapped to
    Spark types with ``map_postgres_to_spark_type``.

    Args:
        conn: An open psycopg2 connection. It is not closed here.
        sql_query_template (str): The base SELECT used for ingestion. It must be
            valid SQL on its own, because the ingestion appends ``AND ...`` to it.

    Returns:
        StructType: One nullable field per result column, in query order.
    """
    # Probe the intact template rather than cutting its text at "WHERE".
    # Text splitting is fragile: it depends on keyword case and breaks on
    # subqueries or identifiers containing WHERE.
    schema_query = f"{sql_query_template.strip()} LIMIT 0;"

    print(f"Inferindo schema com query: {schema_query}")
    with conn.cursor() as cur:
        cur.execute(schema_query)
        # Snapshot (name, type OID) before the cursor is reused below.
        columns = [(col_desc[0], col_desc[1]) for col_desc in cur.description]

        # Resolve all distinct OIDs in one parameterized query instead of one
        # query per column.
        cur.execute(
            "SELECT oid, typname FROM pg_type WHERE oid = ANY(%s::oid[])",
            (sorted({type_code for _, type_code in columns}),),
        )
        type_names = {int(oid): typname for oid, typname in cur.fetchall()}

    # An OID missing from pg_type (not expected) yields None, which the mapper
    # reports and maps to its fallback type.
    return StructType([
        StructField(col_name, map_postgres_to_spark_type(type_code, type_names.get(type_code)), True)
        for col_name, type_code in columns
    ])

def get_max_updated_at_from_unity_catalog(unity_catalog_table_name):
    """
    Return the maximum ``updated_at`` value stored in a Unity Catalog table.

    Args:
        unity_catalog_table_name (str): Table name as used in Spark SQL
            (e.g. ``catalog.schema.table``).

    Returns:
        The maximum ``updated_at`` value as returned by Spark. Returns None when
        the table cannot be described (treated as "does not exist yet") or when
        ``MAX(updated_at)`` is NULL (empty table).

    Raises:
        Exception: Errors from the MAX query on a table that *was* described
            successfully are propagated. Swallowing them would make the caller
            restart from the initial start date and append duplicate rows.
    """
    try:
        # Existence probe; a failure here is the only case treated as
        # "no table yet".
        spark.sql(f"DESCRIBE TABLE {unity_catalog_table_name}").collect()
    except Exception as e:
        print(f"Erro ao buscar data máxima no Unity Catalog (tabela pode não existir): {e}")
        return None

    max_updated_at = spark.sql(f"SELECT MAX(updated_at) FROM {unity_catalog_table_name}").collect()[0][0]

    if max_updated_at is None:
        print(f"Tabela '{unity_catalog_table_name}' no Unity Catalog está vazia.")
        return None

    print(f"Data máxima 'updated_at' encontrada no Unity Catalog: {max_updated_at}")
    return max_updated_at

def _fetch_postgres_rows(sql_query):
    """Run `sql_query` on a new PostgreSQL connection, return all rows, and always close the connection."""
    conn = get_postgres_connection(url, user, password, PORT, DATABASE_NAME)
    try:
        with conn.cursor() as cur:
            cur.execute(sql_query)
            return cur.fetchall()
    finally:
        conn.close()


def ingest_data_to_unity_catalog(
    catalog_name,
    schema_name,
    table_name,
    base_sql_query_template,
    initial_start_dt_str=None,
    num_days_to_load=NUM_DAYS_TO_LOAD,
    increment_interval_hours=DEFAULT_INCREMENT_INTERVAL_HOURS
):
    """
    Incrementally ingest rows from PostgreSQL into a Databricks Unity Catalog
    table, windowed on the ``updated_at`` column.

    The start point is ``MAX(updated_at) + 1 microsecond`` of the target table
    when it has data; otherwise it is the parsed initial start date. The end
    point is ``start + num_days_to_load`` days, capped at ``datetime.now()``.
    The range is processed in windows of ``increment_interval_hours``. Each
    window is fetched with ``updated_at >= start AND updated_at < end`` and
    appended to the target table.

    If a window fails, the load stops there instead of moving on. The next run
    then resumes from the MAX(updated_at) actually written, so the failed window
    is retried rather than silently skipped.

    Args:
        catalog_name (str): Unity Catalog catalog name.
        schema_name (str): Target schema name.
        table_name (str): Target table name.
        base_sql_query_template (str): Base SELECT for the PostgreSQL data. It
            must end with a complete WHERE condition, because
            ``AND updated_at >= ... AND updated_at < ...`` is appended to it.
            Wrap any OR condition in parentheses; AND binds tighter than OR.
        initial_start_dt_str (str, optional): Initial-load start date in the
            format 'YYYY-MM-DDTHH:MM:SS.ff'. If None, INITIAL_START_DATE_STR is
            used.
        num_days_to_load (int): Number of days to load from the start date.
        increment_interval_hours (int | float): Window size in hours. Must be > 0.

    Raises:
        ValueError: If there is data to load and ``increment_interval_hours`` is
            not positive (the window would never advance).
    """
    start_date_to_use = initial_start_dt_str if initial_start_dt_str is not None else INITIAL_START_DATE_STR

    unity_catalog_table = f"{catalog_name}.{schema_name}.{table_name}"
    interval_td = timedelta(hours=increment_interval_hours)

    max_uc_date = get_max_updated_at_from_unity_catalog(unity_catalog_table)

    if max_uc_date is not None:
        # +1 µs so rows holding the current maximum are not loaded again.
        current_interval_start = max_uc_date + timedelta(microseconds=1)
        print(f"Iniciando carga incremental a partir da data máxima do Unity Catalog: {current_interval_start}")
    else:
        # NOTE(review): while the target table stays missing/empty, every run
        # restarts from this same date and scans the same num_days_to_load
        # window, so it never advances past an initial window with no rows.
        current_interval_start = datetime.strptime(start_date_to_use, '%Y-%m-%dT%H:%M:%S.%f')
        print(f"Iniciando carga inicial a partir de: {current_interval_start}")

    # NOTE(review): all datetimes here are naive. The window bounds are sent to
    # PostgreSQL without a timezone, so confirm that the PostgreSQL session
    # TimeZone and the driver's local time (used by datetime.now() and by Spark
    # when collecting updated_at) agree.
    calculated_end_datetime = current_interval_start + timedelta(days=num_days_to_load)
    end_datetime = min(calculated_end_datetime, datetime.now())

    if current_interval_start >= end_datetime:
        print(f"A data de início ({current_interval_start}) é igual ou posterior à data de fim calculada ({end_datetime}). Nenhuma nova carga será realizada.")
        return

    if interval_td <= timedelta(0):
        # A non-positive window would never reach end_datetime (infinite loop).
        raise ValueError(f"increment_interval_hours must be positive, got {increment_interval_hours!r}")

    conn_schema = None
    inferred_schema = None
    try:
        conn_schema = get_postgres_connection(url, user, password, PORT, DATABASE_NAME)
        inferred_schema = infer_and_get_schema(conn_schema, base_sql_query_template)
        print("--- Schema PySpark Inferido ---")
        spark.createDataFrame([], inferred_schema).printSchema()
        print("-------------------------------")
    except Exception as e:
        print(f"Erro ao inferir o schema: {e}")
        return
    finally:
        if conn_schema:
            conn_schema.close()

    if not inferred_schema:
        print("Não foi possível inferir o schema. Abortando.")
        return

    while current_interval_start < end_datetime:
        current_interval_end = min(current_interval_start + interval_td, end_datetime)

        start_date_str_sql = current_interval_start.strftime('%Y-%m-%d %H:%M:%S.%f')
        end_date_str_sql = current_interval_end.strftime('%Y-%m-%d %H:%M:%S.%f')

        # Bounds are generated internally (not user input). The query is left
        # unparameterized so a literal '%' in the template keeps working.
        sql_query = f"{base_sql_query_template.strip()} AND updated_at >= '{start_date_str_sql}' AND updated_at < '{end_date_str_sql}'"

        print(f"\n--- Carregando dados para o período: {start_date_str_sql} a {end_date_str_sql} ---")
        print(f"Query SQL: {sql_query}")

        try:
            rows = _fetch_postgres_rows(sql_query)

            if rows:
                df = spark.createDataFrame(rows, schema=inferred_schema)
                # len(rows) equals df.count() here and avoids an extra Spark job.
                print(f"DataFrame PySpark criado com {len(rows)} linhas.")

                df.write.mode("append").saveAsTable(unity_catalog_table)
                print(f"Dados gravados com sucesso na tabela do Unity Catalog: {unity_catalog_table}")
            else:
                print("Nenhum dado encontrado para este período.")

        except psycopg2.Error as e:
            print(f"Erro ao conectar, consultar ou gravar o banco de dados para o período {start_date_str_sql} a {end_date_str_sql}: {e}")
            print("Carga interrompida para evitar lacunas: a próxima execução retomará a partir do maior 'updated_at' já gravado.")
            return
        except Exception as e:
            print(f"Ocorreu um erro inesperado durante o carregamento do período {current_interval_start} a {end_date_str_sql}: {e}")
            print("Carga interrompida para evitar lacunas: a próxima execução retomará a partir do maior 'updated_at' já gravado.")
            return

        current_interval_start = current_interval_end

    print("\n--- Carregamento incremental concluído ---")
