In [2]:
# importing required modules
import csv                      # to read/write csv files
import os                       # to add OS directories
import shutil                   # to remove OS directories
import tempfile                 # to create scratch directories
from urllib import request      # to download web files
from zipfile import ZipFile     # to unzip files

import pandas as pd             # to combine csv files into one DataFrame

In [None]:
# Data files to fetch: {dataset key: [zip file name, download URL]}
data_files = {
    "sib_ativos": ["sib_ativos.zip", "https://dadosabertos.ans.gov.br/FTP/PDA/dados_de_beneficiarios_por_operadora/sib_ativos.zip"],
    "benef_regiao_geog": ["benef_regiao_geog.zip", "https://dadosabertos.ans.gov.br/FTP/PDA/dados_de_beneficiarios_por_regiao_geografica/benef_regiao_geog.zip"]
}

# Create the local data directory. exist_ok keeps the cell safe to re-run and,
# unlike a listdir() membership check, fails right here (FileExistsError) when
# "data" exists but is a regular file.
os.makedirs("data", exist_ok=True)

def download_data_files(data_files):
    """Fetch every archive described in ``data_files`` into the ``data/`` folder.

    Args:
        data_files: mapping of dataset key -> ``[zip file name, download URL]``.

    Each archive is saved as ``data/<zip file name>``. A progress line is
    printed before and after every transfer.
    """
    print("Downloading data files...")

    for dataset, entry in data_files.items():
        print("Downloading: ", dataset)

        # Local destination of the archive, then the URL it is fetched from
        target_path = "data/" + entry[0]
        source_url = entry[1]

        request.urlretrieve(source_url, target_path)
        print(dataset, "sucess !!")

def unzip_data_files(data_files):
    """Extract every downloaded archive into a folder named after its key.

    Args:
        data_files: mapping of dataset key -> ``[zip file name, download URL]``;
            only the key and the zip file name are used here.

    The archive ``data/<zip file name>`` is extracted into ``<key>/`` relative
    to the current working directory (not inside ``data/``). No explicit
    ``mkdir`` is needed: ``extractall`` creates the target folder when missing.
    """
    for file in data_files:
        # Archive expected under the data/ folder
        archive_path = "data/" + data_files[file][0]

        # Destination folder, relative to the working directory
        file_path = file + os.sep

        # Named `archive` rather than `zip` so the built-in zip() stays usable
        with ZipFile(archive_path, 'r') as archive:
            # Show the archive contents before extracting
            archive.printdir()

            print('Extracting all the files now...')
            archive.extractall(file_path)
            print('Done!')

def extract_sib_features(filepath, new_file_name,
                         columns=(1, 6, 7, 8, 13, 14, 18, 19, 23, 25)):
    """Copy selected columns of a ';'-separated CSV into a new, smaller CSV.

    Args:
        filepath: path of the source CSV (read as cp1252, ';' delimiter,
            '"' quoting).
        new_file_name: path of the CSV to write (cp1252, ';' delimiter);
            an existing file is overwritten.
        columns: zero-based positions of the fields to keep, in output order.
            The default keeps the same ten positions this function has always
            extracted.

    Raises:
        IndexError: if a non-empty row has fewer fields than ``columns`` needs.

    Every row is filtered the same way, so a header row (if present) is
    reduced to the matching column names. Blank lines are skipped.
    """
    print("Removing unnecessary features from: ", new_file_name)

    # Side effect kept for existing callers: data/sib_ativos always exists
    # afterwards. makedirs also works when "data" itself is still missing.
    os.makedirs("data" + os.sep + "sib_ativos", exist_ok=True)

    # Make sure the folder of the requested output file exists as well
    out_dir = os.path.dirname(new_file_name)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Both files are context-managed so neither handle leaks on an error
    with open(filepath, newline='', encoding='cp1252') as src, \
            open(new_file_name, 'w', newline='', encoding='cp1252') as dst:
        # Parse with the real delimiter and quote character; splitting the
        # repr of a space-delimited row corrupted fields containing spaces,
        # apostrophes or backslashes.
        reader = csv.reader(src, delimiter=';', quotechar='"')
        writer = csv.writer(dst, delimiter=';', quoting=csv.QUOTE_MINIMAL)

        for row in reader:
            if not row:
                # csv.reader yields an empty list for a blank line
                continue
            # Output never carries double quotes, as before
            writer.writerow([row[i].replace('"', '') for i in columns])

    print(new_file_name, " done!!")

if __name__ =="__main__":
    # Fetch both archives, then unpack each into <key>/ under the working dir
    download_data_files(data_files)
    unzip_data_files(data_files)

    # Reduce every extracted SIB csv and store the result under data/sib_ativos
    for file in os.listdir('sib_ativos'):
        filepath = "sib_ativos" + os.sep + file
        new_file_name = "data" + os.sep + "sib_ativos" + os.sep + file
        extract_sib_features(filepath, new_file_name)
        print("Unnecessary Features were removed!")

    # The raw extraction is not needed once the reduced copies exist
    shutil.rmtree("sib_ativos")

    # shutil.move() places the source INSIDE an already existing destination
    # directory, so a re-run would produce data/benef_regiao_geog/benef_regiao_geog.
    # Drop the copy left by a previous run before moving the fresh extraction.
    benef_target = "data" + os.sep + "benef_regiao_geog"
    if os.path.isdir(benef_target):
        shutil.rmtree(benef_target)
    shutil.move("benef_regiao_geog", benef_target)

