# Ingestão de Dados - MongoDB

Este notebook realiza a ingestão de dados de um banco de dados MongoDB para o MinIO usando DeltaLake.

## Configuração

Configure as variáveis abaixo antes de executar:

In [None]:
# Importar configurações base
%run ../00_configuracao_inicial.ipynb

In [None]:
# ============================================
# CONFIGURAÇÕES DE CONEXÃO MONGODB
# ============================================
import os

# Configurações de conexão MongoDB
MONGODB_HOST = os.getenv('MONGODB_HOST', 'localhost')
MONGODB_PORT = os.getenv('MONGODB_PORT', '27017')
MONGODB_DATABASE = os.getenv('MONGODB_DATABASE', 'admin')
MONGODB_USER = os.getenv('MONGODB_USER', 'admin')
MONGODB_PASSWORD = os.getenv('MONGODB_PASSWORD', 'senha')
MONGODB_AUTH_DB = os.getenv('MONGODB_AUTH_DB', 'admin')  # Database de autenticação

# Configurações de leitura
MONGODB_COLLECTION = os.getenv('MONGODB_COLLECTION', 'nome_colecao')

# Configurações de destino no MinIO
DESTINO_BRONZE = f"{PATH_BRONZE}/mongodb/{MONGODB_DATABASE.lower()}/{MONGODB_COLLECTION.lower()}"

print("Configurações MongoDB:")
print(f"Host: {MONGODB_HOST}")
print(f"Port: {MONGODB_PORT}")
print(f"Database: {MONGODB_DATABASE}")
print(f"Collection: {MONGODB_COLLECTION}")
print(f"Destino: {DESTINO_BRONZE}")

In [None]:
# Instalar MongoDB Spark Connector (executar apenas uma vez)
# !pip install pymongo

# Adicionar MongoDB Spark Connector ao Spark
# Baixar mongo-spark-connector_2.12-XX.X.X.jar
# Ou usar: spark.jars.packages com coordenadas Maven: org.mongodb.spark:mongo-spark-connector_2.12:XX.X.X

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

# URI de conexão MongoDB
# Formato: mongodb://[username:password@]host[:port][/[database][?options]]
if MONGODB_USER and MONGODB_PASSWORD:
    mongodb_uri = f"mongodb://{MONGODB_USER}:{MONGODB_PASSWORD}@{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DATABASE}?authSource={MONGODB_AUTH_DB}"
else:
    mongodb_uri = f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DATABASE}"

print(f"MongoDB URI: mongodb://***:***@{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DATABASE}")

In [None]:
# Função para ler dados do MongoDB
def ler_mongodb_collection(database, collection, query=None, pipeline=None, read_preference="primary"):
    """
    Lê dados de uma coleção MongoDB
    
    Args:
        database: Nome do banco de dados
        collection: Nome da coleção
        query: Query MongoDB em formato JSON (opcional)
        pipeline: Pipeline de agregação MongoDB (opcional, substitui query)
        read_preference: Preferência de leitura (primary, secondary, etc)
    
    Returns:
        DataFrame do Spark
    """
    reader = spark.read.format("mongo") \
        .option("uri", mongodb_uri) \
        .option("database", database) \
        .option("collection", collection) \
        .option("readPreference.name", read_preference)
    
    # Adicionar query se fornecida
    if pipeline:
        # Pipeline de agregação
        reader = reader.option("pipeline", pipeline)
    elif query:
        # Query simples
        reader = reader.option("query", query)
    
    df = reader.load()
    
    return df

In [None]:
# Exemplo 1: Leitura simples de coleção
print("Exemplo 1: Leitura simples")
df_mongodb = ler_mongodb_collection(
    database=MONGODB_DATABASE,
    collection=MONGODB_COLLECTION
)

print(f"Total de registros: {df_mongodb.count()}")
df_mongodb.printSchema()
df_mongodb.show(5, truncate=False)

In [None]:
# Exemplo 2: Leitura com query MongoDB
print("Exemplo 2: Leitura com query")
query_mongodb = {
    "status": "ativo",
    "created_at": {"$gte": "2024-01-01"}
}

# df_mongodb_query = ler_mongodb_collection(
#     database=MONGODB_DATABASE,
#     collection=MONGODB_COLLECTION,
#     query=json.dumps(query_mongodb)
# )
# df_mongodb_query.show(5)

In [None]:
# Exemplo 3: Leitura com pipeline de agregação
print("Exemplo 3: Leitura com pipeline de agregação")
pipeline_agregacao = [
    {"$match": {"status": "ativo"}},
    {"$project": {"nome": 1, "email": 1, "created_at": 1}},
    {"$sort": {"created_at": -1}},
    {"$limit": 1000}
]

