## Baixa conteúdo da web e salva em disco

In [10]:
## Baixa conteúdo da web e salva em disco
import requests
import os

def create_directory_if_not_exists(directory_path):
    """Create *directory_path* (and any missing parents) if it is not there yet.

    Args:
        directory_path: Path of the directory to create.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    # exist_ok=True makes the call idempotent without a separate existence
    # check, so there is no window between "check" and "create".
    os.makedirs(directory_path, exist_ok=True)

def get_file_name(url: str, response: "requests.Response") -> str:
    """Choose a local file name for a downloaded resource.

    The name is taken from the ``filename`` parameter of the response's
    ``Content-Disposition`` header when that parameter is present.
    Otherwise the last path segment of *url* is used, without the query
    string or fragment. If neither yields a usable name, ``'download'`` is
    returned.

    Only the final path component of the candidate is kept, so a name such
    as ``../../x.zip`` sent by the server cannot point outside the
    directory the caller joins it with.

    Args:
        url: URL the resource was requested from.
        response: Response object; only its ``headers`` mapping is read.

    Returns:
        A non-empty file name without directory components.
    """
    # The annotation above is quoted so it is not evaluated when the function
    # is defined. Local imports: the file does not import these modules.
    from email.message import Message
    from urllib.parse import urlsplit

    file_name = None
    content_disposition = response.headers.get('Content-Disposition')
    if content_disposition:
        # Let the stdlib header parser handle quoting and extra parameters
        # (e.g. 'attachment; filename="x.zip"; size=10').
        header = Message()
        header['Content-Disposition'] = content_disposition
        file_name = header.get_filename()

    if not file_name:
        # Header missing or without a filename: fall back to the URL path.
        file_name = urlsplit(url).path.rsplit('/', 1)[-1]

    # Drop any directory part, treating backslashes as separators too.
    file_name = os.path.basename(file_name.replace('\\', '/')).strip()
    if file_name in ('', '.', '..'):
        return 'download'
    return file_name

def generate_available_filename(directory, filename):
    """Return a path inside *directory* that does not exist yet.

    The first candidate is ``directory/filename``. While the candidate
    exists, a numbered variant ``"<name> (<n>)<extension>"`` is tried with
    n = 1, 2, 3, ... The function only checks for existence; it creates
    nothing on disk.

    Args:
        directory: Directory the file is meant to be stored in.
        filename: Preferred file name, including its extension.

    Returns:
        The first candidate path for which ``os.path.exists`` is false.
    """
    stem, suffix = os.path.splitext(filename)
    candidate = os.path.join(directory, filename)

    attempt = 0
    while os.path.exists(candidate):
        attempt += 1
        candidate = os.path.join(directory, f"{stem} ({attempt}){suffix}")

    return candidate

def download(url: str, directory: "str | None" = None, timeout: float = 60) -> str:
    """Download *url* into *directory* and return the path of the saved file.

    The file name comes from ``get_file_name``. If a file with that name
    already exists, ``generate_available_filename`` picks a numbered
    variant, so existing files are never overwritten. The body is streamed
    to disk in chunks instead of being held in memory as a whole.

    Args:
        url: Address of the resource to fetch.
        directory: Destination directory, created if missing. Defaults to
            the current working directory at call time.
        timeout: Timeout in seconds passed to ``requests.get``, so a stalled
            server cannot block the call forever.

    Returns:
        Path of the file that was written.

    Raises:
        Exception: Any error raised while requesting or writing is printed
            and then re-raised unchanged.
    """
    if directory is None:
        # Resolved per call; a default of os.getcwd() in the signature would
        # be evaluated only once, when the function is defined.
        directory = os.getcwd()

    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            create_directory_if_not_exists(directory)
            # Computed once so the returned path is the one written to.
            file_path = generate_available_filename(directory, get_file_name(url, response))
            try:
                with open(file_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        file.write(chunk)
            except BaseException:
                # Do not leave a truncated file behind; file_path did not
                # exist before this call, so removing it is safe.
                if os.path.exists(file_path):
                    os.remove(file_path)
                raise

        return file_path

    except Exception as e:
        print(f"Ocorreu um erro: {e}")
        raise

## File Utils

In [11]:
import os
import zipfile

def unzip(zip_path, dataset_unzip_directory, file_to_extract) -> str:
    """Extract a single member from a zip archive.

    Args:
        zip_path: Path of the zip archive to read.
        dataset_unzip_directory: Directory the member is extracted into.
        file_to_extract: Name of the archive member to extract.

    Returns:
        The path of the extracted file, as returned by ``ZipFile.extract``.
    """
    archive = zipfile.ZipFile(zip_path, mode='r')
    with archive:
        extracted_path = archive.extract(file_to_extract, path=dataset_unzip_directory)
    return extracted_path

def remove_file(caminho_arquivo):
    """Delete the file at *caminho_arquivo*, printing a notice if it is missing.

    Args:
        caminho_arquivo: Path of the file to delete.

    Raises:
        OSError: If the path exists but cannot be removed (for example, it
            is a directory or permissions are insufficient).
    """
    # Attempt the removal directly rather than checking first: the file could
    # disappear between an existence check and the remove call.
    try:
        os.remove(caminho_arquivo)
    except FileNotFoundError:
        print(f"Arquivo '{caminho_arquivo}' não encontrado.")

def move_file(origem, destino):
    """Move the file at *origem* to *destino* via ``os.replace``.

    Args:
        origem: Current path of the file.
        destino: New path of the file.
    """
    os.replace(src=origem, dst=destino)

def remove_directory(directory):
    """Remove *directory* with ``os.rmdir`` when the path exists.

    Nothing happens when the path does not exist. ``os.rmdir`` is called
    without removing any contents first.

    Args:
        directory: Path of the directory to remove.
    """
    if not os.path.exists(directory):
        return
    os.rmdir(directory)

## PANDAS Utils

In [12]:
import pandas as pd

def convert_csv_to_parquet(csv_path):
    """Convert a semicolon-separated, Latin-1 encoded CSV file to Parquet.

    The Parquet file is written next to the source, with the source's
    extension replaced by ``.parquet``. Only the final extension is
    replaced, so ``.csv`` appearing elsewhere in the path is left alone and
    the output path always differs from the input path.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The path of the Parquet file, or ``None`` if reading or writing
        failed (the error is printed rather than raised).
    """
    try:
        df = pd.read_csv(csv_path, sep=';', encoding='latin1', low_memory=False)
        parquet_path = os.path.splitext(csv_path)[0] + ".parquet"
        df.to_parquet(parquet_path, index=False)
        return parquet_path
    except Exception as e:
        # Deliberate best-effort: callers check for None instead of catching.
        print(f"Erro ao converter {csv_path} para Parquet: {e}")
        return None

In [13]:
# dataset_directory = os.path.join(os.getcwd(), "dataset")

# #Resultados
# download("https://cdn.tse.jus.br/estatistica/sead/eleicoes/eleicoes2022/Historico_Totalizacao_Presidente_BR_1T_2022.zip", dataset_directory)

# #Candidatos
# download("https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand/consulta_cand_2022.zip", dataset_directory)

## Prepara datasets para serem ingeridos pelo notebook

In [14]:
import os

# Finished Parquet files go to dataset/2022 under the current working
# directory; CSVs are extracted into its "unzip" subdirectory first.
dataset_directory = os.path.join(os.getcwd(), "dataset/2022")
dataset_unzip_directory = os.path.join(dataset_directory, "unzip")

# One entry per archive: (archive URL, [(CSV member, output Parquet name), ...]).
DATASETS = [
    # Main dataset (candidates)
    (
        'https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand/consulta_cand_2022.zip',
        [
            ("consulta_cand_2022_BRASIL.csv", "candidatos.parquet"),
        ],
    ),
    # Complementary dataset (additional candidate information)
    (
        'https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand_complementar/consulta_cand_complementar_2022.zip',
        [
            ("consulta_cand_complementar_2022_BRASIL.csv", "candidatos.infos_adicionais.parquet"),
        ],
    ),
    # Candidates' declared assets
    (
        'https://cdn.tse.jus.br/estatistica/sead/odsele/bem_candidato/bem_candidato_2022.zip',
        [
            ("bem_candidato_2022_BRASIL.csv", "candidatos.bens.parquet"),
        ],
    ),
    # Campaign accounts: one archive holding several CSVs
    (
        'https://cdn.tse.jus.br/estatistica/sead/odsele/prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip',
        [
            # Contracted expenses
            ("despesas_contratadas_candidatos_2022_BRASIL.csv", "candidatos.despesas.contratadas.parquet"),
            # Paid expenses
            ("despesas_pagas_candidatos_2022_BRASIL.csv", "candidatos.despesas.pagas.parquet"),
            # Revenues
            ("receitas_candidatos_2022_BRASIL.csv", "candidatos.receitas.parquet"),
            # Revenues by original donor
            ("receitas_candidatos_doador_originario_2022_BRASIL.csv", "candidatos.receitas.doador_originario.parquet"),
        ],
    ),
]

for URL, members in DATASETS:
    zip_path = download(URL, dataset_directory)
    try:
        for file_to_extract, parquet_name in members:
            file_extracted = unzip(zip_path, dataset_unzip_directory, file_to_extract)
            parquet_file = convert_csv_to_parquet(file_extracted)
            if parquet_file is None:
                # convert_csv_to_parquet already printed the cause. Stop here
                # with a clear error instead of passing None to move_file.
                raise RuntimeError(f"Falha ao converter '{file_extracted}' para Parquet.")
            remove_file(file_extracted)
            move_file(parquet_file, os.path.join(dataset_directory, parquet_name))
    finally:
        # The archive is no longer needed whether or not conversion succeeded.
        remove_file(zip_path)

remove_directory(dataset_unzip_directory)


In [15]:
import os

# Finished Parquet files go to dataset/2022 under the current working
# directory; CSVs are extracted into its "unzip" subdirectory first.
dataset_directory = os.path.join(os.getcwd(), "dataset/2022")
dataset_unzip_directory = os.path.join(dataset_directory, "unzip")

# Voting results per candidate ("votacao_candidato_munzona" archive).
URL = 'https://cdn.tse.jus.br/estatistica/sead/odsele/votacao_candidato_munzona/votacao_candidato_munzona_2022.zip'

zip_path = download(URL, dataset_directory)
file_to_extract = "votacao_candidato_munzona_2022_BRASIL.csv"
file_extracted = unzip(zip_path, dataset_unzip_directory, file_to_extract)
remove_file(zip_path)
parquet_file = convert_csv_to_parquet(file_extracted)
if parquet_file is None:
    # convert_csv_to_parquet already printed the cause. Stop here with a
    # clear error instead of passing None to move_file.
    raise RuntimeError(f"Falha ao converter '{file_extracted}' para Parquet.")
remove_file(file_extracted)
# NOTE(review): "canditados" in the output name looks like a typo for
# "candidatos"; kept as-is because other code may read this exact file name.
move_file(parquet_file, os.path.join(dataset_directory, "resultados.votacao.canditados.parquet"))

remove_directory(dataset_unzip_directory)

In [16]:

# TODO: decide later whether to keep this helper
def extract_files(zip_path, dataset_unzip_directory, files_to_extract):
    """Extract several members from a zip archive.

    Args:
        zip_path: Path of the zip archive to read.
        dataset_unzip_directory: Directory the members are extracted into.
        files_to_extract: Iterable of archive member names.

    Returns:
        List of extracted file paths, in the order the names were given.
    """
    with zipfile.ZipFile(zip_path, 'r') as archive:
        return [
            archive.extract(member, dataset_unzip_directory)
            for member in files_to_extract
        ]