#### **Análise de negócios**

#### **Observações**

#### **Conteúdo - Bases e Notebook da aula**

Github:  

https://github.com/FIAP/Pos_Tech_DTAT/tree/Analise-de-Negocios/Analise%20de%20Negocios  

Série Histórica de Preços de Combustíveis e de GLP:  

https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/serie-historica-de-precos-de-combustiveis

#### **Importação de pacotes e bibliotecas**

In [None]:
# Importar biblioteca completa
import pandas as pd
import os
import requests
from bs4 import BeautifulSoup
import sys
import zipfile  
import io       
import time
import concurrent.futures
import psycopg2 
import csv

# Importar algo especifico de uma biblioteca
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from IPython.display import display

#### **Funções (def)**

In [None]:
def encontrar_links(soup):
    
    """
    Encontra dinamicamente os links de download na seção 
    'Combustíveis automotivos'.
    """
    
    # Encontra o cabeçalho <h3> que contém o texto "Combustíveis automotivos"
    heading = soup.find(lambda tag: tag.name == 'h3' and 'Combustíveis automotivos' in tag.get_text())
    
    links_para_baixar = []
    
    if not heading:
        print("Erro: Não foi possível encontrar a seção 'Combustíveis automotivos' no HTML da página")
        return links_para_baixar

    # A lista <ul> com os links é o próximo "irmão" (sibling) da tag <h3>
    ul_tag = heading.find_next_sibling('ul')
    
    if not ul_tag:
        print("Erro: Não foi possível encontrar a lista <ul> após o cabeçalho")
        return links_para_baixar

    # Encontra todas as tags <a> (links) dentro desta lista <ul>
    a_tags = ul_tag.find_all('a')
    
    for a_tag in a_tags:
        url = a_tag.get('href')
        if url:
            links_para_baixar.append(url)
            
    return links_para_baixar

In [None]:
def processar_arquivo(url, pasta_destino, max_retries, retry_delay):

    """
    Baixa um arquivo com retentativas. Se for .zip, extrai. Se .csv, salva.
    RETORNA uma string de status em vez de imprimir.
    """

    nome_arquivo = os.path.basename(url)
    
    for attempt in range(max_retries):
        try:
            
            if nome_arquivo.endswith('.zip'):
                response = requests.get(url) 
                response.raise_for_status()
                
                with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
                    zf.extractall(pasta_destino)
                    nomes_extraidos = zf.namelist()
                
                return f"[EXTRAÍDO] {nome_arquivo} -> {', '.join(nomes_extraidos)}"

            else:
                caminho_local = os.path.join(pasta_destino, nome_arquivo)
                
                with requests.get(url, stream=True) as r:
                    r.raise_for_status()
                    with open(caminho_local, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=8192): 
                            f.write(chunk)
                            
                return f"[SALVO] {nome_arquivo}"

        except requests.exceptions.RequestException as e:
            if attempt + 1 < max_retries:
                time.sleep(retry_delay)
            else: 
                return f"[FALHA-REDE] {nome_arquivo} após {max_retries} tentativas. Erro: {e}"
        
        except zipfile.BadZipFile:
            return f"[FALHA-ZIP] {nome_arquivo} está corrompido ou não é .zip."

        except Exception as e:
            return f"[FALHA-INESPERADA] {nome_arquivo}. Erro: {e}"

In [None]:
# Testar a conexão ao banco de dados
def test_connection(engine):

    try:
        with engine.connect() as connection:
            
            # Testar a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Listar as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'anp';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("-", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)
        sys.exit()

#### **Credenciais**

In [None]:
load_dotenv()

# Credenciais do PostgreSQL
usuario_pg = os.getenv("POSTGRES_USER")
senha_pg = os.getenv("POSTGRES_PASSWORD")
host_pg = os.getenv("POSTGRES_HOST")
porta_pg = os.getenv("POSTGRES_PORT")
banco_pg = os.getenv("POSTGRES_DB")

