## Imports

In [0]:
import requests
import pandas as pd
from zipfile import ZipFile
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import os
from io import StringIO, BytesIO
import re

## Get data

### Source URLs and year ranges

In [0]:
# Years 2022 through the current calendar year are fetched as .zip archives;
# 2020-2021 are fetched as plain .csv files. Each year has two semesters.
anos_zip = range(2022, datetime.now().year + 1)
anos_csv = range(2020, 2022)
semestres = [1, 2]

# URL templates: {year} is the four-digit year, {sem} the zero-padded semester.
base_url_zip = "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/dsas/ca/ca-{year}-{sem:02d}.zip"
base_url_csv = "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/dsas/ca/ca-{year}-{sem:02d}.csv"

# One URL per (year, semester) pair, iterating years in the outer loop.
urls_zip = [
    base_url_zip.format(year=ano, sem=semestre)
    for ano in anos_zip
    for semestre in semestres
]
urls_csv = [
    base_url_csv.format(year=ano, sem=semestre)
    for ano in anos_csv
    for semestre in semestres
]

# ZIP URLs first, followed by the CSV URLs.
all_urls = [*urls_zip, *urls_csv]

### Download and process files in-memory

In [0]:
def rename_cols(df_spark):
    """Rename the raw ANP CSV headers to compact CamelCase column names.

    The renames are applied one at a time with ``withColumnRenamed`` in the
    order listed below. Spark treats a rename of a column that is not in the
    schema as a no-op, so both spellings of the region header can be listed.

    Args:
        df_spark: Spark DataFrame built from one raw ANP file.

    Returns:
        A new DataFrame with the renamed columns.
    """
    renames = (
        # "ï»¿" is a UTF-8 byte-order mark that was decoded as Latin-1 text.
        ("ï»¿Regiao - Sigla", "RegiaoSigla"),
        ("Regiao - Sigla", "RegiaoSigla"),
        ("Estado - Sigla", "EstadoSigla"),
        ("Municipio", "Municipio"),
        ("Revenda", "Revenda"),
        ("CNPJ da Revenda", "CNPJRevenda"),
        ("Nome da Rua", "NomeRua"),
        ("Numero Rua", "NumeroRua"),
        ("Complemento", "Complemento"),
        ("Bairro", "Bairro"),
        ("Cep", "Cep"),
        ("Produto", "Produto"),
        ("Data da Coleta", "DataColeta"),
        ("Valor de Venda", "ValorVenda"),
        ("Valor de Compra", "ValorCompra"),
        ("Unidade de Medida", "UnidadeMedida"),
        ("Bandeira", "Bandeira"),
    )
    result = df_spark
    for source_name, target_name in renames:
        result = result.withColumnRenamed(source_name, target_name)
    return result

In [0]:
def _read_zip_csvs(content):
    """Parse every ``.csv`` member of an in-memory ZIP archive.

    Args:
        content: Raw bytes of the ZIP archive.

    Returns:
        One DataFrame with the rows of all CSV members concatenated in archive
        order, or ``None`` if the archive contains no CSV member.
    """
    frames = []
    with ZipFile(BytesIO(content)) as z:
        for fname in z.namelist():
            if fname.lower().endswith(".csv"):
                with z.open(fname) as f:
                    # "utf-8-sig" drops a leading byte-order mark if present
                    # and is otherwise identical to UTF-8.
                    frames.append(pd.read_csv(f, sep=";", encoding="utf-8-sig"))
    if not frames:
        return None
    # Keep every member instead of letting the last one overwrite the others.
    return pd.concat(frames, ignore_index=True)


