<a href="https://colab.research.google.com/github/LeticiaHms/data-collection-mba/blob/main/captura_ingestao_dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# üéì Aula Pr√°tica: Captura e Ingest√£o de Dados

## Objetivos da Aula
1. Capturar dados de APIs p√∫blicas
2. Fazer web scraping b√°sico
3. Simular ingest√£o de arquivos CSV/JSON
4. Criar pipeline simples de ETL
5. Introdu√ß√£o ao Airflow e dbt (conceitos)

---

## üì¶ Parte 1: Instala√ß√£o e Setup

Vamos instalar as bibliotecas necess√°rias:

In [None]:
# Instalando bibliotecas necess√°rias
!pip install requests pandas beautifulsoup4 sqlalchemy -q

In [None]:
# Imports
import requests
import pandas as pd
import json
from datetime import datetime
import time
from typing import Dict, List
import os

---
## üåê Parte 2: Captura de Dados de API

### Exerc√≠cio 1: API REST simples
Vamos usar a API p√∫blica JSONPlaceholder para simular captura de dados

In [None]:
# Fun√ß√£o b√°sica para capturar dados de API
def capturar_dados_api(url: str) -> Dict:
    """
    Captura dados de uma API REST

    Args:
        url: URL da API

    Returns:
        Dados em formato JSON
    """
    try:
        response = requests.get(url)
        response.raise_for_status()  # Levanta exce√ß√£o para erros HTTP
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Erro na requisi√ß√£o: {e}")
        return None

# Testando a fun√ß√£o
url = "https://jsonplaceholder.typicode.com/users"
dados_usuarios = capturar_dados_api(url)

print(f"Total de usu√°rios capturados: {len(dados_usuarios)}")
print("\nPrimeiro usu√°rio:")
print(json.dumps(dados_usuarios[0], indent=2))

Total de usu√°rios capturados: 10

Primeiro usu√°rio:
{
  "id": 1,
  "name": "Leanne Graham",
  "username": "Bret",
  "email": "Sincere@april.biz",
  "address": {
    "street": "Kulas Light",
    "suite": "Apt. 556",
    "city": "Gwenborough",
    "zipcode": "92998-3874",
    "geo": {
      "lat": "-37.3159",
      "lng": "81.1496"
    }
  },
  "phone": "1-770-736-8031 x56442",
  "website": "hildegard.org",
  "company": {
    "name": "Romaguera-Crona",
    "catchPhrase": "Multi-layered client-server neural-net",
    "bs": "harness real-time e-markets"
  }
}


### üí° DESAFIO 1: Capture dados de posts
Complete a fun√ß√£o abaixo:

In [None]:
def capturar_posts(limite: int = 10) -> List[Dict]:
    """
    TODO: Capturar posts da API
    URL: https://jsonplaceholder.typicode.com/posts

    Args:
        limite: n√∫mero m√°ximo de posts a capturar

    Returns:
        Lista de posts
    """
    # ESCREVA SEU C√ìDIGO AQUI
    pass

# Teste sua fun√ß√£o
# posts = capturar_posts(5)
# print(f"Posts capturados: {len(posts)}")

---
## üìä Parte 3: Transforma√ß√£o de Dados

Vamos transformar os dados capturados em DataFrames do Pandas

In [None]:
# Converter dados de usu√°rios para DataFrame
def transformar_usuarios(dados_usuarios: List[Dict]) -> pd.DataFrame:
    """
    Transforma dados de usu√°rios em DataFrame normalizado
    """
    # Criar DataFrame b√°sico
    df = pd.DataFrame(dados_usuarios)

    # Extrair informa√ß√µes aninhadas
    df['cidade'] = df['address'].apply(lambda x: x['city'])
    df['lat'] = df['address'].apply(lambda x: x['geo']['lat'])
    df['lng'] = df['address'].apply(lambda x: x['geo']['lng'])
    df['empresa'] = df['company'].apply(lambda x: x['name'])

    # Selecionar colunas relevantes
    colunas_finais = ['id', 'name', 'username', 'email', 'phone', 'cidade', 'empresa', 'lat', 'lng']

    return df[colunas_finais]

