# Coletando dados de saúde suplementar da ANS
Esta rotina de coleta foi feita para ser compatível com ambientes de computação distribuída, para chegar neste objetivo, será usado:

- `PySpark` para rotina de coleta e armazenamento de dados
- Oracle Cloud Infrastructure (OCI), modalidade gratuita para Data Warehousing

Inicialmente, este projeto foi pensado para executar nas instâncias de computação distribuída gratuitas do **Databicks**, mas isto acabou sendo descartado por causa do método de autenticação escolhido para acessar o banco de dados. Para manter o maior nível de segurança, esta rotina será executada localmente.




### Dependências

Vamos instalar um [pacote desenvolvido por mim](https://pypi.org/project/ftp-download/) para superar o desafio de realizar múltiplos downloads de arquivos em servidores usando um código simples, e na sequência vamos instalar as demais dependências, incluindo o PySpark, que será usado aqui para transportar os dados para o banco de dados remoto.

Para mais informações sobre como o ambiente de desenvolvimento foi preparado para executar o código abaixo, visite [este artigo](https://medium.com/data-hackers/prepare-seu-ambiente-de-estudos-de-big-data-com-pyspark-e-uma-data-warehouse-na-nuvem-8adacd895ccf).

In [4]:
# %%python3 -m pip install --upgrade -q pip
# %%python3 -m pip install -q ftp_download pyspark==3.5.1

In [7]:
import sys
import re
import ftp_download as ftpd
from os import path, listdir, makedirs
import logging
from ftplib import FTP
from getpass import getpass
from shutil import unpack_archive
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField

spark = SparkSession.builder.appName("dbInteract").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")


### Preparando a rotina de coleta

Nosso objetivo é conectar à um servidor FTP e baixar uma quantidade considerável de arquivos, o design da rotina abaixo leva em consideração **nossas limitações, que são**:

1. Os arquivos são `.csv` compactados dentro de arquivos `.zip`;
2. Existem dois tipos de tabelas: 
  a. *Detalhada*, e
  b. *Consolidada*, estas devem compor tabelas diferentes do nosso banco de dados;
3. Todos estes arquivos estão separados em pastas, uma para cada estado

A **estratégia adotada** consiste em:

1. Fazer múltiplos downloads concorrentes usando computação assíncrona;
2. Descompactar e organizar os arquivos localmente;
3. Usar o pyspark para ler e guardar os dados no banco de dados.

In [3]:
FTP_SERVER = "ftp.dadosabertos.ans.gov.br"
ROOT_FOLDER_SRC = "/FTP/PDA/TISS/HOSPITALAR/2019/"

ftp = FTP(FTP_SERVER)
ftp.login()
remote_paths = ftp.nlst(ROOT_FOLDER_SRC)

print(remote_paths[:3])

['/FTP/PDA/TISS/HOSPITALAR/2019/AC', '/FTP/PDA/TISS/HOSPITALAR/2019/AL', '/FTP/PDA/TISS/HOSPITALAR/2019/AM']


Já sabemos que os arquivos no servidor possuem nomes regulares, eles possuem sufixo "DET" nos arquivos com a tabela detalhada, e "CONS" nas tabelas consolidadas. Sabemos também que todos os arquivos estão compactados com extensão `.zip`.

Para superar estes dois desafios, vamos criar uma função que identifica o tipo de tabela pelo nome do arquivo e extrai os conteúdos em um diretório em comum, de modo que todas as tabelas consolidadas estejam armazenadas em uma pasta, e todas as tabelas detalhadas estejam armazenadas em uma outra pasta.

In [4]:
def extract_and_organize(search_dir: str, find_patterns=["CONS", "DET"]):
    """
    Extrai arquivos `.zip` em pastas, procurando por sequências de strings em
    seu nome para definir onde o conteúdo de cada arquivo será extraído. Se um
    arquivo contém mais de uma das sequências, ele será extraído em mais de uma
    pasta, se um arquivo não contém nenhuma das sequências, ele não será extraído.

    ### Parâmetros:

    - search_dir (`str`):
        Caminho para a pasta com os arquivos.

    - find_patterns (`Iterable[str]`):
        Uma lista com as sequências que serão procuradas, não é _case sensitive_.
    """
    def filter_by_pattern(pattern, elements):
        """
        Retorna uma lista que contém os nomes dos arquivos que se ajustam à uma
        determinada sequência.

        ### Parâmetros:

        - pattern (`str`):
            Sequência que será procurada nos itens de `elements`.
        
        - elements (`Iterable[str]`):
            Lista com itens que serão filtrados sequência `pattern`.

        ### Retorna:

        (`Iterable[str]`) com os itens que contém a sequência.
        """
        matches = re.compile(pattern, re.IGNORECASE)
        return list(filter(matches.search, elements))

    filepaths = [
        f for f in listdir(search_dir) 
        if path.isfile(path.join(search_dir, f))
    ]
    listings = {i:filter_by_pattern(i, filepaths) for i in find_patterns}

    for pattern in find_patterns:
        destination = path.join(search_dir, pattern)

        if not path.exists(destination):
            makedirs(destination)

        for f in listings[pattern]:
            origin = path.join(search_dir, f)
            unpack_archive(origin, destination, "zip")

A próxima etapa vai envolver o uso de um pacote de minha autoria `ftp_download`, para saber mais sobre o projeto, visite [https://pypi.org/project/ftp-download/](https://pypi.org/project/ftp-download/). A rotina na célula abaixo segue três etapas:

1. **Download** dos arquivos para o ambiente local com `ftp_download`
2. **Descompactação e organização** dos arquivos baixados em pastas usando a rotina desenvolvida na célula anterior
3. **Upload dos arquivos** da unidade local para a base de dados, que será acessada pelo PySpark

In [5]:
# 1
# exibir log apenas com avisos e erros
# se todos os downloads forem um sucesso, veremos apenas
# "=== Concluído! ===" no stdout
ftpd.Conf.verbose = False
ftpd.timings.log.handler.setLevel(logging.WARNING)
ftpd.timings.log.logger.setLevel(logging.WARNING)

for i, rp in enumerate(remote_paths):
    print(f"Progresso: {i/len(remote_paths)*100:.2f}%", end="\r")
    ftpd.from_folder(ftp, remote_path=rp)
print("=== Concluído! ===")

with open(ftpd.prefs.LOG_FILE) as logfile:
    logfile_contents = logfile.read()
    print(logfile_contents)

=== Concluído! ===



In [12]:
# 2
download_place = ftpd.Conf.download_folder
extract_and_organize(search_dir=download_place)

# 3...

# Preparando a rotina de armazenamento

Agora que já temos os dados prontos para manipulação, podemos usar o `pyspark` para inserir os dados em nosso banco de dados relacional da Oracle. Para isto, vamos usar usar o [driver JDBC da Oracle](https://www.oracle.com/br/database/technologies/appdev/jdbc-downloads.html). *Esta etapa vai falhar se o driver __não__ estiver instalado no cluster atual*, para efetuar a instalação, seguimos os passos indicados [neste vídeo](https://youtu.be/3tAVXfIBqA8?si=jNeO459775ag9x44&t=261).

Vamos começar definindo um `schema` para todos os nomes de colunas possíveis. Para decidir qual o *data type* ideal para cada coluna, usamos o [dicionário de dados](https://dadosabertos.ans.gov.br/FTP/PDA/TISS/DICIONARIO/Dicionario_de_variaveis.ods) fornecido pela ANS.

In [9]:
CONS_TYPES = StructType([
    StructField("ID_EVENTO_ATENCAO_SAUDE", IntegerType(), False),
    StructField("ID_PLANO", IntegerType(), True),
    StructField("FAIXA_ETARIA", StringType(), True),
    StructField("SEXO", StringType(), True),
    StructField("CD_MUNICIPIO_BENEFICIARIO", StringType(), True),
    StructField("PORTE", StringType(), True),
    StructField("CD_MODALIDADE", IntegerType(), True),
    StructField("NM_MODALIDADE", StringType(), True),
    StructField("CD_MUNICIPIO_PRESTADOR", StringType(), True),
    StructField("UF_PRESTADOR", StringType(), True),
    StructField("TEMPO_DE_PERMANENCIA", IntegerType(), True),
    StructField("ANO_MES_EVENTO", StringType(), True),
    StructField("CD_CARATER_ATENDIMENTO", StringType(), True),
    StructField("CD_TIPO_INTERNACAO", StringType(), True),
    StructField("CD_REGIME_INTERNACAO", StringType(), True),
    StructField("CD_MOTIVO_SAIDA", StringType(), True),
    StructField("CID_1", StringType(), True),
    StructField("CID_2", StringType(), True),
    StructField("CID_3", StringType(), True),
    StructField("CID_4", StringType(), True),
    StructField("QT_DIARIA_ACOMPANHANTE", IntegerType(), True),
    StructField("QT_DIARIA_UTI", IntegerType(), True),
    StructField("IND_ACIDENTE_DOENCA", StringType(), True),
    StructField("LG_VALOR_PREESTABELECIDO", IntegerType(), True),
])

DET_TYPES = StructType([
    StructField("ID_EVENTO_ATENCAO_SAUDE", IntegerType(), False),
    StructField("UF_PRESTADOR", StringType(), True),
    StructField("TEMPO_DE_PERMANENCIA", IntegerType(), True),
    StructField("ANO_MES_EVENTO", StringType(), True),
    StructField("CD_PROCEDIMENTO", StringType(), True),
    StructField("CD_TABELA_REFERENCIA", StringType(), True),
    StructField("QT_ITEM_EVENTO_INFORMADO", IntegerType(), True),
    StructField("VL_ITEM_EVENTO_INFORMADO", FloatType(), True),
    StructField("VL_ITEM_PAGO_FORNECEDOR", FloatType(), True),
    StructField("IND_PACOTE", IntegerType(), True),
    StructField("IND_TABELA_PROPRIA", IntegerType(), True)
])



### Variáveis de conexão com o banco de dados
Agora vamos obter as variáveis que vão nos ajudar a conectar à *"autonomous database"* que já temos alocada na Oracle.

- A `TNS_STR` é providenciada pela OCI como uma opção de conexão ao banco de dados;
- `DRIVER` é o driver de conexão que o `PySpark` vai usar para conectar com o banco de dados da Oracle;
- Vamos usar uma string de conexão `URL`, que usa informações contidas na `TNS_STR`, além do usuário `USR` e senha ` PWD` 

In [10]:
drivername = "oracle.jdbc.OracleDriver"
WALLET_PLACE = "/home/user/Downloads/wallet_demodb"
URL = f"jdbc:oracle:thin:@demodb_medium?TNS_ADMIN={WALLET_PLACE}"
USR = "ADMIN"
PWD = getpass("Insira a senha de administrador do banco de dados: ")

Insira a senha de administrador do banco de dados:  ········


Agora que temos o `schema` prontos, podemos usá-los para ler os dados que obtemos interpretanto os tipos corretamente:

In [20]:
cons_df = spark.read.format("csv")\
    .option("header", True)\
    .option("delimiter", ";")\
    .schema(CONS_TYPES)\
    .csv(f"{download_place}/CONS/")

det_df = spark.read.format("csv")\
    .option("header", True)\
    .option("delimiter", ";")\
    .schema(DET_TYPES)\
    .csv(f"{download_place}/DET/")

A última parte desta etapa é usar as **variáveis de conexão** para enviar todos os dados para a nossa Data Warehouse: 

In [22]:
tablename = "HOSP_CONS"

cons_df.write.format("jdbc")\
    .option("driver", drivername)\
    .option("dbtable", tablename )\
    .option("url", URL)\
    .option("user", USR)\
    .option("password", PWD)\
    .save()

                                                                                

In [24]:
tablename = "HOSP_DET"

det_df.write.format("jdbc")\
    .option("driver", drivername)\
    .option("dbtable", tablename )\
    .option("url", URL)\
    .option("user", USR)\
    .option("password", PWD)\
    .save()

                                                                                