---
### Identificando a posição das features de interesse

In [None]:
# Columns of interest
cols = [
    'LG_BENEFICIARIO_ATIVO', 'DT_NASCIMENTO', 'TP_SEXO',
    'DT_CONTRATACAO', 'ID_BENE_TIPO_DEPENDENTE', 'SG_UF',
    'LG_RESIDE_EXTERIOR', 'DT_CANCELAMENTO', 'CD_BENE_MOTIV_CANCELAMENTO',
]

# Every header of the dataset, in file order
header = [
    "ID_TEMPO_COMPETENCIA", "CD_OPERADORA", "DT_INCLUSAO",
    "CD_BENE_MOTV_INCLUSAO", "IND_PORTABILIDADE", "ID_MOTIVO_MOVIMENTO",
    "LG_BENEFICIARIO_ATIVO", "DT_NASCIMENTO", "TP_SEXO", "CD_PLANO_RPS",
    "CD_PLANO_SCPA", "NR_PLANO_PORTABILIDADE", "DT_PRIMEIRA_CONTRATACAO",
    "DT_CONTRATACAO", "ID_BENE_TIPO_DEPENDENTE", "LG_COBERTURA_PARCIAL",
    "LG_ITEM_EXCLUIDO_COBERTURA", "CD_MUNICIPIO", "SG_UF",
    "LG_RESIDE_EXTERIOR", "DT_REATIVACAO", "DT_ULTIMA_REATIVACAO",
    "DT_ULTIMA_MUDA_CONTRATUAL", "DT_CANCELAMENTO", "DT_ULTIMO_CANCELAMENTO",
    "CD_BENE_MOTIV_CANCELAMENTO", "DT_CARGA",
]

# Position of each column of interest inside the full header
index = [header.index(name) for name in cols]
print ("Os indices desejados são:", index)

# Trying the extraction of the columns of interest on one sample record
text = '"202211";"334189";"2009-02";11;"NAO";74;1;"2006";"M";;;;"2008-09";"2008-09";1;0;0;"140010";"RR";0;;;;;;;"2023-01"'
lista = text.split(";")

# Pick the sample's fields at the positions computed above
nrow = tuple(lista[position] for position in index)
nrow

### Função para remover features indesejadas sem ter que descompactar todos os arquivos
> em desenvolvimento

In [None]:
# Archive that holds the SIB csv files
zipfile_name = "data/sib_ativos.zip"

# Folder that receives the reduced csv files
output_dir = "data" + os.sep + "sib_ativos"

# Process the archive one member at a time: only the member being reduced is
# ever unpacked, and its scratch copy is deleted as soon as it has been read.
# Named `archive` rather than `zip` so the built-in zip() stays usable.
with ZipFile(zipfile_name, 'r') as archive:
    files = archive.namelist()

    for file in files:
        if file.endswith("/"):
            # Directory entry inside the archive, nothing to reduce
            continue

        with tempfile.TemporaryDirectory() as tmp_dir:
            # extract() returns the path of the file it created on disk
            filepath = archive.extract(file, path=tmp_dir)
            new_file_name = output_dir + os.sep + os.path.basename(file)
            # extract_sib_features is the reducer defined earlier in this
            # notebook; the previously called extract_features does not exist.
            extract_sib_features(filepath, new_file_name)

### Tentando unir os DataFrames em um único arquivo .csv
> em desenvolvimento

In [None]:
# One reduced csv per state, read from the working directory
files = ['sib_202211_AC.csv', 'sib_202211_AL.csv', 'sib_202211_AM.csv', 'sib_202211_AP.csv', 'sib_202211_BA.csv', 'sib_202211_CE.csv', 'sib_202211_DF.csv', 'sib_202211_ES.csv', 'sib_202211_GO.csv', 'sib_202211_MA.csv', 'sib_202211_MG.csv', 'sib_202211_MS.csv', 'sib_202211_MT.csv', 'sib_202211_PA.csv', 'sib_202211_PB.csv', 'sib_202211_PE.csv', 'sib_202211_PI.csv', 'sib_202211_PR.csv', 'sib_202211_RJ.csv', 'sib_202211_RN.csv', 'sib_202211_RO.csv', 'sib_202211_RR.csv', 'sib_202211_RS.csv', 'sib_202211_SC.csv', 'sib_202211_SE.csv', 'sib_202211_SP.csv', 'sib_202211_TO.csv']

# Columns loaded from every file (also the column order of the result)
cols = ['LG_BENEFICIARIO_ATIVO', 'DT_NASCIMENTO', 'TP_SEXO',
       'DT_CONTRATACAO', 'ID_BENE_TIPO_DEPENDENTE', 'CD_MUNICIPIO', 'SG_UF',
       'LG_RESIDE_EXTERIOR', 'DT_CANCELAMENTO', 'CD_BENE_MOTIV_CANCELAMENTO']

# Columns forced to str when reading
datatypes = {"ID_BENE_TIPO_DEPENDENTE": str, "CD_BENE_MOTIV_CANCELAMENTO": str}

# Read every file first and concatenate once: concatenating inside the loop
# re-copies the growing frame on each iteration.
frames = [
    pd.read_csv(file, sep=';', encoding='cp1252', usecols=cols, dtype=datatypes)
    for file in files
]
df = pd.concat(frames, ignore_index=True)[cols]

# Write the COMBINED frame, once. Writing the per-file frame inside the loop
# left only the last state's rows in the output file.
df.to_csv("sib_202211.csv", sep=";")