# Transformar dados
df_usuarios = transformar_usuarios(dados_usuarios)
print(df_usuarios.head())
print(f"\nShape: {df_usuarios.shape}")

   id              name   username                      email  \
0   1     Leanne Graham       Bret          Sincere@april.biz   
1   2      Ervin Howell  Antonette          Shanna@melissa.tv   
2   3  Clementine Bauch   Samantha         Nathan@yesenia.net   
3   4  Patricia Lebsack   Karianne  Julianne.OConner@kory.org   
4   5  Chelsey Dietrich     Kamren   Lucio_Hettinger@annie.ca   

                   phone         cidade             empresa       lat  \
0  1-770-736-8031 x56442    Gwenborough     Romaguera-Crona  -37.3159   
1    010-692-6593 x09125    Wisokyburgh        Deckow-Crist  -43.9509   
2         1-463-123-4447  McKenziehaven  Romaguera-Jacobson  -68.6102   
3      493-170-9623 x156    South Elvis       Robel-Corkery   29.4572   
4          (254)954-1289     Roscoeview         Keebler LLC  -31.8129   

         lng  
0    81.1496  
1   -34.4618  
2   -47.0653  
3  -164.2990  
4    62.5342  

Shape: (10, 9)


### üí° DESAFIO 2: Adicione valida√ß√µes
Adicione valida√ß√µes aos dados:

In [None]:
def validar_dados(df: pd.DataFrame) -> pd.DataFrame:
    """
    TODO: Adicionar valida√ß√µes:
    1. Remover linhas com email inv√°lido
    2. Validar que lat/lng s√£o num√©ricos
    3. Remover duplicatas
    """
    # ESCREVA SEU C√ìDIGO AQUI
    pass

# Teste sua fun√ß√£o
# df_validado = validar_dados(df_usuarios)
# print(f"Registros ap√≥s valida√ß√£o: {len(df_validado)}")

---
## üíæ Parte 4: Ingest√£o e Armazenamento

### 4.1: Salvar em diferentes formatos

In [None]:
# Criar diret√≥rio para dados
os.makedirs('dados_capturados', exist_ok=True)

