In [None]:
import pandas as pd
from sqlalchemy import create_engine, text
import logging

In [None]:
# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [None]:
# Configuração do engine do SQLAlchemy para conectar ao PostgreSQL
engine = create_engine('postgresql://airflow:airflow@127.0.0.1:5432/airflow', echo=False)


In [None]:
def criar_schema_dw():
    with engine.begin() as conn:
        try:
            conn.execute(text("CREATE SCHEMA dw;"))
            logging.info("Schema DW criado.")
        except Exception as e:
            logging.info("Schema DW já existe.")

In [None]:
def criar_tabela_fato_contratos():
    comando_sql = """
        CREATE TABLE IF NOT EXISTS dw.fato_contratos (
            id_fato_contrato SERIAL PRIMARY KEY,
            valor_contrato NUMERIC,
            valor_can_rstpg NUMERIC,
            valor_original_concedente NUMERIC,
            valor_original_contrapartida NUMERIC,
            valor_atualizado_concedente NUMERIC,
            valor_atualizado_contrapartida NUMERIC,
            calculated_valor_aditivo NUMERIC,
            calculated_valor_ajuste NUMERIC,
            calculated_valor_empenhado NUMERIC,
            calculated_valor_pago NUMERIC,
            data_assinatura DATE,
            data_processamento DATE,
            data_termino DATE,
            data_publicacao_doe DATE,
            data_auditoria DATE,
            data_termino_original DATE,
            data_inicio DATE,
            data_rescisao DATE,
            data_finalizacao_prestacao_contas DATE,
            id_entidade BIGINT REFERENCES dw.dim_entidade(id_entidade),
            id_modalidade BIGINT REFERENCES dw.dim_modalidade(id_modalidade),
            id_projeto BIGINT REFERENCES dw.dim_projeto(id_projeto),
            id_contrato BIGINT REFERENCES dw.dim_contrato(id_contrato),
            id_participacao BIGINT REFERENCES dw.dim_participacao(id_participacao)
        );
    """

    with engine.begin() as conn:
        conn.execute(text(comando_sql))
        logging.info("Tabela fato_contratos criada ou já existente.")

In [1]:
def extrair_dados(engine, nome_tabela, schema='stage'):
    query = f"SELECT * FROM {schema}.{nome_tabela};"
    df = pd.read_sql_query(query, engine)
    logging.info(f"Dados extraídos de {schema}.{nome_tabela} com sucesso.")
    return df

In [2]:
def carregar_dados_fato_contratos(engine, df):
    schema = 'dw'
    nome_tabela = 'fato_contratos'
    df.to_sql(nome_tabela, engine, schema=schema, if_exists='append', index=None, method='multi')
    logging.info(f"Dados carregados em {schema}.{nome_tabela} com sucesso.")
    

In [3]:
def limpar_tabelas():
    tabelas = [
        'dw.fato_contratos',
        # Adicione os nomes de outras tabelas aqui
    ]
    with engine.begin() as conn:
        for tabela in tabelas:
            try:
                conn.execute(text(f"TRUNCATE TABLE {tabela} RESTART IDENTITY CASCADE;"))
                logging.info(f"Tabela {tabela} limpa com sucesso.")
            except Exception as e:
                logging.warning(f"Tabela {tabela} não pôde ser limpa: {str(e)}")
            

In [None]:
import pandas as pd
from sqlalchemy import create_engine, text
import logging

# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuração do engine do SQLAlchemy para conectar ao PostgreSQL
engine = create_engine('postgresql://airflow:airflow@host.docker.internal:5432/airflow', echo=False)

def criar_schema_dw():
    with engine.begin() as conn:
        try:
            conn.execute(text("CREATE SCHEMA dw;"))
            logging.info("Schema DW criado.")
        except Exception as e:
            logging.error(f"Erro ao criar schema DW: {e}")

def criar_tabela_fato_contratos():
    comando_sql = """
        CREATE TABLE IF NOT EXISTS dw.fato_contratos (
            id_fato_contrato SERIAL PRIMARY KEY,
            valor_contrato NUMERIC,
            valor_can_rstpg NUMERIC,
            valor_original_concedente NUMERIC,
            valor_original_contrapartida NUMERIC,
            valor_atualizado_concedente NUMERIC,
            valor_atualizado_contrapartida NUMERIC,
            calculated_valor_aditivo NUMERIC,
            calculated_valor_ajuste NUMERIC,
            calculated_valor_empenhado NUMERIC,
            calculated_valor_pago NUMERIC,
            data_assinatura DATE,
            data_processamento DATE,
            data_termino DATE,
            data_publicacao_doe DATE,
            data_auditoria DATE,
            data_termino_original DATE,
            data_inicio DATE,
            data_rescisao DATE,
            data_finalizacao_prestacao_contas DATE,
            num_contrato TEXT,
            cod_orgao TEXT,
            descricao_modalidade TEXT,
            gestor_contrato TEXT,
            descricao_edital TEXT,
            id_entidade BIGINT REFERENCES dw.dim_entidade(id_entidade),
            id_modalidade BIGINT REFERENCES dw.dim_modalidade(id_modalidade),
            id_projeto BIGINT REFERENCES dw.dim_projeto(id_projeto),
            id_contrato BIGINT REFERENCES dw.dim_contrato(id_contrato),
            id_participacao BIGINT REFERENCES dw.dim_participacao(id_participacao)
        );
    """
    with engine.begin() as conn:
        conn.execute(text(comando_sql))
        logging.info("Tabela fato_contratos atualizada com colunas adicionais.")

def extrair_dados(engine, nome_tabela, schema='stage'):
    query = f"SELECT * FROM {schema}.{nome_tabela};"
    df = pd.read_sql_query(query, engine)
    logging.info(f"Dados extraídos de {schema}.{nome_tabela} com sucesso.")
    return df

def carregar_dados_fato_contratos(engine, df):
    schema = 'dw'
    nome_tabela = 'fato_contratos'
    df.to_sql(nome_tabela, engine, schema=schema, if_exists='append', index=False, method='multi')
    logging.info(f"Dados carregados em {schema}.{nome_tabela} com sucesso.")

def limpar_tabelas(engine):
    tabelas = ['dw.fato_contratos']
    with engine.begin() as conn:
        for tabela in tabelas:
            conn.execute(text(f"TRUNCATE TABLE {tabela} RESTART IDENTITY CASCADE;"))
            logging.info(f"Tabela {tabela} limpa com sucesso.")

def main():
    criar_schema_dw()
    limpar_tabelas(engine)
    criar_tabela_fato_contratos()
    
    df_contratos = extrair_dados(engine, 'contratos')
    # Certifique-se de que as colunas novas estejam presentes no DataFrame. Se necessário, faça adaptações.
    # Exemplo de como adicionar uma nova coluna se ela já não estiver presente no DataFrame extraído:
    # df_contratos['num_contrato'] = df_contratos.apply(lambda row: <lógica_para_definir_valor>, axis=1)
    
    carregar_dados_fato_contratos(engine, df_contratos)
    
    logging.info("Processo de ETL concluído com sucesso.")

if __name__ == "__main__":
    main()