#### **Variaveis**

In [None]:
# Número máximo de tentativas por arquivo
max_retries = 5

# Segundos de espera entre as tentativas
retry_delay = 5  

# Maximo de threads
max_workers = 20

# URL da página para extrair os links
page_url = 'https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/serie-historica-de-precos-de-combustiveis'

# Pasta onde os arquivos serão baixados
download_dir = 'arquivos_combustiveis_automotivos'

# Validar download dos arquivos ANP
baixar_arquivos_anp = 'n'

# Nome do Schema e Tabela no banco de dados 
schema_db = 'anp'
nome_tabela_db = 'preco_combustivel'
nome_tabela_completo = f'{schema_db}.{nome_tabela_db}'

# Chunksize para carga no banco de dados
chunksize = 100000     

# Validar carga no banco de dados
carregar_tabela = 's' 

# Dicionário para armazenar tempos
tempos_execucao = {}

#### **Aula 1 - Processos e formas de análise**

#### **Aula 2 - Ligação com bancos de dados**

In [None]:
# Criar engine com banco 
engine = create_engine(f"postgresql+psycopg2://{usuario_pg}:{senha_pg}@{host_pg}:{porta_pg}/{banco_pg}")

# Testar a conexão
test_connection(engine)

In [None]:
# Baixar os arquivos da ANP

if baixar_arquivos_anp.lower() == 'n':
    print(f'Etapa de carregar os dados do Github para o PostgreSQL não realizada pois a variavel baixar_arquivos_anp é `n`')
    
else:
    start_extracao = time.perf_counter()

    # 1. Criar a pasta de download
    os.makedirs(download_dir, exist_ok=True)

    # 2. Baixar o HTML da página da ANP
    print(f"Acessando a página: {page_url}")

    try:
        response = requests.get(page_url)
        response.raise_for_status()
        conteudo_html = response.text
        
    except requests.exceptions.RequestException as e:
        print(f"Erro fatal ao acessar a página da ANP: {e}")
        print("Verifique sua conexão com a internet ou se a URL da ANP mudou")
        sys.exit(1) 

    # 3. Analisar (parse) o HTML
    soup = BeautifulSoup(conteudo_html, 'html.parser')

    # 4. Encontrar os links
    links = encontrar_links(soup)

    if not links:
        print("Nenhum link encontrado para baixar")

    else:
        print(f"Encontrados {len(links)} arquivos para baixar na seção 'Combustíveis automotivos'")
        print()

        # 5. Processar (baixar ou extrair) cada arquivo EM PARALELO
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            print(f"Iniciando downloads com até {max_workers} threads em paralelo...")
            print("-" * 70)
            future_to_url = {}

            for url in links:
                future = executor.submit(processar_arquivo, 
                                        url, 
                                        download_dir, 
                                        max_retries, 
                                        retry_delay)
                
                future_to_url[future] = url

            num_concluidos = 0

            for future in concurrent.futures.as_completed(future_to_url):
                num_concluidos += 1
                url = future_to_url[future]
                
                try:
                    status_message = future.result()
                    print(f"({num_concluidos}/{len(links)}) {status_message}")
                    
                except Exception as exc:
                    print(f"({num_concluidos}/{len(links)}) [FALHA-GERAL] {url} gerou uma exceção: {exc}")
                    
        print("-" * 70)
        print()
        print("Download e processamento de todos os arquivos concluído")
        print(f"Os arquivos estão salvos em: {os.path.abspath(download_dir)}")

    end_extracao = time.perf_counter()
    tempos_execucao['extracao'] = end_extracao - start_extracao

if baixar_arquivos_anp.lower() == 'n':
    tempos_execucao['extracao'] = 0.0
    
else:
    tempos_execucao['extracao'] = end_extracao - start_extracao

In [None]:
# Carregar dados no banco de dados 

start_carga = time.perf_counter()