def salvar_dados(df: pd.DataFrame, nome_arquivo: str, formato: str = 'csv'):
    """
    Salva DataFrame em diferentes formatos

    Args:
        df: DataFrame a ser salvo
        nome_arquivo: nome base do arquivo (sem extens√£o)
        formato: 'csv', 'json', 'parquet'
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    if formato == 'csv':
        caminho = f'dados_capturados/{nome_arquivo}_{timestamp}.csv'
        df.to_csv(caminho, index=False)
    elif formato == 'json':
        caminho = f'dados_capturados/{nome_arquivo}_{timestamp}.json'
        df.to_json(caminho, orient='records', indent=2)
    elif formato == 'parquet':
        caminho = f'dados_capturados/{nome_arquivo}_{timestamp}.parquet'
        df.to_parquet(caminho, index=False)

    print(f"‚úÖ Dados salvos em: {caminho}")
    return caminho

# Salvar em diferentes formatos
salvar_dados(df_usuarios, 'usuarios', 'csv')
salvar_dados(df_usuarios, 'usuarios', 'json')

‚úÖ Dados salvos em: dados_capturados/usuarios_20251024_010751.csv
‚úÖ Dados salvos em: dados_capturados/usuarios_20251024_010751.json


'dados_capturados/usuarios_20251024_010751.json'

### 4.2: Simular banco de dados SQLite

In [None]:
from sqlalchemy import create_engine

# Criar conex√£o com banco SQLite
engine = create_engine('sqlite:///dados_capturados/dados.db')

def ingerir_para_banco(df: pd.DataFrame, nome_tabela: str, engine):
    """
    Ingere DataFrame em banco de dados
    """
    try:
        # Adicionar timestamp de ingest√£o
        df['data_ingestao'] = datetime.now()

        # Salvar no banco
        df.to_sql(nome_tabela, engine, if_exists='replace', index=False)

        print(f"‚úÖ {len(df)} registros inseridos na tabela '{nome_tabela}'")
    except Exception as e:
        print(f"‚ùå Erro ao inserir dados: {e}")

# Ingerir dados
ingerir_para_banco(df_usuarios, 'usuarios', engine)

# Verificar dados inseridos
query = "SELECT * FROM usuarios LIMIT 3"
resultado = pd.read_sql(query, engine)
print("\nAmostra dos dados no banco:")
print(resultado)

‚úÖ 10 registros inseridos na tabela 'usuarios'

Amostra dos dados no banco:
   id              name   username               email                  phone  \
0   1     Leanne Graham       Bret   Sincere@april.biz  1-770-736-8031 x56442   
1   2      Ervin Howell  Antonette   Shanna@melissa.tv    010-692-6593 x09125   
2   3  Clementine Bauch   Samantha  Nathan@yesenia.net         1-463-123-4447   

          cidade             empresa       lat       lng  \
0    Gwenborough     Romaguera-Crona  -37.3159   81.1496   
1    Wisokyburgh        Deckow-Crist  -43.9509  -34.4618   
2  McKenziehaven  Romaguera-Jacobson  -68.6102  -47.0653   

                data_ingestao  
0  2025-10-24 01:13:15.238146  
1  2025-10-24 01:13:15.238146  
2  2025-10-24 01:13:15.238146  


---
## üîÑ Parte 5: Pipeline ETL Completo

Vamos criar um pipeline que integra todas as etapas

In [None]:
class PipelineETL:
    """
    Pipeline ETL completo para captura e ingest√£o de dados
    """

    def __init__(self, nome_pipeline: str):
        self.nome = nome_pipeline
        self.logs = []

    def log(self, mensagem: str):
        """Adiciona log com timestamp"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_msg = f"[{timestamp}] {mensagem}"
        self.logs.append(log_msg)
        print(log_msg)

    def extrair(self, url: str) -> List[Dict]:
        """Extra√ß√£o de dados"""
        self.log(f"üîµ Iniciando extra√ß√£o de: {url}")
        dados = capturar_dados_api(url)
        self.log(f"‚úÖ Extra√≠dos {len(dados)} registros")
        return dados

    def transformar(self, dados: List[Dict], funcao_transformacao) -> pd.DataFrame:
        """Transforma√ß√£o de dados"""
        self.log("üîµ Iniciando transforma√ß√£o")
        df = funcao_transformacao(dados)
        self.log(f"‚úÖ Transforma√ß√£o conclu√≠da. Shape: {df.shape}")
        return df

    def carregar(self, df: pd.DataFrame, destino: str, **kwargs):
        """Carregamento de dados"""
        self.log(f"üîµ Iniciando carregamento para: {destino}")

        if destino == 'csv':
            salvar_dados(df, kwargs.get('nome', 'dados'), 'csv')
        elif destino == 'banco':
            ingerir_para_banco(df, kwargs.get('tabela', 'dados'), kwargs.get('engine'))

        self.log("‚úÖ Carregamento conclu√≠do")

    def executar(self, url: str, funcao_transformacao, destino: str, **kwargs):
        """Executa pipeline completo"""
        self.log(f"üöÄ Iniciando pipeline: {self.nome}")
        inicio = time.time()

        try:
            # ETL
            dados = self.extrair(url)
            df = self.transformar(dados, funcao_transformacao)
            self.carregar(df, destino, **kwargs)

            # Finaliza√ß√£o
            tempo_total = time.time() - inicio
            self.log(f"‚úÖ Pipeline conclu√≠do em {tempo_total:.2f} segundos")

        except Exception as e:
            self.log(f"‚ùå Erro no pipeline: {e}")
            raise

# Executar pipeline
pipeline = PipelineETL("Pipeline de Usu√°rios")
pipeline.executar(
    url="https://jsonplaceholder.typicode.com/users",
    funcao_transformacao=transformar_usuarios,
    destino='banco',
    tabela='usuarios_pipeline',
    engine=engine
)

### üí° DESAFIO 3: Crie seu pr√≥prio pipeline
Crie um pipeline para capturar dados de posts e coment√°rios

