# Total Sales Pipeline
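Raw layer of the "Venda Total" (Total Sales) pipeline: downloads the ANP liquid-fuels sales dataset, filters it to a date window and loads it into BigQuery.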

In [13]:
from bs4 import BeautifulSoup
from io import BytesIO
from google.cloud import bigquery
import re
import os
import requests
import zipfile
import pandas as pd


True

In [14]:
"""
Constants for the ANP total sales scraper: source page URL and the date window to load
"""

# ANP page that hosts the liquid-fuels market panel and the link to its dataset.
BASE_URL = (
    "https://www.gov.br/anp/pt-br/centrais-de-conteudo/paineis-dinamicos-da-anp/"
    "paineis-dinamicos-do-abastecimento/"
    "painel-dinamico-do-mercado-brasileiro-de-combustiveis-liquidos"
)

# Inclusive reference-date window used later to filter the sales rows.
start_date = pd.to_datetime("2023-01-01")
end_date = pd.to_datetime("2023-01-31")

In [15]:
"""
ANP Total Sales Scraper
This cell scrapes the ANP panel page for the link to the dataset behind the
panel and downloads the zip archive into memory (`zip_bytes`).
"""

# Per-request timeout in seconds, so a stalled server cannot hang the notebook.
REQUEST_TIMEOUT_SECONDS = 60

# NOTE(review): TLS certificate verification is disabled (verify=False), as in
# the original code -- confirm whether this is still required for this host.
response = requests.get(BASE_URL, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
if response.status_code != 200:
    raise Exception("Failed to retrieve data from the URL.")
page = BeautifulSoup(response.content, "html.parser")

# Link to the interactive panel itself. It is collected for reference only;
# nothing else in this cell reads `panel`.
pattern = re.compile(r"Clique\s+aqu.*Líquidos", re.IGNORECASE)
a_tag = page.find('a', string=pattern)
panel = None
if a_tag and 'href' in a_tag.attrs:
    panel = {'text': a_tag.get_text(strip=True), 'link': a_tag['href']}

# Link to the dataset behind the panel, plus the "updated on" text that
# follows the anchor in the page.
data = None
data_link = page.find('a', string='Veja também a base de dados utilizada no painel')
if data_link and 'href' in data_link.attrs:
    li_text = "Data não disponível"
    updated_data = data_link.next_sibling
    if updated_data:
        # The date is whatever follows the first "em "; the last character is
        # dropped. Keep the placeholder when the text does not contain "em ".
        parts = updated_data.get_text(strip=True).split("em ")
        if len(parts) > 1:
            li_text = parts[1][0:-1].strip()
    data = {
        'text': data_link.get_text(strip=True),
        'link': data_link['href'],
        'updated_date': li_text,
    }

# Fail here with a clear message instead of leaving `zip_bytes` undefined (or
# stale from an earlier run) for the extraction cell.
if not (data and data.get('link')):
    raise RuntimeError("Dataset download link not found on the ANP panel page.")

download_link = data['link']

# Target object path; computed but not used anywhere in this cell.
gcs_base_path = "general_sales"
final_path = "/general_sales.zip"
gcs_path = f"/{gcs_base_path}{final_path}"

response = requests.get(download_link, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
response.raise_for_status()
zip_bytes = BytesIO(response.content)




In [16]:
"""
Extracts the general sales CSV from the downloaded zip archive (`zip_bytes`) and renames its columns to English.
"""

# Name of the archive member that holds the sales data.
SALES_CSV_NAME = "Liquidos_Vendas_Atual.csv"

with zipfile.ZipFile(zip_bytes) as zf:
    # Look the member up by name first; only the matching file is opened.
    sales_member = next(
        (info for info in zf.infolist() if info.filename == SALES_CSV_NAME),
        None,
    )
    if sales_member is None:
        # Without this check `df` would stay undefined (or stale from an
        # earlier run) and the failure would surface later as a NameError.
        raise FileNotFoundError(
            f"{SALES_CSV_NAME!r} not found in the downloaded archive; "
            f"members: {zf.namelist()}"
        )
    with zf.open(sales_member) as file:
        df = pd.read_csv(file, sep=";", encoding="latin1")

# Map the source column headers to the English names used downstream.
# Headers missing from the CSV are silently skipped by `rename`.
df = df.rename(columns={
    "Ano": "year",
    "Mês": "month",
    "Agente Regulado": "company_name",
    "Código do Produto": "product_code",
    "Nome do Produto": "product_name",
    "Descrição do Produto": "product",
    "Região Origem": "origin_region",
    "UF Origem": "origin_state",
    "Região Destinatário": "destination_region",
    "UF Destino": "destination_state",
    "Mercado Destinatário": "target_market",
    "Quantidade de Produto (mil m³)": "volume_1000m3",
})

In [17]:
"""
    This cell prepares the data to insert into BigQuery:
    removes the unused years from the dataframe,
    builds a date column from the year and month (day fixed to 1),
    filters the dataframe by start and end date,
    drops the year and product_code columns,
    splits the dataframe into one dataframe per month,
    stored in a dictionary keyed by month (each without the month column),
    and converts the date column to plain date values.
"""

# Years excluded up front, before the date window is applied.
remove_years = [2022, 2021, 2020, 2019, 2018, 2017]
df = df.loc[~df['year'].isin(remove_years)].copy()

# One reference date per row: the first day of the row's year/month.
df['date'] = pd.to_datetime(
    {'year': df['year'], 'month': df['month'], 'day': 1}
).astype('datetime64[us]')

# Inclusive on both ends: start_date <= date <= end_date.
filter_by_start_and_end_date = df['date'].between(start_date, end_date)
df = df.loc[filter_by_start_and_end_date].drop(columns=['year', 'product_code'])

# Store plain `datetime.date` values. Converting before the split means every
# per-month frame already carries the converted column.
df['date'] = df['date'].dt.date

# One frame per month number, without the now-redundant `month` column.
df_by_month = {}
for month, month_df in df.groupby('month'):
    df_by_month[month] = month_df.drop(columns='month')
print("Months available in the dictionary:", df_by_month.keys())


Months available in the dictionary: dict_keys([1])


In [18]:
"""
Inserting data into BigQuery
"""

# Fail fast when the project is not configured; otherwise the table id would
# be built as "None.<dataset>.<table>".
project_id = os.getenv("GOOGLE_PROJECT_ID")
if not project_id:
    raise RuntimeError("GOOGLE_PROJECT_ID environment variable is not set.")

bq_client = bigquery.Client()
bq_dataset = "rw_ext_anp"
table_name = "rw_ext_anp_total_sales"
table_id = f"{project_id}.{bq_dataset}.{table_name}"

# WRITE_TRUNCATE is only ever used together with a partition decorator below,
# so each load replaces just the targeted partition. An undecorated
# WRITE_TRUNCATE load of the full frame would replace the whole table,
# including partitions outside the processed date window, and is therefore
# not performed.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

for monthly_df in df_by_month.values():
    # Frames are keyed by month number only, so one frame may hold rows from
    # several years. Derive the YYYYMM partition from each row's own date
    # rather than from the first row of the frame.
    partition_keys = monthly_df['date'].map(lambda d: f"{d.year}{d.month:02d}")
    for partition_key, partition_df in monthly_df.groupby(partition_keys):
        print(f"Inserting data for partition: {partition_key}")
        job = bq_client.load_table_from_dataframe(
            partition_df, f"{table_id}${partition_key}", job_config=job_config
        )
        job.result()  # block until the load job finishes (raises on failure)
        print(f"Data for {partition_key} inserted successfully.")

Inserting data for partition: 202301
Data for 202301 inserted successfully.
