<a href="https://colab.research.google.com/github/fertorresfs/Pipeline-ETL-Extract-Transform-Load-Simples/blob/main/pipeline_etl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import requests
import sqlite3
from sqlalchemy import create_engine, text
import logging
import os
from datetime import datetime

# Configurar o logging
logging.basicConfig(filename='etl.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [None]:
# Configurações do banco de dados
db_path = 'seu_banco_de_dados.db'  # Substitua pelo caminho do seu banco de dados
engine = create_engine(f'sqlite:///{db_path}')

In [None]:
def extrair_dados_csv(caminho_arquivo):
    """Extrai dados de um arquivo CSV."""
    try:
        df = pd.read_csv(caminho_arquivo)
        logging.info(f"Dados extraídos do CSV: {caminho_arquivo}")
        return df
    except Exception as e:
        logging.error(f"Erro ao extrair dados do CSV: {e}")
        return None

In [None]:
def extrair_dados_api(url_api):
    """Extrai dados de uma API."""
    try:
        response = requests.get(url_api)
        response.raise_for_status() # Lança uma exceção para códigos de status ruins (4xx ou 5xx)
        data = response.json()
        df = pd.DataFrame(data) # Supondo que a API retorna JSON que pode ser convertido em DataFrame.
        logging.info(f"Dados extraídos da API: {url_api}")
        return df
    except requests.exceptions.RequestException as e:
        logging.error(f"Erro na requisição da API: {e}")
        return None
    except Exception as e:
        logging.error(f"Erro ao extrair dados da API: {e}")
        return None

In [None]:
def transformar_dados(df1, df2=None):
    """Transforma os dados."""
    #Exemplo de transformação: renomear colunas para evitar conflitos
    df1.columns = ['id', 'nome_csv', 'valor_csv', 'data_csv']


    if df2 is not None:
        df2.columns = ['id', 'nome_api', 'valor_api']
        df = pd.merge(df1, df2, on='id', how='left') #combinando os dataframes
    else:
        df = df1

    #Outras transformações - exemplo convertendo coluna para datetime
    df['data_csv'] = pd.to_datetime(df['data_csv'])
    # Adicione mais transformações aqui, conforme necessário.
    logging.info("Dados transformados.")
    return df

In [None]:
def carregar_dados(df, nome_tabela):
    """Carrega os dados no banco de dados."""
    try:
        df.to_sql(nome_tabela, engine, if_exists='replace', index=False) #substitui a tabela se ela já existir
        logging.info(f"Dados carregados na tabela: {nome_tabela}")
    except Exception as e:
        logging.error(f"Erro ao carregar dados no banco de dados: {e}")

In [None]:
def executar_etl(csv_path, api_url, tabela_destino):
    """Executa o pipeline ETL completo."""

    df_csv = extrair_dados_csv(csv_path)
    df_api = extrair_dados_api(api_url)

    if df_csv is not None:  # Proceed even if API extraction fails
        df_transformado = transformar_dados(df_csv, df_api)
        if df_transformado is not None:
             carregar_dados(df_transformado, tabela_destino)

In [None]:
# Exemplo de Uso:
csv_filepath = 'seu_arquivo.csv' #substitua
api_url = 'https://sua_api.com/endpoint' #substitua
tabela = 'dados_combinados'

executar_etl(csv_filepath, api_url, tabela)

In [None]:
#Para agendar com cron (no Linux/macOS):
# 1. Abra o crontab: `crontab -e`
# 2. Adicione uma linha como esta para executar o script diariamente à meia-noite:
# `0 0 * * * /usr/bin/python3 /caminho/para/seu/pipeline_etl.ipynb`
# Substitua `/usr/bin/python3` e `/caminho/para/seu/` pelos valores corretos.
# Você pode usar `which python3` para encontrar o caminho do seu interpretador Python.