In [1]:
!pip install unidecode


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.10 -m pip install --upgrade pip[0m


In [2]:
 # ! Importando as bibliotecas

import requests
from bs4 import BeautifulSoup
import os
import pandas as pd
import basedosdados as bd
import numpy as np
import unidecode
from pathlib import Path

# Visualizando todas as colunas do DataFrame. 
# Importante quando for trabalhar com muitas colunas
pd.set_option('display.max_columns', None)
pd.set_option('display.max_row', None)

In [3]:
    # ! URL da página que contém os links de download
url = "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/serie-historica-de-precos-de-combustiveis"

# ! Fazer solicitação GET para a página
# ? O método get() retorna um objeto Response
response = requests.get(url)

# ! Analisar o HTML da página usando a biblioteca BeautifulSoup
# ? Usando a biblioteca BeautifulSoup para analisar o HTML da página
# * Primeiro argumento: o conteúdo HTML da página
# * Segundo argumento: o parser HTML que será usado para analisar o HTML
soup = BeautifulSoup(response.content, "html.parser")


# ! Encontrar todos os links de download para arquivos CSV
# ? Usando o método find_all() para encontrar todos os elementos <a> com o atributo href
# * Segundo argumento: Uma função lambda que é usada como filtro adicional para encontrar apenas os links que terminam com .csv
links = soup.find_all("a", href=lambda href: href and href.endswith(".csv"))

In [4]:
diretorio = "input/"

if not os.path.exists(diretorio):
    os.mkdir(diretorio)

In [None]:
for link in links:
    filename = link.get("href").split("/")[-1]
    file_url = link.get("href")
    response = requests.get(file_url)

    with open(f'input/{filename}', "wb") as f:
        f.write(response.content)

    print(f'Arquivo {filename} baixado com sucesso!')

In [None]:
def to_partitions(data: pd.DataFrame, partition_columns: list[str], savepath: str):
    """Save data in to hive patitions schema, given a dataframe and a list of partition columns.
    Args:
        data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
        partition_columns (list): List of columns to be used as partitions.
        savepath (str, pathlib.PosixPath): folder path to save the partitions
    Exemple:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025],
            "mes": [1, 2, 3, 4, 5, 6, 6,9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g",'h'],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=['ano','mes','sigla_uf'],
            savepath='partitions/'
        )
    """

    if isinstance(data, (pd.core.frame.DataFrame)):

        savepath = Path(savepath)

        # create unique combinations between partition columns
        unique_combinations = (
            data[partition_columns]
            .drop_duplicates(subset=partition_columns)
            .to_dict(orient="records")
        )

        for filter_combination in unique_combinations:
            patitions_values = [
                f"{partition}={value}"
                for partition, value in filter_combination.items()
            ]

            # get filtered data
            df_filter = data.loc[
                data[filter_combination.keys()]
                .isin(filter_combination.values())
                .all(axis=1),
                :,
            ]
            df_filter = df_filter.drop(columns=partition_columns)

            # create folder tree
            filter_save_path = Path(savepath / "/".join(patitions_values))
            filter_save_path.mkdir(parents=True, exist_ok=True)
            file_filter_save_path = Path(filter_save_path) / "data.csv"

            # append data to csv
            df_filter.to_csv(
                file_filter_save_path,
                index=False,
                mode="a",
                header=not file_filter_save_path.exists(),
            )
    else:
        raise BaseException("Data need to be a pandas DataFrame")

rename = {'Revenda': 'nome_estabelecimento', 'CNPJ da Revenda':'cnpj_revenda', 'Bairro':'bairro_revenda', 'Cep':'cep_revenda', 'Produto':'produto',
          'Valor de Venda':'preco_venda', 'Valor de Compra':'preco_compra', 'Unidade de Medida':'unidade_medida', 'Bandeira':'bandeira_revenda', 'Estado - Sigla' : 'sigla_uf',
          'Municipio':'nome', 'Data da Coleta': 'data_coleta', 'Nome da Rua':'nome_rua', 'Numero Rua':'numero_rua', 'Complemento':'complemento'}

ordem = ['ano', 'sigla_uf', 'id_municipio', 'bairro_revenda', 'cep_revenda', 'endereco_revenda', 'cnpj_revenda', 'nome_estabelecimento', 'bandeira_revenda', 
         'data_coleta', 'produto', 'unidade_medida', 'preco_compra', 'preco_venda']

# ! Carregando os dados direto do Diretório de municipio da BD
# Para carregar o dado direto no pandas
id_municipio = bd.read_table(dataset_id='br_bd_diretorios_brasil',
table_id='municipio',
billing_project_id="basedosdados-dev") 

# ! Tratamento do id_municipio para mergear com a base
id_municipio['nome'] = id_municipio['nome'].str.upper()
id_municipio['nome'] = id_municipio['nome'].apply(unidecode.unidecode)
id_municipio['nome'] = id_municipio['nome'].replace("ESPIGAO D'OESTE", "ESPIGAO DO OESTE")
id_municipio['nome'] = id_municipio['nome'].replace("SANT'ANA DO LIVRAMENTO", "SANTANA DO LIVRAMENTO")
id_municipio = id_municipio[['id_municipio', 'nome']]

for a in {*range(2004, 2023)}:
    df = pd.read_csv(f"input/ca-{a}-01.csv", sep=";", encoding="ISO-8859-1") # ! combustiveis automativos 2004 - Primeiro semestre
    dh = pd.read_csv(f"input/glp-{a}-01.csv", sep=";", encoding="ISO-8859-1") # ! glp 2004 - Primeiro semestre
    dk = pd.read_csv(f"input/ca-{a}-02.csv", sep=";", encoding="ISO-8859-1") # ! combustiveis automativos 2004 - Primeiro semestre
    dz = pd.read_csv(f"input/glp-{a}-02.csv", sep=";", encoding="ISO-8859-1") # ! glp 2004 - Primeiro semestre
    df_fx = df.append([dh, dk, dz], ignore_index=True)
    
    df_fx = pd.merge(id_municipio, df_fx, how='right', left_on=['nome'], right_on=['Municipio'])
    
    df_fx.rename(columns={'Municipio':'nome'}, inplace=True)
    
    #df_fx = pd.merge(id_cep, df_fx, how='right', on='cep_revenda') # Conversar depois com a Gabi sobre a coluna de CEP
    df_fx['endereco_revenda'] = df_fx['Nome da Rua'].fillna('')  + ',' + ' ' + df_fx['Numero Rua'].fillna('') + ',' + ' ' + df_fx['Complemento'].fillna('')
    
    df_fx.rename(columns={'Data da Coleta':'data_coleta'}, inplace=True)
    
    df_fx = df_fx.query('data_coleta != "08/"')
    
    df_fx['data_coleta'] = pd.to_datetime(df_fx['data_coleta'], format='%d/%m/%Y')
   
    df_fx['Produto'] = df_fx['Produto'].str.lower()
   
    df_fx['ano'] = df_fx['data_coleta'].dt.year
   
    df_fx['ano'].replace('nan', '', inplace=True)
   
    df_fx.rename(columns=rename, inplace=True)
  
    df_fx = df_fx[ordem]
   
    df_fx['ano'] = df_fx['ano'].apply(lambda x: str(x).replace('.0', ''))
   
    df_fx['cep_revenda'] = df_fx['cep_revenda'].apply(lambda x: str(x).replace('-', ''))
   
    df_fx['unidade_medida'] = df_fx['unidade_medida'].map({'R$ / litro': 'R$/litro', 'R$ / m³':'R$/m3', 'R$ / 13 kg':'R$/13kg'})
   
    df_fx['nome_estabelecimento'] = df_fx['nome_estabelecimento'].apply(lambda x: str(x).replace(',',''))
  
    df_fx['preco_compra'] = df_fx['preco_compra'].apply(lambda x: str(x).replace(',','.'))
 
    df_fx['preco_venda'] = df_fx['preco_venda'].apply(lambda x: str(x).replace(',','.'))
  
    df_fx['preco_venda'] = df_fx['preco_venda'].replace('nan', '')
  
    df_fx['preco_compra'] = df_fx['preco_compra'].replace('nan', '')
  
    df_fx.sort_values('data_coleta', inplace=True)
   
    df_fx.drop_duplicates()
  

    to_partitions(df_fx,
    partition_columns=['ano'],
    savepath = 'output/')