In [None]:
def transformar_posts(dados_posts: List[Dict]) -> pd.DataFrame:
    """
    TODO: Transformar dados de posts
    - Criar DataFrame
    - Adicionar coluna com tamanho do t√≠tulo
    - Adicionar coluna com tamanho do corpo
    """
    # ESCREVA SEU C√ìDIGO AQUI
    pass

# TODO: Execute o pipeline para posts
# pipeline_posts = PipelineETL("Pipeline de Posts")
# pipeline_posts.executar(...)

---
## üåä Parte 6: Conceitos de Airflow (Simula√ß√£o)

N√£o podemos executar Airflow no Colab, mas vamos simular os conceitos principais

In [None]:
from datetime import timedelta

class TaskSimulada:
    """Simula uma Task do Airflow"""

    def __init__(self, task_id: str, funcao):
        self.task_id = task_id
        self.funcao = funcao
        self.status = "PENDING"

    def executar(self, **kwargs):
        """Executa a task"""
        print(f"\nüîµ Executando task: {self.task_id}")
        self.status = "RUNNING"

        try:
            resultado = self.funcao(**kwargs)
            self.status = "SUCCESS"
            print(f"‚úÖ Task {self.task_id} conclu√≠da")
            return resultado
        except Exception as e:
            self.status = "FAILED"
            print(f"‚ùå Task {self.task_id} falhou: {e}")
            raise

class DAGSimulada:
    """Simula uma DAG do Airflow"""

    def __init__(self, dag_id: str, descricao: str):
        self.dag_id = dag_id
        self.descricao = descricao
        self.tasks = []

    def adicionar_task(self, task: TaskSimulada):
        """Adiciona uma task √† DAG"""
        self.tasks.append(task)

    def executar(self):
        """Executa todas as tasks em sequ√™ncia"""
        print(f"\n{'='*60}")
        print(f"üöÄ Executando DAG: {self.dag_id}")
        print(f"Descri√ß√£o: {self.descricao}")
        print(f"Total de tasks: {len(self.tasks)}")
        print(f"{'='*60}")

        resultado_anterior = None

        for task in self.tasks:
            resultado_anterior = task.executar(dados_anteriores=resultado_anterior)

        print(f"\n‚úÖ DAG {self.dag_id} conclu√≠da com sucesso!")

# Exemplo de uso
def task_extrair_usuarios(**kwargs):
    url = "https://jsonplaceholder.typicode.com/users"
    return capturar_dados_api(url)

def task_transformar_usuarios(**kwargs):
    dados = kwargs.get('dados_anteriores')
    return transformar_usuarios(dados)

def task_salvar_usuarios(**kwargs):
    df = kwargs.get('dados_anteriores')
    salvar_dados(df, 'usuarios_dag', 'csv')
    return df

# Criar DAG
dag = DAGSimulada(
    dag_id="pipeline_usuarios_v1",
    descricao="Pipeline di√°rio de captura de usu√°rios"
)

# Adicionar tasks
dag.adicionar_task(TaskSimulada("extrair_usuarios", task_extrair_usuarios))
dag.adicionar_task(TaskSimulada("transformar_usuarios", task_transformar_usuarios))
dag.adicionar_task(TaskSimulada("salvar_usuarios", task_salvar_usuarios))

# Executar DAG
dag.executar()

---
## üìä Parte 7: Conceitos de dbt (Simula√ß√£o)

Vamos simular transforma√ß√µes no estilo dbt

In [None]:
# Simulando models do dbt

# Model 1: staging (dados brutos limpos)
def stg_usuarios(engine):
    """
    Model de staging - primeira camada de transforma√ß√£o
    Similar a um arquivo .sql no dbt
    """
    query = """
    SELECT
        id as usuario_id,
        name as nome_completo,
        username,
        email,
        cidade,
        empresa,
        CAST(lat AS FLOAT) as latitude,
        CAST(lng AS FLOAT) as longitude
    FROM usuarios
    WHERE email IS NOT NULL
    """

    df = pd.read_sql(query, engine)
    print(f"‚úÖ stg_usuarios: {len(df)} registros")
    return df

