_Recomendo utilizar de Drive para guarda os dados baixados e particionados._


# Variaveis de caminho


In [None]:
year = 2015
link_download = f"https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SISVAN/estado_nutricional/sisvan_estado_nutricional_{year}.zip"
path_source_csv = f"/content/input/{year}/sisvan_estado_nutricional_{year}.csv"
path_partitioned_folder = f"/content/output/ano={year}"

# Importaçoes e funçoes


In [None]:
import csv
import glob
import io
import os
import re
from zipfile import ZipFile

import pandas as pd
import requests


def get_year(link: str) -> str:
    year = re.search("[0-9]{4}", link)
    return year.group() if year else "Falha"


def download_extract(link: str) -> None:
    r = requests.get(link)
    year = get_year(link)
    z = ZipFile(io.BytesIO(r.content))
    path = os.path.join(os.getcwd(), "input", year)
    z.extractall(path)


def create_folders(name_folders: str) -> None:
    try:
        os.makedirs(name_folders)

    except FileExistsError:
        pass


def extrair(path, path_output) -> None:
    # loading the temp.zip and creating a zip object
    with ZipFile(path, "r") as zObject:
        # Extracting all the members of the zip
        # into a specific location.
        zObject.extractall(path=path_output)


def csv_manager(path: str, row: dict, mode: str) -> None:
    with open(path, mode, newline="") as file:
        # Create a CSV writer object
        writer = csv.writer(file)

        # Write the field names
        if mode == "w":
            writer.writerow(row.keys())
        writer.writerow(row.values())


def partition_dataset(path_source_dataset: str) -> None:
    with open(path_source_dataset, "r", encoding="iso-8859-1") as file:
        reader = csv.DictReader(file, delimiter=";")
        year = get_year(path_source_dataset)
        for row in reader:
            data = row["DT_ACOMPANHAMENTO"]
            mes = data.split("/")[1]
            path_output_csv = os.path.join(
                os.getcwd(),
                "output",
                f"ano={year}",
                f"mes={mes}",
                f"sigla_uf={row['SG_UF']}",
            )
            file_csv = os.path.join(
                path_output_csv,
                f"sisvan_nutricional_{row['SG_UF']}_{mes}_{year}.csv",
            )

            if os.path.exists(file_csv):
                csv_manager(file_csv, row, "a")
            else:
                create_folders(path_output_csv)
                csv_manager(file_csv, row, "w")


def verification_dataset_sum(sum_verify: int, row_sum_by_model: int) -> None:
    if sum_verify == row_sum_by_model:
        print(
            f"Processamento feito com sucesso!\nValidação:\n"
            f"Soma linhas {sum_verify} | Soma verificadora por meses {row_sum_by_model}"
        )
    else:
        print(
            f"Processamento falho!\nValidação:\n"
            f"Soma linhas {sum_verify} | Soma verificadora por meses {row_sum_by_model}"
        )
        raise ValueError("Somas verificadoras não batem")

# Download e Particionar


## Download


In [None]:
download_extract(link_download)

## Particionar Dataset


In [None]:
partition_dataset(path_source_csv)

## Validar Particionamento


In [None]:
ufs = [
    "SP",
    "RS",
    "CE",
    "MG",
    "PE",
    "MA",
    "RJ",
    "PI",
    "AL",
    "PR",
    "BA",
    "SC",
    "RR",
    "RN",
    "MT",
    "MS",
    "AM",
    "DF",
    "PA",
    "SE",
    "AP",
    "TO",
    "ES",
    "GO",
    "PB",
    "RO",
    "AC",
]

model_meses = {n: 0 for n in range(1, 13)}

modelo_analise_particionado = {uf: model_meses.copy() for uf in ufs}

### Criar modelo de verificação Source Dataset


In [None]:
modelo_analise = {uf: model_meses.copy() for uf in ufs}

# Specify the chunk size
chunksize = 3000000

# Open the CSV file in chunks
row_sum_source_dataset = sum_verify_source_dataset = 0

for chunk in pd.read_csv(
    path_source_csv, chunksize=chunksize, encoding="iso-8859-1", sep=";"
):
    # Process each chunk here
    chunk["DT_ACOMPANHAMENTO"] = pd.to_datetime(
        chunk["DT_ACOMPANHAMENTO"], format="%d/%m/%Y"
    )
    chunk["ano"] = chunk["DT_ACOMPANHAMENTO"].dt.year
    chunk["mes"] = chunk["DT_ACOMPANHAMENTO"].dt.month

    for uf, meses in modelo_analise.items():
        for mes in meses.keys():
            modelo_analise[uf][mes] += len(
                chunk[(chunk.SG_UF == uf) & (chunk.mes == mes)]
            )
    row_sum_source_dataset += len(chunk)

for key, meses in modelo_analise.items():
    total = sum(list(meses.values()))
    sum_verify_source_dataset += total

verification_dataset_sum(row_sum_source_dataset, sum_verify_source_dataset)

Processamento feito com sucesso!
Validação:
Soma linhas 50544072 | Soma verificadora por meses 50544072


### Criar modelo de verificação Partição


In [None]:
modelo_analise_particionado = {uf: model_meses.copy() for uf in ufs}

row_sum_partition = sum_verify_partition = 0

# Specify the path to the directory
path = f"{path_partitioned_folder}/**/*.csv"

# Use glob() to find all CSV files
csv_files = glob.glob(path, recursive=True)

for csv in csv_files:  # noqa: F402
    fragment = csv.split("_")
    mes = int(fragment[4])
    uf = fragment[3]
    rows_part = len(pd.read_csv(csv))
    modelo_analise_particionado[uf][mes] += rows_part
    row_sum_partition += rows_part

for key, meses in modelo_analise_particionado.items():
    total = sum(list(meses.values()))
    sum_verify_partition += total

verification_dataset_sum(row_sum_partition, sum_verify_partition)

### Comparar Modelos de verificação


In [None]:
erros = False
for key, meses in modelo_analise_particionado.items():
    for mes in range(1, 13):
        total_particionado = modelo_analise_particionado[key][mes]
        total_raw = modelo_analise[key][mes]
        if total_particionado != total_raw:
            print(
                f"Diferença em {key} no mês {mes}\nParticionado: {total_particionado} Puro: {total_raw}"
            )
            erros = True

if erros:
    print("Erro!\nNão pronto para upload. Houve diferenças em algum mês")
    raise ValueError("Verifique as diferenças entre os modelos")
else:
    print("Pasta Output pronto para upload!")

Pasta Output pronto para upload!
