# 🎬 ETL Pipeline - RAW → SILVER (Lakehouse)

---

## 📋 Summary

This notebook implements the **complete ETL pipeline** that transforms raw data (RAW) into curated data (SILVER) in the lakehouse.

### Pipeline Stages:

1. **📥 EXTRACT**: Read the CSV files from the RAW layer
2. **🔄 TRANSFORM**: Clean, merge, and enrich the data
3. **📤 LOAD**: Insert the data into the MySQL database (Lakehouse)

---

### 🎯 Goal

Build a **populated lakehouse** with movie and rating data, ready for analysis and SQL queries.

### 📊 Input Data (RAW):
- `movies_metadata.csv`: ~45K movies
- `credits.csv`: ~45K cast/crew records
- `keywords.csv`: ~46K keyword records
- `ratings_small.csv`: ~100K ratings

### 💾 Output Data (SILVER):
- `movies` table: 45,433 movies
- `ratings` table: 44,989 valid ratings

---

## 📦 1. Importing Libraries

Load the libraries needed for the ETL.

In [None]:
import pandas as pd
import numpy as np
import json
import warnings
from datetime import datetime
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool
import mysql.connector

warnings.filterwarnings('ignore')

print("✅ Libraries imported successfully!")
print(f"📌 Pandas version: {pd.__version__}")
print(f"📌 NumPy version: {np.__version__}")

---

## 🔧 2. Configuring the Database Connection

Connect to the **MySQL Lakehouse** using SQLAlchemy.

In [None]:
# Database settings
DB_CONFIG = {
    'host': 'localhost',  # or 'db' when running inside the container
    'port': 3306,
    'database': 'movies_db',
    'user': 'app_user',
    'password': 'app_password'
}

# Connection string
connection_string = (
    f"mysql+mysqlconnector://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)

# Create the engine (NullPool opens a fresh connection for each use)
engine = create_engine(
    connection_string,
    poolclass=NullPool,
    echo=False
)

# Test the connection
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 'Connection OK!' AS status"))
        print("✅", result.fetchone()[0])
except Exception as e:
    print(f"❌ Connection error: {e}")
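
Hard-coded credentials are convenient for a local demo, but a password containing characters such as `@` or `/` would break the URL built above. A minimal sketch, assuming hypothetical environment variables `DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`, and `DB_NAME` (none of them defined by this project), of building the same connection string with the credentials escaped:

```python
import os
from urllib.parse import quote_plus


def build_connection_string(env=os.environ):
    """Build a mysql+mysqlconnector URL from environment-style settings.

    The variable names are hypothetical; the defaults mirror DB_CONFIG above.
    """
    user = quote_plus(env.get("DB_USER", "app_user"))
    password = quote_plus(env.get("DB_PASSWORD", "app_password"))
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "3306")
    database = env.get("DB_NAME", "movies_db")
    return f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}"


# A password with reserved characters is percent-encoded instead of corrupting the URL
print(build_connection_string({"DB_PASSWORD": "p@ss/word"}))
```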

---

## 📥 3. PHASE 1: EXTRACT

Read the CSV files from the RAW layer.

In [None]:
# File paths
DATA_PATH = '../raw/dados_brutos/'

print("📥 Starting data extraction...\n")

# 1. Movies Metadata
print("📄 Loading movies_metadata.csv...")
df_movies = pd.read_csv(
    f'{DATA_PATH}movies_metadata.csv',
    low_memory=False,
    dtype={'id': str}
)
print(f"   ✓ {len(df_movies):,} movies loaded")

# 2. Credits (cast and crew)
print("📄 Loading credits.csv...")
df_credits = pd.read_csv(
    f'{DATA_PATH}credits.csv',
    dtype={'id': str}
)
print(f"   ✓ {len(df_credits):,} credit records loaded")

# 3. Keywords
print("📄 Loading keywords.csv...")
df_keywords = pd.read_csv(
    f'{DATA_PATH}keywords.csv',
    dtype={'id': str}
)
print(f"   ✓ {len(df_keywords):,} keyword records loaded")

# 4. Ratings
print("📄 Loading ratings_small.csv...")
df_ratings = pd.read_csv(
    f'{DATA_PATH}ratings_small.csv'
)
print(f"   ✓ {len(df_ratings):,} ratings loaded")

print("\n✅ Extraction completed successfully!")
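
`pd.read_csv` raises `FileNotFoundError` on the first missing file, so an incomplete RAW folder is only discovered one file at a time. A small sketch, using a hypothetical helper `missing_raw_files` that is not part of the notebook, that reports every missing input up front:

```python
from pathlib import Path

RAW_FILES = ["movies_metadata.csv", "credits.csv", "keywords.csv", "ratings_small.csv"]


def missing_raw_files(data_path, filenames=RAW_FILES):
    """Return the expected file names that are not present under data_path."""
    base = Path(data_path)
    return [name for name in filenames if not (base / name).is_file()]


missing = missing_raw_files("../raw/dados_brutos/")
if missing:
    print(f"Missing RAW files: {', '.join(missing)}")
else:
    print("All RAW files are present")
```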

### 📊 Previewing the Raw Data

In [None]:
print("🔍 First rows - Movies:")
display(df_movies.head(3))

print("\n🔍 First rows - Ratings:")
display(df_ratings.head(3))

print("\n📈 DataFrame shapes:")
print(f"Movies: {df_movies.shape}")
print(f"Credits: {df_credits.shape}")
print(f"Keywords: {df_keywords.shape}")
print(f"Ratings: {df_ratings.shape}")

---

## 🔄 4. PHASE 2: TRANSFORM

Apply the required transformations and cleaning steps.

### 🧹 Step 1: Removing Invalid IDs

In [None]:
print("🧹 Step 1: Removing invalid IDs\n")

# Drop records whose ID is not numeric
initial_count = len(df_movies)

df_movies = df_movies[pd.to_numeric(df_movies['id'], errors='coerce').notna()]
df_credits = df_credits[pd.to_numeric(df_credits['id'], errors='coerce').notna()]
df_keywords = df_keywords[pd.to_numeric(df_keywords['id'], errors='coerce').notna()]

# Only the movies table is counted here
removed = initial_count - len(df_movies)
print(f"   ✓ {removed} movie records with an invalid ID removed")
print(f"   ✓ {len(df_movies):,} movies remaining")
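
The filter relies on `pd.to_numeric(..., errors='coerce')` turning anything unparseable into `NaN`. A toy illustration with made-up IDs (not taken from the RAW files) of which rows survive:

```python
import pandas as pd

# Toy IDs: two numeric strings, one date-like string, one missing value
ids = pd.Series(["862", "8844", "1997-08-20", None])

numeric = pd.to_numeric(ids, errors="coerce")  # unparseable values become NaN
valid_mask = numeric.notna()

print(ids[valid_mask].tolist())      # rows kept by the filter
print(int((~valid_mask).sum()))      # number of rows dropped
```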

### 🔗 Step 2: Merging the DataFrames

In [None]:
print("🔗 Step 2: Merging the DataFrames\n")

# Merge movies with credits
df_merged = df_movies.merge(df_credits, on='id', how='left')
print(f"   ✓ After merging with credits: {df_merged.shape}")

# Merge with keywords
df_merged = df_merged.merge(df_keywords, on='id', how='left')
print(f"   ✓ After merging with keywords: {df_merged.shape}")

# Drop duplicates (repeated IDs on either side multiply rows during the merges)
initial = len(df_merged)
df_merged = df_merged.drop_duplicates(subset=['id'], keep='first')
duplicates = initial - len(df_merged)
print(f"   ✓ {duplicates:,} duplicates removed")
print(f"   ✓ Final DataFrame: {df_merged.shape}")
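
A left merge multiplies rows when the same `id` is repeated on both sides, which is why the duplicate count after the merges can be larger than the number of duplicated movies. A toy sketch with invented data showing the fan-out, plus one possible alternative (deduplicating before the merge and letting pandas check the key with `validate`):

```python
import pandas as pd

movies = pd.DataFrame({"id": ["1", "1", "2"], "title": ["A", "A", "B"]})
credits = pd.DataFrame({"id": ["1", "1", "2"], "cast": ["x", "x", "y"]})

# id "1" appears twice on each side, so it yields 2 x 2 rows after the merge
merged = movies.merge(credits, on="id", how="left")
deduped = merged.drop_duplicates(subset=["id"], keep="first")

# Alternative: deduplicate first; validate raises MergeError if keys still repeat
clean = movies.drop_duplicates(subset=["id"]).merge(
    credits.drop_duplicates(subset=["id"]),
    on="id",
    how="left",
    validate="one_to_one",
)

print(len(merged), len(deduped), len(clean))
```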

### 🔢 Step 3: Converting Data Types

In [None]:
print("🔢 Step 3: Converting data types\n")

# Convert the ID to a nullable integer
df_merged['id'] = pd.to_numeric(df_merged['id'], errors='coerce').astype('Int64')

# Convert numeric columns
numeric_cols = ['budget', 'revenue', 'runtime', 'popularity', 'vote_average', 'vote_count']
for col in numeric_cols:
    if col in df_merged.columns:
        df_merged[col] = pd.to_numeric(df_merged[col], errors='coerce')

# Convert the release date
df_merged['release_date'] = pd.to_datetime(df_merged['release_date'], errors='coerce')

print("   ✓ Data types converted")
print("\n📊 Types of the main columns:")
print(df_merged[['id', 'budget', 'revenue', 'runtime', 'release_date']].dtypes)

### 📦 Step 4: Extracting JSON Data

In [None]:
import ast

print("📦 Step 4: Extracting JSON data\n")

def extract_names_from_json(json_str, key='name', max_items=5, separator=', '):
    """Extract up to max_items values of `key` from a JSON-like string."""
    if pd.isna(json_str) or json_str == '':
        return ''
    try:
        data = json.loads(json_str)
    except (TypeError, ValueError):
        try:
            # Fall back to Python-literal syntax (single quotes, None).
            # Swapping quotes instead would break on names with apostrophes.
            data = ast.literal_eval(json_str)
        except (TypeError, ValueError, SyntaxError):
            return ''
    if isinstance(data, dict):
        data = [data]  # a single object is treated as a one-item list
    if not isinstance(data, list):
        return ''
    names = [item.get(key, '') for item in data[:max_items] if isinstance(item, dict)]
    return separator.join(filter(None, names))

# Extract genres
print("   🎭 Extracting genres...")
df_merged['genres'] = df_merged['genres'].apply(extract_names_from_json)

# Extract cast - top 5
print("   🎬 Extracting main cast...")
df_merged['cast'] = df_merged['cast'].apply(extract_names_from_json)

# Extract crew - first 5 entries (not filtered by job, so not only directors)
print("   🎥 Extracting crew...")
df_merged['crew'] = df_merged['crew'].apply(extract_names_from_json)

# Extract keywords
print("   🔑 Extracting keywords...")
df_merged['keywords'] = df_merged['keywords'].apply(extract_names_from_json)

# Extract production companies
print("   🏢 Extracting production companies...")
df_merged['production_companies'] = df_merged['production_companies'].apply(extract_names_from_json)

# Extract production countries
print("   🌍 Extracting production countries...")
df_merged['production_countries'] = df_merged['production_countries'].apply(extract_names_from_json)

# Extract spoken languages
print("   🗣️  Extracting languages...")
df_merged['spoken_languages'] = df_merged['spoken_languages'].apply(extract_names_from_json)

# Extract belongs_to_collection
print("   📚 Extracting collections...")
df_merged['belongs_to_collection'] = df_merged['belongs_to_collection'].apply(
    lambda x: extract_names_from_json(x, key='name', max_items=1) if pd.notna(x) else ''
)

print("\n   ✓ JSON data extracted and processed")
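
JSON-like columns exported from Python are often stored as Python literals (single-quoted keys, `None` values); that is an assumption about the RAW files rather than something the notebook states. Under that assumption, swapping quotes before calling `json.loads` fails on any name that contains an apostrophe, while `ast.literal_eval` parses the value directly. A minimal sketch with an invented sample string:

```python
import ast
import json

raw = "[{'id': 1, 'name': \"Ocean's Collection\"}, {'id': 2, 'name': 'Drama'}]"

# Quote swapping produces invalid JSON because of the apostrophe in the first name
try:
    json.loads(raw.replace("'", '"'))
    swapped_ok = True
except json.JSONDecodeError:
    swapped_ok = False

# literal_eval only evaluates Python literals; it does not execute arbitrary code
names = [item["name"] for item in ast.literal_eval(raw)]

print(swapped_ok)
print(names)
```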

### ✂️ Step 5: Selecting the Final Columns

In [None]:
print("✂️ Step 5: Selecting the final columns\n")

# Columns for the MOVIES table
movies_columns = [
    'id', 'title', 'overview', 'release_date',
    'budget', 'revenue', 'runtime', 'popularity',
    'status', 'tagline', 'vote_average', 'vote_count',
    'imdb_id', 'original_language',
    'genres', 'production_companies', 'production_countries',
    'spoken_languages', 'belongs_to_collection'
]

df_movies_final = df_merged[movies_columns].copy()

# Replace missing values in text columns with an empty string
text_columns = ['overview', 'tagline', 'status', 'imdb_id', 'original_language',
                'genres', 'production_companies', 'production_countries',
                'spoken_languages', 'belongs_to_collection']

for col in text_columns:
    df_movies_final[col] = df_movies_final[col].fillna('').astype(str)

print(f"   ✓ MOVIES DataFrame prepared: {df_movies_final.shape}")
print(f"   ✓ Columns: {len(movies_columns)}")

### 🌟 Step 6: Transforming the RATINGS Table

In [None]:
print("🌟 Step 6: Transforming the RATINGS table\n")

# Rename columns
df_ratings_final = df_ratings.rename(columns={
    'userId': 'user_id',
    'movieId': 'movie_id',
    'timestamp': 'rating_timestamp'
})

# Convert the UNIX timestamp (seconds, UTC) to datetime
df_ratings_final['rating_timestamp'] = pd.to_datetime(
    df_ratings_final['rating_timestamp'],
    unit='s'
)

# Remove ratings for movies that are not in the final dataset.
# NOTE: this assumes `movieId` shares the same ID space as `movies.id`;
# if the ratings file uses a different catalog ID, map it first.
valid_movie_ids = set(df_movies_final['id'].dropna())
initial_ratings = len(df_ratings_final)
df_ratings_final = df_ratings_final[df_ratings_final['movie_id'].isin(valid_movie_ids)]
removed_ratings = initial_ratings - len(df_ratings_final)

print(f"   ✓ {removed_ratings:,} ratings for nonexistent movies removed")
print(f"   ✓ {len(df_ratings_final):,} valid ratings")

# Statistics
print("\n📊 Rating statistics:")
print(f"   • Unique users: {df_ratings_final['user_id'].nunique():,}")
print(f"   • Movies rated: {df_ratings_final['movie_id'].nunique():,}")
print(f"   • Average rating: {df_ratings_final['rating'].mean():.2f}")
print(f"   • Period: {df_ratings_final['rating_timestamp'].min()} to {df_ratings_final['rating_timestamp'].max()}")
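
`pd.to_datetime(..., unit='s')` reads the values as seconds since the UNIX epoch in UTC and returns timezone-naive timestamps. The same conversion with the standard library, shown on two arbitrary values rather than on rows from the ratings file:

```python
from datetime import datetime, timezone


def unix_to_utc(seconds):
    """Convert seconds since the UNIX epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


print(unix_to_utc(0).isoformat())              # 1970-01-01T00:00:00+00:00
print(unix_to_utc(1_000_000_000).isoformat())  # 2001-09-09T01:46:40+00:00
```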

### ✅ Transformation Summary

In [None]:
print("\n" + "="*70)
print("✅ TRANSFORMATION COMPLETED SUCCESSFULLY!")
print("="*70)
print("\n📊 Final summary:")
print(f"   • Movies processed: {len(df_movies_final):,}")
print(f"   • Ratings processed: {len(df_ratings_final):,}")
print(f"   • Columns in the MOVIES table: {len(df_movies_final.columns)}")
print(f"   • Columns in the RATINGS table: {len(df_ratings_final.columns)}")

print("\n🔍 Sample of the transformed data (MOVIES):")
display(df_movies_final[['id', 'title', 'genres', 'release_date', 'budget', 'revenue']].head(3))

print("\n🔍 Sample of the transformed data (RATINGS):")
display(df_ratings_final.head(3))

---

## 📤 5. PHASE 3: LOAD

Insert the data into the MySQL database (Lakehouse).

### 🧹 Clearing the Existing Tables

In [None]:
print("🧹 Clearing existing tables...\n")

try:
    with engine.connect() as conn:
        # Temporarily disable foreign key checks
        conn.execute(text("SET FOREIGN_KEY_CHECKS = 0"))

        # Truncate the tables
        conn.execute(text("TRUNCATE TABLE ratings"))
        conn.execute(text("TRUNCATE TABLE movies"))

        # Re-enable foreign key checks
        conn.execute(text("SET FOREIGN_KEY_CHECKS = 1"))

        conn.commit()

    print("   ✓ Tables cleared successfully")
except Exception as e:
    print(f"   ⚠️  Warning: {e}")

### 📥 Loading the MOVIES Table

In [None]:
print("\n📥 Loading MOVIES table...\n")

try:
    # Insert the data in chunks so progress can be reported
    chunk_size = 1000
    # Ceiling division: no extra chunk is counted on exact multiples
    total_chunks = (len(df_movies_final) + chunk_size - 1) // chunk_size

    for i, start in enumerate(range(0, len(df_movies_final), chunk_size), 1):
        df_chunk = df_movies_final.iloc[start:start + chunk_size]
        df_chunk.to_sql(
            name='movies',
            con=engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=500
        )
        print(f"   ⏳ Progress: {i}/{total_chunks} chunks ({(i/total_chunks)*100:.1f}%)", end='\r')

    print(f"\n   ✅ {len(df_movies_final):,} movies loaded successfully!")

except Exception as e:
    print(f"\n   ❌ Error loading MOVIES: {e}")
    raise  # stop here so later cells do not run against a partial load
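
Counting chunks as `(n // chunk_size) + 1` reports one chunk too many whenever `n` is an exact multiple of `chunk_size`, so the progress line never reaches 100%. A short sketch of ceiling division and the matching slice bounds, with `chunk_bounds` as a hypothetical helper name:

```python
import math


def chunk_bounds(total_rows, chunk_size):
    """Yield (start, stop) row offsets that cover total_rows in chunk_size steps."""
    for start in range(0, total_rows, chunk_size):
        yield start, min(start + chunk_size, total_rows)


total_rows, chunk_size = 3000, 1000
print(math.ceil(total_rows / chunk_size))   # 3 chunks are actually produced
print((total_rows // chunk_size) + 1)       # 4, one too many
print(list(chunk_bounds(total_rows, chunk_size)))
```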

### 📥 Loading the RATINGS Table

In [None]:
print("\n📥 Loading RATINGS table...\n")

try:
    # Insert the data in chunks so progress can be reported
    chunk_size = 1000
    # Ceiling division: no extra chunk is counted on exact multiples
    total_chunks = (len(df_ratings_final) + chunk_size - 1) // chunk_size

    for i, start in enumerate(range(0, len(df_ratings_final), chunk_size), 1):
        df_chunk = df_ratings_final.iloc[start:start + chunk_size]
        df_chunk.to_sql(
            name='ratings',
            con=engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=500
        )
        print(f"   ⏳ Progress: {i}/{total_chunks} chunks ({(i/total_chunks)*100:.1f}%)", end='\r')

    print(f"\n   ✅ {len(df_ratings_final):,} ratings loaded successfully!")

except Exception as e:
    print(f"\n   ❌ Error loading RATINGS: {e}")
    raise  # stop here so later cells do not run against a partial load

---

## ✅ 6. VALIDATION AND FINAL STATISTICS

Verify the data loaded into the lakehouse.

In [None]:
print("\n" + "="*70)
print("✅ ETL PIPELINE COMPLETED SUCCESSFULLY!")
print("="*70)

# Check the row counts in the database
print("\n📊 Checking data in the lakehouse...\n")

with engine.connect() as conn:
    # Count movies
    result = conn.execute(text("SELECT COUNT(*) FROM movies"))
    movies_count = result.fetchone()[0]
    print(f"   🎬 Movies in the MOVIES table: {movies_count:,}")

    # Count ratings
    result = conn.execute(text("SELECT COUNT(*) FROM ratings"))
    ratings_count = result.fetchone()[0]
    print(f"   ⭐ Ratings in the RATINGS table: {ratings_count:,}")

    # Unique users
    result = conn.execute(text("SELECT COUNT(DISTINCT user_id) FROM ratings"))
    users_count = result.fetchone()[0]
    print(f"   👥 Unique users: {users_count:,}")

    # General statistics
    print("\n📈 Running the statistics procedure...\n")
    result = conn.execute(text("CALL sp_database_stats()"))
    stats = result.fetchall()

    for row in stats:
        print(f"\n   📊 Table: {row[0]}")
        print(f"      • Total records: {row[1]:,}")
        print(f"      • Unique IDs: {row[2]:,}")
        print(f"      • Earliest date: {row[3]}")
        print(f"      • Latest date: {row[4]}")
        if row[5] is not None:
            print(f"      • Average budget: ${row[5]:,.2f}")
        if row[6] is not None:
            print(f"      • Average revenue: ${row[6]:,.2f}")
        if row[7] is not None:
            print(f"      • Average rating: {row[7]:.2f}")

print("\n" + "="*70)
print("🎉 LAKEHOUSE POPULATED AND READY TO USE!")
print("="*70)

---

## 🔍 7. EXAMPLE QUERIES

Example SQL queries using the views that were created.

### 📊 View: Movies with Aggregated Statistics

In [None]:
# Top 10 highest-grossing movies (ordered by revenue, not by profit)
query = """
SELECT 
    title,
    genres,
    YEAR(release_date) as ano,
    budget,
    revenue,
    roi_percentage,
    user_avg_rating,
    user_ratings_count
FROM v_movies_with_stats
WHERE revenue > 0 AND budget > 0
ORDER BY revenue DESC
LIMIT 10
"""

df_top_revenue = pd.read_sql(query, engine)
print("🏆 Top 10 Highest-Grossing Movies:\n")
display(df_top_revenue)

### 🎭 View: Distribution by Genre

In [None]:
query = """
SELECT *
FROM v_genre_distribution
ORDER BY total_movies DESC
LIMIT 10
"""

df_genres = pd.read_sql(query, engine)
print("🎭 Top 10 Genres by Number of Movies:\n")
display(df_genres)

### 📅 View: Top Movies by Year

In [None]:
query = """
SELECT *
FROM v_top_movies_by_year
WHERE release_year >= 2010
ORDER BY release_year DESC, user_avg_rating DESC
LIMIT 15
"""

df_top_by_year = pd.read_sql(query, engine)
print("📅 Top Movies by Year (2010+):\n")
display(df_top_by_year)

---

## 🎯 CONCLUSION

### ✅ Goals Achieved:

1. **Extract**: Read 4 CSV files (~237K records)
2. **Transform**: 
   - Removal of invalid IDs
   - Merging of multiple datasets
   - Data type conversion
   - Extraction of nested JSON data
   - Normalization and validation
3. **Load**: Inserted 45K+ movies and about 45K ratings into MySQL

### 📊 Final Statistics:
- **Movies**: 45,433
- **Ratings**: 44,989
- **Users**: 671
- **Period**: 1995-2016

### 🚀 Next Steps:
- GOLD layer: Star Schema for the Data Warehouse
- Dashboards in Power BI/Tableau
- Advanced analytics and Machine Learning

---

**🎬 Lakehouse ready for analysis!**