In [None]:
import pymysql as mysql
import pandas as pd
import psycopg2 as psql
import os
from sqlalchemy import create_engine
from dotenv import dotenv_values


## CONFIGURAÇÂO DAS VARIAVEIS DE AMBIENTE

config = dotenv_values(".env")

HOST_MYSQL = config.get("HOST_MYSQL")
USER_MYSQL = config.get("USER_MYSQL")
PASSWORD_MYSQL = config.get("PASSWORD_MYSQL")
DB_MYSQL = config.get("DB_MYSQL")

HOST_PG = config.get("HOST_PG")
USER_PG = config.get("USER_PG")
PASSWORD_PG = config.get("PASSWORD_PG")
DB_PG = config.get("DB_PG")

PARQUET_DIR = os.path.join(os.getcwd())

## PROGRAMA COM INTUITO DE FAZER UM ETL ENTRE BANCO DE DADOS MYSQL E POSTGRESQL

def extract():
    try:
        
        
        path_exists = os.path.exists(PARQUET_DIR)

        ## CRIA O DIRETORIO DO PARQUET CASO NÃO EXISTA
        if not path_exists:
            os.makedirs(PARQUET_DIR)
            print(f'DIRETÓRIO {PARQUET_DIR} CRIADO')

        ## CONECTA COM O BANCO DE DADOS MYSQL
        conn_mysql = mysql.connect(host=HOST_MYSQL, user=USER_MYSQL, password=PASSWORD_MYSQL, database=DB_MYSQL)

        conn_mysql.autocommit = True

        cursor_mysql = conn_mysql.cursor()

        ## EXECUTA UMA QUERY NO BANCO BUSCANDO O NOME DAS TABELAS DE UM SCHEMA, FILTRANDO PELAS TABELAS USERS E PRODUCTS
        cursor_mysql.execute(f"SELECT table_name FROM information_schema.tables WHERE table_schema='{DB_MYSQL}' AND table_name IN ('users', 'products')")
        
        tables = cursor_mysql.fetchall()

        ## PEGA AS TABELAS DA QUERY E AS TRANSFORMA EM ARQUIVO PARQUET, ARMAZENANDO-OS NUM DIRETÓRIO (O PARQUET SALVA OS METADADOS COM OS TIPOS DAS COLUNAS EXTRAÍDAS DO MYSQL)
        for table in tables:

            table_name = table[0]
            print(f'Puxando tabela {table_name}')
            df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn_mysql)
            parquet_file = os.path.join(PARQUET_DIR, f"{table_name}.parquet")
            df.to_parquet(parquet_file, index = False)
            print(f"DADOS DA TABELA {table_name} SALVO EM {parquet_file}")
            
    except Exception as e:
        print(f"Data Connection Error {e}")
        return None
    finally:
        ## APÓS A EXTRAÇÃO SER FEITA, A CONEXÃO É FECHADA
        conn_mysql.close()

def load():
    try:

        ## CONEXAO COM O POSTGRESQL
        conn_pg = psql.connect(user=USER_PG, host=HOST_PG, port=1433, password=PASSWORD_PG)

        conn_pg.autocommit = True
        
        cursor_pg = conn_pg.cursor()

        ## VERIFICA A EXISTÊNCIA DO DB QUE IRÁ RECEBER A INGESTÃO
        cursor_pg.execute(f"SELECT datname from pg_database WHERE datname = '{DB_PG}'")

        db_exists = cursor_pg.fetchone()

        ## SE O DB NÃO EXISTIR, ELE É CRIADO NO POSTGRESQL
        if not db_exists:
            print("Banco de Dados não existe e será criado.")
            cursor_pg.execute(f"CREATE DATABASE {DB_PG}")
            print("Banco de Dados Criado.")
        
        ## ENGINE DO BD PSQL
        engine = create_engine(f'postgresql://{USER_PG}:{PASSWORD_PG}@{HOST_PG}:1433/{DB_PG}')

        ## LISTA DOS ARQUIVOS PARQUET

        parquet_files = [os.path.join(PARQUET_DIR, file) for file in os.listdir(PARQUET_DIR) if file.endswith(".parquet")]

        for parquet_file in parquet_files:
            
            ## EXTRAI O NOME DA TABELA A PARTIR DO CAMINHO DO ARQUIVO
            table_name = os.path.basename(parquet_file).replace(".parquet", "")

            ## LÊ O ARQUIVO COMO UM DATAFRAME

            df = pd.read_parquet(parquet_file)

            ## CARREGA O PARQUET NO BD PSQL
            df.to_sql(table_name, engine, if_exists='replace', index= False)
            print(f"TABELA {table_name} CARREGADA NO PSQL")
        
        print("CARGA CONCLUIDA COM SUCESSO")
    
    except Exception as e:
        print(e)
    finally:
        conn_pg.close()


extract()
load()
