In [None]:
# Pacotes
# Bibliotecas de Modelagem de Dados e Análises
import pandas as pd
import numpy as np
import pandas_datareader.data as web


# Pacote para requisições HTTP
import requests
import os
from urllib.parse import urljoin

# Pacote para extrair arquivos.zip
from zipfile import ZipFile

# Manipular caminhos de arquivo e diretório de forma mais eficaz
from pathlib import Path

# Pacote para leitura e escrita de arquivos csv
import csv

# Pacote para trabalhar com dados HTML e XML
from bs4 import BeautifulSoup

# Analises gráficas
import matplotlib.pyplot as plt

# Data
from datetime import datetime

# SQL
import sqlite3

# Avisos
import warnings
warnings.filterwarnings('ignore')

In [None]:
# URL das páginas que contém os links para os arquivos.zip de receitas e despesas  
url_receitas = 'https://dados.es.gov.br/dataset/receitas-municipios'

In [None]:
# Pasta onde você deseja salvar os arquivos.zip
output_folder_receitas = 'C:/Users/Darke/Documents/Especialização - Dados/Analitycs e BI/TCC/arquivos/receitas'

# Certifique-se de que as pastas de saída existam
if not os.path.exists(output_folder_receitas):
    os.makedirs(output_folder_receitas)

In [None]:
# Enviar uma solicitação HTTP para obter o conteúdo da página de receitas
response = requests.get(url_receitas)
if response.status_code == 200:
    # Analisar o HTML da página
    soup = BeautifulSoup(response.text, 'html.parser')

    # Encontrar todos os links na página
    links = soup.find_all('a')

    # Iterar sobre os links da página de receitas
    for link in links:
        href = link.get('href')
        if href and href.endswith('.zip'):
            file_url = urljoin(url_receitas, href)
            file_name = os.path.join(output_folder_receitas, os.path.basename(file_url))

            # Baixar os arquivos de receitas.ZIP
            with requests.get(file_url, stream=True) as file_response:
                if file_response.status_code == 200:
                    with open(file_name, 'wb') as file:
                        for chunk in file_response.iter_content():
                            file.write(chunk)

                    print(f'Arquivo baixado: {file_name}')
                else:
                    print(f'Falha ao baixar {file_url}')
else:
    print(f'Falha ao acessar a página: {url_receitas}')

In [None]:
# Especifica o caminho do diretório onde os arquivos ZIP das receitas estão localizados
caminho_arquivos_receitas = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas'

# Itera sobre todos os arquivos no diretório
for nome_arquivo_zip in os.listdir(caminho_arquivos_receitas):
    # Constrói o caminho completo para o arquivo ZIP
    caminho_zip = os.path.join(caminho_arquivos_receitas, nome_arquivo_zip)

    # Verifica se o arquivo tem a extensão .zip e é um arquivo válido
    if nome_arquivo_zip.endswith('.zip') and os.path.isfile(caminho_zip):
        # Abre o arquivo ZIP e extrai seu conteúdo no mesmo diretório
        with ZipFile(caminho_zip, 'r') as z:
            z.extractall(path=caminho_arquivos_receitas)
        
        # Exibe uma mensagem indicando que os arquivos foram extraídos com sucesso
        print(f'Arquivos extraídos em: {caminho_arquivos_receitas}')
    else:
        # Exibe uma mensagem se o arquivo ZIP não for válido ou não existir
        print(f'O arquivo ZIP "{nome_arquivo_zip}" não é válido ou não existe no caminho especificado.')

In [None]:
def encode_files(path_receitas):
    # Informando o caminho do diretório
    folder = Path(path_receitas)

    # Iterando sobre os arquivos csv
    for path_file in folder.glob("*.csv"):
        output = folder / (path_file.stem + ".tmp")
        
        # Abre o arquivo CSV usando UTF-8. Em caso de erro, tenta novamente usando Latin-1
        try:
            with open(path_file, 'r', encoding="utf-8") as file:
                data = file.read()
        except UnicodeDecodeError:
            with open(path_file, 'r', encoding="latin-1") as file:
                data = file.read()
                
        # Abre o arquivo de saída temp e escreve os dados lidos do csv original
        with open(output, 'w', encoding="utf-8") as output_file:
            output_file.write(data)

        # Remove o csv original e renomeia o arquivo temporario
        path_file.unlink()
        output.rename(path_file)

if __name__ == "__main__":
    path_receitas = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas'
    encode_files(path_receitas)

In [None]:
def merge_csv_files(path_receitas):
    folder = Path(path_receitas)
    output_file_path = folder / "receitas_unificadas.csv"

    # Iterando sobre cada arquivo csv
    for path_file in folder.glob("*.csv"):
        with open(path_file, 'r', encoding='utf-8') as input_file:
            csv_reader = csv.reader(input_file)
            
            # Se for o primeiro arquivo, escreva o header
            if not output_file_path.exists():
                header = next(csv_reader)
                with open(output_file_path, 'w', encoding='utf-8', newline='') as output_file:
                    csv_writer = csv.writer(output_file)
                    csv_writer.writerow(header)

            # Escrevendo as linhas remanescentes
            with open(output_file_path, 'a', encoding='utf-8', newline='') as output_file:
                csv_writer = csv.writer(output_file)
                csv_writer.writerows(csv_reader)

if __name__ == "__main__":
    path_receitas = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas'
    merge_csv_files(path_receitas)