# df_mongodb_pipeline = ler_mongodb_collection(
#     database=MONGODB_DATABASE,
#     collection=MONGODB_COLLECTION,
#     pipeline=json.dumps(pipeline_agregacao)
# )
# df_mongodb_pipeline.show(5)

In [None]:
# Processar campos aninhados do MongoDB (se necessário)
# MongoDB armazena documentos JSON, que podem ter estruturas aninhadas
def processar_documentos_mongodb(df):
    """
    Processa documentos MongoDB, extraindo campos aninhados
    """
    # Exemplo: extrair campos de um objeto aninhado
    # df_processado = df.withColumn("campo_extraido", col("documento.campo_aninhado"))
    
    # Converter _id ObjectId para string (se necessário)
    df_processado = df.withColumn("_id_str", col("_id").cast(StringType()))
    
    return df_processado

# Aplicar processamento
df_mongodb_processado = processar_documentos_mongodb(df_mongodb)
df_mongodb_processado.show(5, truncate=False)

In [None]:
# Adicionar metadados de ingestão
df_ingestao = df_mongodb_processado \
    .withColumn("fonte", lit("MONGODB")) \
    .withColumn("database_origem", lit(MONGODB_DATABASE)) \
    .withColumn("collection_origem", lit(MONGODB_COLLECTION)) \
    .withColumn("ingestao_em", current_timestamp()) \
    .withColumn("particao_data", date_format(current_date(), "yyyy-MM-dd"))

print("Metadados adicionados:")
df_ingestao.select("fonte", "database_origem", "collection_origem", "ingestao_em").show(1, truncate=False)

In [None]:
# Salvar no MinIO como Delta Table
print(f"Salvando dados em: {DESTINO_BRONZE}")

# save_delta_table(
#     df_ingestao,
#     DESTINO_BRONZE,
#     mode="overwrite",  # ou "append" para incrementais
#     partition_by=["particao_data"]  # Particionar por data
# )

print("Ingestão concluída com sucesso!")

In [None]:
# Verificar dados salvos
# df_verificacao = read_delta_table(DESTINO_BRONZE)
# print(f"Registros salvos: {df_verificacao.count()}")
# df_verificacao.show(5)

## Ingestão Incremental

Para ingestões incrementais baseadas em timestamp ou ObjectId:

In [None]:
# Função para ingestão incremental usando timestamp
def ingestao_incremental_mongodb_timestamp(database, collection, coluna_timestamp="updated_at", ultima_execucao=None):
    """
    Realiza ingestão incremental de dados MongoDB usando timestamp
    
    Args:
        database: Nome do banco
        collection: Nome da coleção
        coluna_timestamp: Nome do campo de timestamp
        ultima_execucao: Timestamp da última execução (formato ISO ou datetime)
    """
    if ultima_execucao:
        query = {
            coluna_timestamp: {"$gt": ultima_execucao}
        }
    else:
        # Primeira execução: pegar últimos 7 dias
        from datetime import datetime, timedelta
        data_limite = datetime.now() - timedelta(days=7)
        query = {
            coluna_timestamp: {"$gte": data_limite}
        }
    
    df_incremental = ler_mongodb_collection(
        database=database,
        collection=collection,
        query=json.dumps(query)
    )
    
    return df_incremental

# Função para ingestão incremental usando ObjectId
def ingestao_incremental_mongodb_objectid(database, collection, ultimo_objectid=None):
    """
    Realiza ingestão incremental usando ObjectId (mais eficiente)
    ObjectId contém timestamp, então pode ser usado para ordenação
    """
    if ultimo_objectid:
        from bson import ObjectId
        query = {
            "_id": {"$gt": ObjectId(ultimo_objectid)}
        }
    else:
        # Primeira execução: pegar últimos 7 dias
        from datetime import datetime, timedelta
        from bson import ObjectId
        data_limite = datetime.now() - timedelta(days=7)
        objectid_limite = ObjectId.from_datetime(data_limite)
        query = {
            "_id": {"$gte": objectid_limite}
        }
    
    pipeline = [
        {"$match": query},
        {"$sort": {"_id": 1}}
    ]
    
    df_incremental = ler_mongodb_collection(
        database=database,
        collection=collection,
        pipeline=json.dumps(pipeline)
    )
    
    return df_incremental

# Exemplo de uso
# df_incremental = ingestao_incremental_mongodb_timestamp(
#     database=MONGODB_DATABASE,
#     collection=MONGODB_COLLECTION,
#     ultima_execucao="2024-01-01T00:00:00Z"
# )
# 
# # Salvar em modo append
# save_delta_table(df_incremental, DESTINO_BRONZE, mode="append", partition_by=["particao_data"])