# ETL DataSUS

## Importação de bibliotecas

In [65]:
from pysus.ftp.databases.cnes import CNES
import psycopg2
from sqlalchemy import create_engine
import time
import os
import shutil

## Extração dos Dados e conversão para DataFrame

In [2]:
cnes = CNES()

In [4]:
def extracao_cnes_leitos(uf, ano, mes=[1,2,3,4,5,6,7,8,9,10,11,12]):
    arquivos = cnes.get_files('LT', uf=uf, year=ano, month=mes)

    return arquivos

In [5]:
def geracao_nomes_dataframes(arquivos):
    nomes = [cnes.describe(arquivo)['name'].split('.')[0] for arquivo in arquivos]
    
    return nomes

In [6]:
def download_cnes_leitos(arquivos):
    arquivos_parquet = cnes.download(arquivos)

    return arquivos_parquet

In [64]:
def remove_arquivos():
    diretorio_pysus = '/home/nandaoc/pysus/'
    diretorios_parquet = os.listdir(diretorio_pysus)

    for diretorio in diretorios_parquet:
        caminho_diretorio = os.path.join(diretorio_pysus, diretorio)
        if os.path.isdir(caminho_diretorio) and diretorio.endswith('.parquet'):
            shutil.rmtree(caminho_diretorio)
            print(f'O diretório {diretorio} foi excluído com sucesso.')

In [80]:
def parquet_para_dataframe(arquivos_parquet, nomes_arquivos):
    dataframes = []
    i = 0
    
    for arquivo in arquivos_parquet:
        dataframe = {}
        nome_dataframe = f'{nomes_arquivos[i]}'
        dataframe[nome_dataframe] = arquivo.to_dataframe()
        dataframes.append(dataframe)
        i += 1

    return dataframes

## Persistência dos dados em staging

In [87]:
def concexao_bancopg(host, porta, banco, usuario, senha):
    return f'postgresql://{usuario}:{senha}@{host}:{porta}/{banco}'

In [1]:
def insercao_dados(tabela, conexao, dataframes):
    i = 0
    engine = create_engine(conexao)
    
    for dicionario_df in dataframes:
        chave = nomes[i]
        df = dicionario_df[chave]
        df = df.astype('object')
        df.columns = df.columns.str.lower()
        df.to_sql(tabela, engine, if_exists='append', index=False)
        i += 1
    
    print('Dados persistidos no banco com sucesso.')
    print('Limpando diretório...')
    time.sleep(3)
    remove_arquivos()    