Neste ponto foi necessária a correção manual em determinadas linhas no arquivo de receitas unificado, pois havia dados na coluna "NomeDetalhamento" com ";".

In [None]:
arquivo_receita = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas\receitas_unificadas.csv'
dados = pd.read_csv(arquivo_receita, sep=';', encoding='utf-8')
dados.head()

In [None]:
# Validando os anos no arquivo de receitas unificado
dados[':Ano'] = pd.to_numeric(dados[':Ano'], errors='coerce')
print(dados[':Ano'].unique())

In [None]:
# Retirando os "NaN" da coluna Ano
dados = dados.dropna(subset=[":Ano"])
print(dados[':Ano'].unique())

In [None]:
# Cria a nova coluna data com lambda e datetime
dados["data"] = dados.apply(lambda row: datetime(int(row[":Ano"]), int(row["Mes"]), 1), axis=1)


In [None]:
# Somar o total de valores nulos em cada coluna
total_nulos_por_coluna = dados.isnull().sum()

# Somar o total de valores nulos em todo o DataFrame
total_nulos_no_dataframe = dados.isnull().sum().sum()

print("Total de nulos por coluna:")
print(total_nulos_por_coluna)

print("\nTotal de nulos no DataFrame:")
print(total_nulos_no_dataframe)

In [None]:
# Exclui as colunas que não serão utilizadas
dados = dados.drop(["CodigoRubrica", "NomeRubrica","NomeAlinea", "NomeSubAlinea", "NomeSubAlinea"], axis=1)

In [None]:
# Renomeando as colunas
novos_nomes = {
    ':Ano': 'ano',
    'Mes': 'mes',
    'CodigoUnidadeGestora': 'codigo_unidade_gestora',
    'UnidadeGestora': 'unidade_gestora',
    'EsferaAdministrativa': 'esfera_administrativa',
    'ClassificacaoReceita': 'classificacao_receita',
    'CodigoCategoria': 'codigo_categoria',
    'NomeCategoria': 'nome_categoria',
    'CodigoOrigem': 'codigo_origem',
    'NomeOrigem': 'nome_origem',
    'CodigoEspecie': 'codigo_especie',
    'NomeEspecie': 'nome_especie',
    'CodigoAlinea': 'codigo_alinea',
    'CodigoSubAlinea': 'codigo_sub_alinea',
    'NomeFonteReduzida': 'nome_fonteReduzida',
    'CodigoDetalhamento': 'codigo_detalhamento',
    'NomeGrupo': 'nome_grupo',
    'NomeDetalhamento': 'nome_detalhamento',
    'PrevisaoInicial': 'previsao_inicial',
    'PrevisaoAtualizada': 'previsao_atualizada',
    'Arrecadada': 'arrecadada',
    'PrevisaoInicialFUNDEB': 'previsao_inicial_fundeb',
    'PrevisaoAtualizadaFUNDEB': 'previsao_atualizada_fundeb',
    'ArrecadadaFUNDEB': 'arrecadada_fundeb',
    'data': 'data_arquivo',
    'CodigoTipoDetalhamento': 'codigo_tipo_detalhamento',
    'NomeTipoDetalhamento': 'nome_tipo_detalhamento',
    'CodigoGrupo': 'codigo_grupo',
    'CodigoFonteReduzida': 'codigo_fonte_reduzida',
    'CodigoDetalhamentoNivel1': 'codigo_detalhamento_nivel_1',
    'NomeDetalhamentoNivel1': 'nome_detalhamento_nivel_1',
    'CodigoDetalhamentoNivel2': 'codigo_detalhamento_nivel_2',
    'NomeDetalhamentoNivel2': 'nome_detalhamento_nivel_2',
    'CodigoDetalhamentoNivel3': 'codigo_detalhamento_nivel_3',
    'NomeDetalhamentoNivel3': 'nome_detalhamento_nivel_3'
}

dados.rename(columns=novos_nomes, inplace=True)

In [None]:
# Incluindo novo campo de data_carga no arquivo
data_carga = datetime.now()
dados['data_carga'] = data_carga

In [None]:
# Setando as colunas como object e int
dados['nome_detalhamento_nivel_1'] = dados['nome_detalhamento_nivel_1'].astype('object')
dados['ano'] = dados['ano'].astype('int64')
dados['mes'] = dados['mes'].astype('int64')

In [None]:
dados.info()

In [None]:
# Salva as alterações de volta ao arquivo original
dados.to_csv(arquivo_receita, sep=';', encoding='utf-8', index=False)

In [None]:
# Carregar o arquivo unificado de receitas para o DataFrame do pandas
csv_file_path = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas\receitas_unificadas.csv'
df = pd.read_csv(csv_file_path, sep=';')

# Especificar o caminho para salvar o arquivo Parquet
parquet_file_path = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas\receitas.parquet'

# Salvar o DataFrame como um arquivo Parquet
df.to_parquet(parquet_file_path, index=False)

print(f'O arquivo CSV foi convertido com sucesso para Parquet em: {parquet_file_path}')

In [None]:
# Validando o parquet arquivo convertido
arquivo_receita = r'C:\Users\Darke\Documents\Especialização - Dados\Analitycs e BI\TCC\arquivos\receitas\receitas.parquet'
dados = pd.read_parquet(arquivo_receita)
dados.head()

In [None]:
dados.info()