1. Extração
- Nesta etapa, vamos baixar os arquivos XLSX semanais para a camada mais baixa (bronze), onde os dados brutos serão armazenados.

In [None]:
# Importando as bibliotecas
import pandas as pd
import requests
import os
from datetime import date, timedelta, datetime
from glob import glob
from zipfile import BadZipFile

In [None]:
from datetime import date, timedelta

def generate_weekly_files(start=date(2025, 1, 5), today=None):
    """Build the download list of weekly summary spreadsheets.

    Weeks are consecutive 7-day windows beginning at ``start``. A window is
    included only when its last day falls on or before the most recent
    Sunday strictly before ``today``.

    Args:
        start: First day of the first window. Defaults to 2025-01-05.
        today: Reference date used to compute the cutoff. Defaults to
            ``date.today()``; pass a fixed date to reproduce a schedule.

    Returns:
        A list of ``(url, week_start, week_end)`` tuples, oldest first.
        Both dates are ``YYYY-MM-DD`` strings. The URL's year folder is the
        year of the window's last day.
    """
    if today is None:
        today = date.today()

    # weekday() is 0 on Monday, so this is the last Sunday strictly before
    # `today` (a full week back when `today` itself is a Sunday).
    cutoff = today - timedelta(days=today.weekday() + 1)
    one_week = timedelta(days=7)

    weeks = []
    week_start = start
    week_end = week_start + timedelta(days=6)

    while week_end <= cutoff:
        first_day = week_start.strftime("%Y-%m-%d")
        last_day = week_end.strftime("%Y-%m-%d")
        url = (
            "https://www.gov.br/anp/pt-br/assuntos/precos-e-defesa-da-concorrencia/"
            f"precos/arquivos-lpc/{week_end.year}/"
            f"resumo_semanal_lpc_{first_day}_{last_day}.xlsx"
        )
        weeks.append((url, first_day, last_day))

        week_start += one_week
        week_end += one_week

    return weeks


In [None]:
def collect_raw_data():
    """Download every weekly spreadsheet that is not yet in the bronze layer.

    Files are saved as
    ``../data/bronze/<current year>/<MM>/<week_start>_<week_end>.xlsx``,
    where ``<MM>`` is the month of the week's last day. A week whose file
    already exists is skipped. Download failures (network errors or a
    non-200 status) are reported with ``print`` and do not stop the loop.
    """
    current_year = date.today().year
    base_path = f"../data/bronze/{current_year}"
    os.makedirs(base_path, exist_ok=True)

    for url, week_start, week_end in generate_weekly_files():
        # NOTE(review): every week is stored under the *current* year's
        # folder, even weeks that ended in an earlier year. Kept as-is
        # because the transform step globs this same folder - confirm
        # whether the layout should follow the week's own year.
        month = datetime.strptime(week_end, "%Y-%m-%d").date().month

        month_dir = f"{base_path}/{month:02d}"
        os.makedirs(month_dir, exist_ok=True)

        file_path = f"{month_dir}/{week_start}_{week_end}.xlsx"
        if os.path.exists(file_path):
            continue

        try:
            # Without a timeout a stalled connection would hang the run.
            resp = requests.get(url, timeout=60)
        except requests.RequestException as exc:
            print(f"Erro ao baixar {url} -> {exc}")
            continue

        if resp.status_code != 200:
            print(f"Erro ao baixar {url} -> status {resp.status_code}")
            continue

        # Write to a temporary name first: an interrupted write must not
        # leave a truncated file that the exists-check above would then
        # treat as already downloaded.
        partial_path = f"{file_path}.part"
        with open(partial_path, "wb") as f:
            f.write(resp.content)
        os.replace(partial_path, file_path)
        print(f"Arquivo salvo: {file_path}")


In [None]:
# Run the extraction step: download the weekly files missing from the bronze layer.
collect_raw_data()

2. Transformação 
- Transformando os dados em CSV (dados consumíveis e consistentes)