colunas_tabela_pg = [
    'regiao', 'estado', 'municipio', 'revenda', 'cnpj', 'nome_rua', 
    'numero_rua', 'complemento', 'bairro', 'cep', 'produto', 
    'data_coleta', 'valor_venda', 'unidade_medida', 'bandeira',
    'nome_arquivo'
]

colunas_sql_copy = ', '.join(f'\"{col}\"' for col in colunas_tabela_pg) 

mapeamento_colunas_csv_para_pg = {
    'Regiao - Sigla': 'regiao', 'Estado - Sigla': 'estado', 'Municipio': 'municipio',
    'Revenda': 'revenda', 'CNPJ da Revenda': 'cnpj', 'Nome da Rua': 'nome_rua',
    'Numero Rua': 'numero_rua', 'Complemento': 'complemento', 'Bairro': 'bairro',
    'Cep': 'cep', 'Produto': 'produto', 'Data da Coleta': 'data_coleta',
    'Valor de Venda': 'valor_venda', 'Unidade de Medida': 'unidade_medida',
    'Bandeira': 'bandeira'
}

conn_str_psycopg = f"dbname='{banco_pg}' user='{usuario_pg}' password='{senha_pg}' host='{host_pg}' port='{porta_pg}'"

if carregar_tabela.lower() == 'n':
    print(f'Etapa de carregar os dados para o PostgreSQL não realizada pois a variavel carregar_tabela é `n`')