def process_and_save_url(url, timeout=60):
    """Download one ANP semester file and save it as a bronze Delta table.

    The year and semester are parsed from the ``ca-YYYY-SS`` part of *url*.
    The data is written to
    ``gov_br_combustivel.bronze_layer.delta_{YYYY}_{SS}``. If that table already
    exists, nothing is downloaded.

    Download and parse failures are printed rather than raised. Errors from
    the Spark conversion or write are not caught and propagate to the caller.

    Args:
        url: ``.zip`` or ``.csv`` URL following the ``ca-YYYY-SS`` naming.
        timeout: Seconds passed to ``requests.get``, so a stalled connection
            cannot block the calling thread indefinitely.
    """
    match = re.search(r"ca-(\d{4})-(\d{2})", url)
    if not match:
        print(f"Could not extract year/semester from {url}")
        return

    year, sem = match.groups()
    table_name = f"gov_br_combustivel.bronze_layer.delta_{year}_{sem}"

    if spark.catalog.tableExists(table_name):
        print(f"Table {table_name} already exists. Skipping.")
        return

    df_pandas = None
    try:
        r = requests.get(url, timeout=timeout)
        # Report HTTP errors (e.g. 404 for a semester that is not published
        # yet) explicitly instead of treating them as an empty file.
        r.raise_for_status()
        if url.endswith(".zip"):
            df_pandas = _read_zip_csvs(r.content)
        elif url.endswith(".csv"):
            # Parse the raw bytes as UTF-8. r.text would decode with the
            # charset requests infers from the response headers, which may not
            # be UTF-8; that mangles accented text and turns the BOM into "ï»¿".
            df_pandas = pd.read_csv(BytesIO(r.content), sep=";", encoding="utf-8-sig")
    except Exception as e:
        print(f"Error processing {url}: {e}")
        return

    if df_pandas is None or df_pandas.empty:
        print(f"No data in {url}")
        return

    spark_df = spark.createDataFrame(df_pandas)
    renamed_df = rename_cols(spark_df)

    renamed_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    print(f"Successfully saved {table_name}")

# Ingest all semester files with up to four concurrent worker threads.
# A worker's exception is only re-raised when its result is read, so a
# discarded result iterator would hide failures such as errors from the
# Spark write, which process_and_save_url does not catch. Keep every future
# and report its exception explicitly.
with ThreadPoolExecutor(max_workers=4) as ex:
    futures = {url: ex.submit(process_and_save_url, url) for url in all_urls}

for url, future in futures.items():
    error = future.exception()
    if error is not None:
        print(f"Failed to ingest {url}: {error}")

In [0]:
def process_and_save_precos_zip(
    url,
    table_name="gov_br_combustivel.bronze_layer.delta_2022_01",
    timeout=60,
):
    """Download a ZIP archive of price CSVs and save it as one bronze Delta table.

    If *table_name* already exists, nothing is downloaded. Download and parse
    failures are printed rather than raised. Errors from the Spark conversion
    or write propagate to the caller.

    Args:
        url: URL of a ``.zip`` archive; any other URL is reported as "No data".
        table_name: Fully qualified Delta table to write.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            cannot block indefinitely.
    """
    if spark.catalog.tableExists(table_name):
        print(f"Table {table_name} already exists. Skipping.")
        return

    df_pandas = None
    try:
        r = requests.get(url, timeout=timeout)
        # Report HTTP errors explicitly instead of treating them as no data.
        r.raise_for_status()
        if url.endswith(".zip"):
            frames = []
            with ZipFile(BytesIO(r.content)) as z:
                for fname in z.namelist():
                    if fname.lower().endswith(".csv"):
                        with z.open(fname) as f:
                            # "utf-8-sig" strips a leading byte-order mark if present.
                            frames.append(pd.read_csv(f, sep=";", encoding="utf-8-sig"))
            if frames:
                # Keep every CSV member rather than only the last one.
                # NOTE(review): confirm all members belong to the period that
                # table_name denotes before relying on this combined table.
                df_pandas = pd.concat(frames, ignore_index=True)
    except Exception as e:
        print(f"Error processing {url}: {e}")
        return

    if df_pandas is None or df_pandas.empty:
        print(f"No data in {url}")
        return

    spark_df = spark.createDataFrame(df_pandas)
    renamed_df = rename_cols(spark_df)
    renamed_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    print(f"Successfully saved {table_name}")

# The ca-2022-01.zip URL in the earlier list also targets delta_2022_01.
# Existing tables are skipped, so whichever source succeeds first is kept.
precos_zip_url = "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/dsas/ca/precos-semestrais-ca.zip"
process_and_save_precos_zip(precos_zip_url)