In [None]:
!pip install pysus

In [None]:
# Load the source files: list the SINAN "CHIK" files for 2015-2024 and download them.
from pysus.ftp.databases.sinan import SINAN
sinan = SINAN().load()
# The years are passed as a concrete list rather than a range object so the
# filter does not rely on how PySUS normalises non-list iterables.
# NOTE(review): confirm against the installed PySUS version that all ten years are returned.
files = sinan.get_files(dis_code="CHIK", year=list(range(2015, 2025)))
sinan.download(files, local_dir="/content/basedosdados/br_ms_sinan/")

## Função de particionamento

In [None]:
import pandas as pd
import os
# Lift pandas' display limits: a value of None removes the cap on how many
# columns / rows are rendered when a frame is displayed.
pd.options.display.max_columns = None
pd.options.display.max_rows = None

import logging
import re
from datetime import datetime
from os import getenv, walk
from os.path import join
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import uuid4
def to_partitions(
    data: pd.DataFrame,
    partition_columns: List[str],
    savepath: str,
    file_type: str = "csv",
):
    """Save a DataFrame using a Hive-style partition layout.

    One folder is created per distinct combination of values found in
    ``partition_columns``
    (``<savepath>/<col1>=<value1>/<col2>=<value2>/...``). The rows belonging
    to that combination are written to ``data.csv`` or ``data.parquet`` inside
    the folder, without the partition columns themselves.

    When the target file already exists the new rows are appended to it, so
    calling the function twice with the same data stores the rows twice.

    Args:
        data (pandas.DataFrame): Dataframe to be partitioned.
        partition_columns (list): Columns used as partitions, in folder
            nesting order. A single column name given as a string is accepted
            as well.
        savepath (str, pathlib.Path): Folder under which the partition tree
            is created.
        file_type (str): ``"csv"`` (default) or ``"parquet"``.

    Raises:
        TypeError: If ``data`` is not a pandas DataFrame.
        ValueError: If ``file_type`` is not supported or ``partition_columns``
            is empty.
        KeyError: If a partition column does not exist in ``data``.

    Example:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
            "mes": [1, 2, 3, 4, 5, 6, 6, 9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=["ano", "mes", "sigla_uf"],
            savepath="partitions/",
        )
    """
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Data need to be a pandas DataFrame")
    # Validate before touching the filesystem: an unknown type used to create
    # the folder tree and then silently write nothing.
    if file_type not in ("csv", "parquet"):
        raise ValueError(
            f"Unsupported file_type {file_type!r}: expected 'csv' or 'parquet'"
        )
    if isinstance(partition_columns, str):
        partition_columns = [partition_columns]
    partition_columns = list(partition_columns)
    if not partition_columns:
        raise ValueError("partition_columns must contain at least one column")

    savepath = Path(savepath)

    # Distinct combinations of partition values, in order of first appearance.
    unique_combinations = (
        data[partition_columns].drop_duplicates().to_dict(orient="records")
    )

    for combination in unique_combinations:
        partition_rows = data.loc[_partition_mask(data, combination)].drop(
            columns=partition_columns
        )

        # Folder tree: one "<column>=<value>" level per partition column.
        partition_dir = savepath.joinpath(
            *(f"{column}={value}" for column, value in combination.items())
        )
        partition_dir.mkdir(parents=True, exist_ok=True)

        _write_partition(partition_rows, partition_dir, file_type)


def _partition_mask(data, combination):
    """Return a boolean array flagging the rows of ``data`` matching ``combination``."""
    mask = None
    for column, value in combination.items():
        column_values = data[column]
        # Each column is compared with its own value only. Testing all columns
        # against the pooled values (DataFrame.isin with a flat iterable) lets
        # rows such as (a=2, b=1) slip into the partition (a=1, b=2).
        # Missing values never compare equal, so they are matched via isna().
        if pd.api.types.is_scalar(value) and pd.isna(value):
            matches = column_values.isna()
        else:
            matches = column_values.eq(value)
        # Nullable dtypes may yield NA in the comparison; treat it as "no match".
        matches = matches.to_numpy(dtype=bool, na_value=False)
        mask = matches if mask is None else mask & matches
    return mask


def _write_partition(rows, partition_dir, file_type):
    """Append ``rows`` to the data file that lives in ``partition_dir``."""
    if file_type == "csv":
        target = partition_dir / "data.csv"
        rows.to_csv(
            target,
            sep=",",
            encoding="utf-8",
            na_rep="",
            index=False,
            mode="a",
            # Write the header only when the file is being created.
            header=not target.exists(),
        )
    else:
        target = partition_dir / "data.parquet"
        if target.exists():
            # No in-place append here: read the stored rows and rewrite
            # them together with the new ones.
            rows = pd.concat([pd.read_parquet(target), rows], ignore_index=True)
        rows.to_parquet(target, index=False, compression="gzip")

## Particionar todos os arquivos