else:
    print(f"Iniciando preparação do banco de dados para a tabela '{nome_tabela_completo}'")
    
    sql_create_schema = text(f"CREATE SCHEMA IF NOT EXISTS {schema_db};")
    sql_drop_table = text(f"DROP TABLE IF EXISTS {nome_tabela_completo};")
    
    sql_create_table = text(f"""
        CREATE TABLE {nome_tabela_completo} (
            regiao VARCHAR(255),
            estado VARCHAR(255),
            municipio VARCHAR(255),
            revenda VARCHAR(255),
            cnpj VARCHAR(255),
            nome_rua VARCHAR(255),
            numero_rua VARCHAR(255),
            complemento VARCHAR(255),
            bairro VARCHAR(255),
            cep VARCHAR(255),
            produto VARCHAR(255),
            data_coleta DATE,
            valor_venda FLOAT,
            unidade_medida VARCHAR(255),
            bandeira VARCHAR(255),
            nome_arquivo VARCHAR(255),
            data_carga TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            usuario VARCHAR(255) DEFAULT CURRENT_USER
        );
    """)

    try:
        with engine.connect() as connection:
            print(f"Executando: CREATE SCHEMA IF NOT EXISTS {schema_db}")
            connection.execute(sql_create_schema)
            print(f"Executando: DROP TABLE IF EXISTS {nome_tabela_completo}")
            connection.execute(sql_drop_table)
            print(f"Executando: CREATE TABLE {nome_tabela_completo}")
            connection.execute(sql_create_table)
            connection.commit() 
        print("✅ Schema e Tabela recriados com sucesso no banco de dados")

    except Exception as e_sql:
        print(f"❌ Erro ao preparar o banco de dados com SQLAlchemy: {e_sql}")
        sys.exit(1)
    
    print(f"\nIniciando carga de dados via COPY para a tabela '{nome_tabela_completo}' a partir de '{download_dir}'")
    
    conn_psycopg = None 
    cursor = None       
    
    try:
        print("Conectando ao PostgreSQL via psycopg2...")
        conn_psycopg = psycopg2.connect(conn_str_psycopg)
        conn_psycopg.autocommit = False 
        cursor = conn_psycopg.cursor()
        print("✅ Conectado com sucesso")

        print()
        print("-" * 70)
        print(f"Procurando arquivos .csv em '{download_dir}'")
        arquivos_csv = sorted([f for f in os.listdir(download_dir) if f.endswith('.csv')]) 
        
        if not arquivos_csv:
             print("Nenhum arquivo .csv encontrado na pasta e carga não realizada")

        else:
            print(f"Encontrados {len(arquivos_csv)} arquivos .csv e iniciando carga via COPY")
            arquivos_processados = 0
            arquivos_com_erro = 0

            for nome_arquivo_csv in arquivos_csv:
                caminho_completo = os.path.join(download_dir, nome_arquivo_csv)
                print(f"\n--- Processando arquivo: {nome_arquivo_csv} ({arquivos_processados + arquivos_com_erro + 1}/{len(arquivos_csv)}) ---")
                
                encodings_to_try = ['utf-8', 'latin-1']
                detected_encoding = None

                print(f"Iniciando detecção de encoding (tentando {encodings_to_try})")
                for encoding_attempt in encodings_to_try:
                    try:
                        pd.read_csv(
                            caminho_completo, 
                            sep=';',              
                            encoding=encoding_attempt, 
                            decimal=',',
                            nrows=5 
                        )

                        print(f"Sucesso: Encoding detectado como '{encoding_attempt}'")
                        detected_encoding = encoding_attempt
                        break 
                    
                    except UnicodeDecodeError:
                        print(f"Falha ao decodificar com '{encoding_attempt}' e tentando o próximo")
                        continue
                    
                    except FileNotFoundError:
                         print(f"ERRO FATAL: Arquivo {nome_arquivo_csv} não encontrado")
                         raise
                    
                    except Exception as e_detect:
                        print(f"Erro inesperado ao tentar ler com {encoding_attempt}: {e_detect}")
                        raise
                
                if detected_encoding is None:
                    print(f"ERRO FATAL: Não foi possível decodificar o arquivo {nome_arquivo_csv} com nenhum encoding testado ({encodings_to_try})")
                    arquivos_com_erro += 1
                    continue 

                try:
                    chunk_iterator = pd.read_csv(
                        caminho_completo, 
                        chunksize=chunksize, 
                        low_memory=False, 
                        sep=';',              
                        encoding=detected_encoding,
                        decimal=',',          
                        parse_dates=['Data da Coleta'], 
                        dayfirst=True         
                    )
                
                    total_chunks = 0

                    for i, chunk in enumerate(chunk_iterator):
                        total_chunks = i + 1
                        
                        try:
                            chunk_renamed = chunk.rename(columns=mapeamento_colunas_csv_para_pg)

                        except Exception as e_rename:
                            print(f"ERRO ao renomear colunas no chunk {total_chunks}: {e_rename}")
                            raise 

                        chunk_reordered = chunk_renamed.reindex(columns=colunas_tabela_pg)
                        chunk_reordered['nome_arquivo'] = nome_arquivo_csv

                        buffer = io.StringIO()
                        chunk_reordered.to_csv(buffer, index=False, header=False, sep=',', 
                                               na_rep='\\N', quoting=csv.QUOTE_MINIMAL, 
                                               date_format='%Y-%m-%d')
                        buffer.seek(0) 

                        print(f"Carregando chunk {total_chunks}")
                        sql_copy_command = f"""COPY {nome_tabela_completo} ({colunas_sql_copy}) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, NULL '\\N', DELIMITER ',')"""
                        cursor.copy_expert(sql_copy_command, buffer)
                        
                    print(f"Arquivo {nome_arquivo_csv} carregado ({total_chunks} chunks)")
                    conn_psycopg.commit() 
                    arquivos_processados += 1

                except pd.errors.EmptyDataError:
                    print(f"AVISO: Arquivo {nome_arquivo_csv} está vazio ou tornou-se vazio após leitura")
                    conn_psycopg.rollback() 
                    arquivos_com_erro += 1 

                except FileNotFoundError: 
                    print(f"❌ Erro: Arquivo {nome_arquivo_csv} não encontrado durante processamento dos chunks")
                    conn_psycopg.rollback()
                    arquivos_com_erro += 1

                except Exception as e_file:
                    print(f"❌ Erro ao processar o arquivo {nome_arquivo_csv} (no chunk {total_chunks}): {e_file}")
                    print(f"Verifique o mapeamento, tipos de dados (especialmente datas e decimais).")
                    conn_psycopg.rollback() 
                    arquivos_com_erro += 1
            
            print("-" * 70)
            print(f"\n✅ Carga via COPY concluída")
            print(f"Arquivos processados com sucesso: {arquivos_processados}")
            print(f"Arquivos com erro/vazios: {arquivos_com_erro}")

    except psycopg2.Error as db_err:
        print(f"❌ Erro de conexão ou execução no PostgreSQL (psycopg2): {db_err}")
        if conn_psycopg:
            conn_psycopg.rollback() 

    except Exception as e:
        print(f"❌ Erro inesperado durante a carga dos dados (possivelmente ao iniciar leitura de {nome_arquivo_csv}): {e}")
        if conn_psycopg:
            conn_psycopg.rollback()
            
    finally:
        if cursor:
            cursor.close()
        if conn_psycopg:
            conn_psycopg.close()
            print("\nConexão psycopg2 fechada")
            
    print("\nProcesso de carga finalizado")

    end_carga = time.perf_counter()
    tempo_total_celula_carga = end_carga - start_carga