In [None]:
def _convert_sheet_to_csv(xls, sheet, file, month_path):
    """Write one worksheet of an open workbook as a CSV, unless it already exists.

    The table header is the row whose first cell equals ``"DATA INICIAL"``;
    every row above it is discarded.

    Raises:
        ValueError: If no row starts with ``"DATA INICIAL"``.
    """
    sheet_path = os.path.join(month_path, sheet.upper())
    base_name = os.path.basename(file).replace(".xlsx", f"_{sheet.upper()}.csv")
    csv_file = os.path.join(sheet_path, base_name)

    # Check before parsing so already-converted sheets cost nothing.
    if os.path.exists(csv_file):
        return

    # header=None keeps row positions equal to positions in the sheet, so the
    # index found here can be passed directly as `header` below.
    raw = pd.read_excel(xls, sheet_name=sheet, header=None)
    matches = raw.index[raw.iloc[:, 0] == "DATA INICIAL"]
    if len(matches) == 0:
        raise ValueError('cabeçalho "DATA INICIAL" não encontrado na primeira coluna')

    df = pd.read_excel(xls, sheet_name=sheet, header=int(matches[0]))

    os.makedirs(sheet_path, exist_ok=True)
    df.to_csv(csv_file, index=False, encoding="utf-8-sig")
    print(f"Arquivo convertido: {csv_file}")


def generate_weekly_file_in_csv():
    """Convert this year's bronze spreadsheets into per-sheet CSVs (silver layer).

    Reads ``../data/bronze/<current year>/<MM>/*.xlsx`` and writes each
    worksheet to
    ``../data/silver/<current year>/<MM>/<SHEET>/<file>_<SHEET>.csv``.
    Existing CSVs are left untouched. Failures are reported with ``print``
    per sheet or per file and do not stop the remaining conversions.
    """
    current_year = date.today().year
    bronze_files = glob(f"../data/bronze/{current_year}/*/*.xlsx")
    silver_path = f"../data/silver/{current_year}"
    os.makedirs(silver_path, exist_ok=True)

    for file in bronze_files:
        # The month is the name of the folder that contains the spreadsheet.
        month = os.path.basename(os.path.dirname(file))
        month_path = os.path.join(silver_path, month)

        try:
            # The context manager closes the workbook handle even on errors.
            with pd.ExcelFile(file, engine="openpyxl") as xls:
                for sheet in xls.sheet_names:
                    try:
                        _convert_sheet_to_csv(xls, sheet, file, month_path)
                    except Exception as e:
                        print(f"Erro ao processar sheet {sheet} do arquivo {file}: {e}")

        except (BadZipFile, ValueError) as e:
            print(f"Arquivo inválido (pulado): {file} ({e})")
        except Exception as e:
            print(f"Erro inesperado em {file}: {e}")


In [None]:
# Run the transformation step: convert the bronze spreadsheets into per-sheet CSVs.
generate_weekly_file_in_csv()

In [None]:
def _format_date_column(series):
    """Return ``series`` as ``dd-mm-yy`` strings; unparseable values become NaN.

    Values already in ``dd-mm-yy`` form are parsed with that exact format
    first. This keeps the cleaning idempotent: letting pandas infer the
    format of ``05-01-25`` on a second run could read it month-first and
    swap day and month.
    """
    out_format = "%d-%m-%y"
    parsed = pd.to_datetime(series, format=out_format, errors="coerce")
    missing = parsed.isna()
    if missing.any():
        # Only values not yet in the output format go through inference.
        parsed = parsed.fillna(pd.to_datetime(series[missing], errors="coerce"))
    return parsed.dt.strftime(out_format)


def cleaning_all_silver():
    """Normalize headers and date columns of this year's silver CSVs, in place.

    For every CSV found at any depth under ``../data/silver/<current year>``:
    ``Unnamed...`` headers are blanked, headers are stripped and upper-cased,
    and the ``DATA INICIAL`` / ``DATA FINAL`` columns are rewritten as
    ``dd-mm-yy``. Errors are reported with ``print`` per file and do not stop
    the remaining files.
    """
    current_year = date.today().year
    # The CSVs live at <year>/<month>/<SHEET>/, so search recursively; a
    # fixed "*/*.csv" pattern stops one level short and matches nothing.
    silver_files = glob(f"../data/silver/{current_year}/**/*.csv", recursive=True)

    for file in silver_files:
        try:
            df = pd.read_csv(file, encoding="utf-8-sig")
            df.columns = ["" if col.startswith("Unnamed") else col for col in df.columns]
            df.columns = df.columns.str.strip().str.upper()
            for date_col in ("DATA INICIAL", "DATA FINAL"):
                if date_col in df.columns:
                    df[date_col] = _format_date_column(df[date_col])
            df.to_csv(file, index=False, encoding="utf-8-sig")
            print(df.columns)
        except Exception as e:
            print(f"Erro inesperado em {file}: {e}")

In [None]:
# Run the cleaning step over the silver CSVs (headers and date columns).
cleaning_all_silver()