# Model 2: intermediate (l√≥gica de neg√≥cio)
def int_usuarios_com_metricas(df_staging):
    """
    Model intermediate - adiciona m√©tricas e c√°lculos
    """
    df = df_staging.copy()

    # Adicionar m√©tricas
    df['tamanho_nome'] = df['nome_completo'].str.len()
    df['tem_dominio_comercial'] = df['email'].str.contains('.com', na=False)
    df['hemisferio'] = df['latitude'].apply(lambda x: 'Norte' if x > 0 else 'Sul')

    print(f"‚úÖ int_usuarios_com_metricas: {len(df)} registros")
    return df

# Model 3: marts (modelo final para consumo)
def mart_usuarios_resumo(df_intermediate):
    """
    Model final - agrega√ß√µes e dados prontos para an√°lise
    """
    resumo = df_intermediate.groupby('cidade').agg({
        'usuario_id': 'count',
        'tamanho_nome': 'mean',
        'tem_dominio_comercial': 'sum'
    }).reset_index()

    resumo.columns = ['cidade', 'total_usuarios', 'media_tamanho_nome', 'usuarios_email_comercial']

    print(f"‚úÖ mart_usuarios_resumo: {len(resumo)} registros")
    return resumo

# Executar pipeline dbt-style
print("\n" + "="*60)
print("üîÑ Executando transforma√ß√µes estilo dbt")
print("="*60)

# Stage
df_stg = stg_usuarios(engine)

# Intermediate
df_int = int_usuarios_com_metricas(df_stg)

# Marts
df_mart = mart_usuarios_resumo(df_int)

print("\nüìä Resultado Final:")
print(df_mart)

# Salvar resultado final
ingerir_para_banco(df_mart, 'mart_usuarios_resumo', engine)

---
## üéØ Parte 8: Exerc√≠cio Final Integrado

Agora √© sua vez! Crie um pipeline completo que:
1. Captura dados de posts E coment√°rios
2. Faz JOIN entre as duas fontes
3. Cria m√©tricas agregadas
4. Salva em formato final

In [None]:
# TODO: Seu pipeline completo aqui

# URLs dispon√≠veis:
# Posts: https://jsonplaceholder.typicode.com/posts
# Coment√°rios: https://jsonplaceholder.typicode.com/comments

# Estrutura sugerida:
# 1. Criar classe ou fun√ß√£o para capturar ambos os dados
# 2. Transformar e fazer JOIN (postId √© a chave)
# 3. Calcular m√©tricas (ex: n√∫mero de coment√°rios por post)
# 4. Criar visualiza√ß√£o simples dos resultados

# ESCREVA SEU C√ìDIGO AQUI


---
## üìö Resumo e Conceitos Importantes

### O que aprendemos:

1. **Extra√ß√£o de Dados**
   - APIs REST com requests
   - Tratamento de erros
   - Pagina√ß√£o e limites

2. **Transforma√ß√£o**
   - Normaliza√ß√£o de dados
   - Valida√ß√µes
   - Limpeza e enriquecimento

3. **Carregamento**
   - M√∫ltiplos formatos (CSV, JSON, Parquet)
   - Bancos de dados SQL
   - Versionamento com timestamps

4. **Pipelines**
   - Orquestra√ß√£o de tarefas
   - Logging e monitoramento
   - Tratamento de erros

5. **Conceitos Airflow**
   - DAGs (Directed Acyclic Graphs)
   - Tasks e depend√™ncias
   - Agendamento

6. **Conceitos dbt**
   - Modelos em camadas (staging, intermediate, marts)
   - Transforma√ß√µes SQL
   - Documenta√ß√£o e testes

### Pr√≥ximos passos:
- Instalar Airflow localmente
- Criar projeto dbt real
- Explorar conectores para diferentes fontes de dados
- Implementar data quality checks
- Adicionar monitoramento e alertas

---
## üîó Recursos Adicionais

- [Documenta√ß√£o Airflow](https://airflow.apache.org/docs/)
- [Documenta√ß√£o dbt](https://docs.getdbt.com/)
- [APIs p√∫blicas para praticar](https://github.com/public-apis/public-apis)
- [Pandas User Guide](https://pandas.pydata.org/docs/user_guide/index.html)