if carregar_tabela.lower() == 'n':
    tempos_execucao['carga'] = 0.0
    
else:
    tempos_execucao['carga'] = tempo_total_celula_carga

In [None]:
# Gerar resumo

print(f"Resumo da execução da extração e carregamento\n")

tempo_extracao = tempos_execucao.get('extracao', 0.0)
tempo_carga = tempos_execucao.get('carga', 0.0)

if tempo_extracao == 0.0 and tempo_carga == 0.0:
    print("Nenhuma operação (extração ou carga) foi cronometrada")
    
elif (tempo_extracao + tempo_carga) == 0.0:
     print("Tempo total foi zero e não é possível calcular percentuais")
     
else:
    total_time = tempo_extracao + tempo_carga
    perc_extracao = (tempo_extracao / total_time) * 100
    perc_carga = (tempo_carga / total_time) * 100

    data_ops = {
        'Tempo (s)': [tempo_extracao, tempo_carga],
        'Percentual (%)': [perc_extracao, perc_carga]
    }

    index_labels_ops = ['Operação de Extração', 'Operação de Carga']

    df_ops = pd.DataFrame(data_ops, index=index_labels_ops)
    
    df_ops_styled = df_ops.style.format({
        'Tempo (s)': '{:.2f}'.format,
        'Percentual (%)': '{:.1f}'.format
    }).set_properties(
        **{'text-align': 'right'}
    ).set_table_styles([
        dict(selector="th.row_heading", props=[("text-align", "left"), ("min-width", "220px")]),
        dict(selector="th.col_heading", props=[("text-align", "right")])
    ])
    
    display(df_ops_styled)

    data_total = {
        'Tempo (s)': [total_time],
        'Percentual (%)': [100.0]
    }
    index_labels_total = ['Tempo Total']
    df_total = pd.DataFrame(data_total, index=index_labels_total)

    df_total_styled = df_total.style.format({
        'Tempo (s)': '{:.2f}'.format,
        'Percentual (%)': '{:.1f}'.format
    }).set_properties(
        **{'text-align': 'right'}
    ).set_table_styles([
        dict(selector="th.row_heading", props=[("text-align", "left"), ("min-width", "220px")]),
        dict(selector="th.col_heading", props=[("text-align", "right")])
    ])

    print()
    display(df_total_styled)

Resumo da execução da extração e carregamento



Unnamed: 0,Tempo (s),Percentual (%)
Operação de Extração,0.0,0.0
Operação de Carga,12.26,100.0





Unnamed: 0,Tempo (s),Percentual (%)
Tempo Total,12.26,100.0