In [None]:
# Columns kept in the output, in output order. The list does not depend on the
# year, so it is built once instead of on every iteration.
lista = [
    'ano',
    'TP_NOT',
    'ID_AGRAVO',
    'DT_NOTIFIC',
    'SEM_NOT',
    'SG_UF_NOT',
    'ID_REGIONA',
    'ID_MUNICIP',
    'ID_UNIDADE',
    'DT_SIN_PRI',
    'SEM_PRI',
    'ID_PAIS',
    'SG_UF',
    'ID_RG_RESI',
    'ID_MN_RESI',
    'ANO_NASC',
    'NU_IDADE_N',
    'CS_SEXO',
    'CS_RACA',
    'CS_ESCOL_N',
    'ID_OCUPA_N',
    'CS_GESTANT',
    'AUTO_IMUNE',
    'DIABETES',
    'HEMATOLOG',
    'HEPATOPAT',
    'RENAL',
    'HIPERTENSA',
    'ACIDO_PEPT',
    'DT_INVEST',
    'FEBRE',
    'CEFALEIA',
    'EXANTEMA',
    'DOR_COSTAS',
    'MIALGIA',
    'VOMITO',
    'NAUSEA',
    'CONJUNTVIT',
    'DOR_RETRO',
    'ARTRALGIA',
    'ARTRITE',
    'LEUCOPENIA',
    'EPISTAXE',
    'PETEQUIA_N',
    'GENGIVO',
    'METRO',
    'HEMATURA',
    'SANGRAM',
    'COMPLICA',
    'LACO',
    'HOSPITALIZ',
    'DT_INTERNA',
    'UF',
    'MUNICIPIO',
    'ALRM_HIPOT',
    'ALRM_PLAQ',
    'ALRM_VOM',
    'ALRM_SANG',
    'ALRM_HEMAT',
    'ALRM_ABDOM',
    'ALRM_LETAR',
    'ALRM_HEPAT',
    'ALRM_LIQ',
    'DT_ALRM',
    'GRAV_PULSO',
    'GRAV_CONV',
    'GRAV_ENCH',
    'GRAV_INSUF',
    'GRAV_TAQUI',
    'GRAV_EXTRE',
    'GRAV_HIPOT',
    'GRAV_HEMAT',
    'GRAV_MELEN',
    'GRAV_METRO',
    'GRAV_SANG',
    'GRAV_AST',
    'GRAV_MIOC',
    'GRAV_CONSC',
    'GRAV_ORGAO',
    'PLAQ_MENOR',
    'DT_CHIK_S1',
    'RES_CHIKS1',
    'RES_CHIKS2',
    'RESUL_PRNT',
    'DT_NS1',
    'RESUL_NS1',
    'DT_VIRAL',
    'RESUL_VI_N',
    'DT_PCR',
    'RESUL_PCR_',
    'RESUL_SORO',
    'DT_SORO',
    'SOROTIPO',
    'HISTOPA_N',
    'IMUNOH_N',
    'MANI_HEMOR',
    'CLASSI_FIN',
    'CRITERIO',
    'CON_FHD',
    'TPAUTOCTO',
    'COPAISINF',
    'COUFINF',
    'COMUNINF',
    'DOENCA_TRA',
    'CLINC_CHIK',
    'EVOLUCAO',
    'DT_OBITO',
    'DT_ENCERRA',
    'TP_SISTEMA',
    'DT_DIGITA',
    'DT_CHIK_S2',
    'DT_PRNT',
    'DT_GRAV'
]

# NOTE: to_partitions appends to partition files that already exist, so running
# this cell again without clearing the "partitions" folder stores the rows twice.
for anos in range(15, 25):
    # Two-digit year suffix used in the file names (CHIKBR15 ... CHIKBR24).
    ano_str = str(anos).zfill(2)
    path = f'/content/basedosdados/br_ms_sinan/CHIKBR{ano_str}.parquet'

    df0 = pd.read_parquet(path)
    # Cast every column to str before any further handling.
    df0 = df0.astype(str)
    # The year comes from the loop variable. Slicing the path by fixed positions
    # (path[40:43]) also picked up the "." of ".parquet", which produced values
    # such as "2015." and therefore folders named "ano=2015.".
    df0['ano'] = '20' + ano_str

    # Columns missing from this year's file are added empty, so that every year
    # ends up with the same set of columns after the selection below.
    for x in lista:
        if x not in df0.columns:
            df0[x] = None

    df0 = df0[lista]

    # 'NU_ANO' is not part of `lista`, so in practice only 'SG_UF_NOT' is renamed here.
    df0 = df0.rename(columns={
        'NU_ANO' : 'ano',
        'SG_UF_NOT' : 'sigla_uf_notificacao'
    })

    # Keep only the first occurrence of any repeated column label.
    df0 = df0.loc[:,~df0.columns.duplicated()]
    print(f'Particionando: {ano_str}...')
    to_partitions(
        data=df0,
        partition_columns=['ano'],
        savepath= "/content/basedosdados/br_ms_sinan/partitions/",
        file_type= 'parquet',
    )
    # Release the frame before the next year's file is loaded.
    del df0
    print(f'Terminou de particionar: {ano_str}...')