In [None]:
from faker import Faker
from pymongo import MongoClient
import pandas as pd
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import json
import os

# Faker configuration: single shared generator used by generate_fake_data().
fake = Faker()

# MongoDB configuration.
# The connection URI is taken from the MONGO_URI environment variable when it
# is set, so real credentials never need to be written into the notebook.
# The fallback below is only a template and must be replaced (or overridden
# through the environment) before the pipeline can connect.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://<username>:<password>@<your_mongo_host>:<your_mongo_port>",
)
mongo_client = MongoClient(MONGO_URI)
db = mongo_client["my_database"]
collection = db["fake_data"]

# Azure Data Lake configuration.
# Same approach: prefer the AZURE_STORAGE_CONNECTION_STRING environment
# variable and fall back to the placeholder template.
AZURE_STORAGE_CONNECTION_STRING = os.environ.get(
    "AZURE_STORAGE_CONNECTION_STRING",
    "DefaultEndpointsProtocol=https;AccountName=<your_account_name>;AccountKey=<your_account_key>;EndpointSuffix=core.windows.net",
)
CONTAINER_NAME = "datalake"  # target container
BLOB_NAME = "fake_data/fake_data.json"  # blob path inside the container

# Fake data generation
def generate_fake_data(num_records=100):
    """Create a list of synthetic contact records using the shared Faker instance.

    Args:
        num_records: How many records to generate (default 100).

    Returns:
        A list of dicts, each with the keys ``name``, ``email``, ``address``,
        ``phone_number``, ``company``, ``job`` and ``created_at``.
        ``created_at`` is the ISO-format string of a datetime produced by
        ``fake.date_time_this_year()``.
    """

    def _build_record():
        # Field order matches the order in which the Faker providers are called.
        return {
            "name": fake.name(),
            "email": fake.email(),
            "address": fake.address(),
            "phone_number": fake.phone_number(),
            "company": fake.company(),
            "job": fake.job(),
            "created_at": fake.date_time_this_year().isoformat(),
        }

    return [_build_record() for _ in range(num_records)]

In [None]:
# Insert data into MongoDB
def load_data_to_mongo(data):
    """Insert a batch of records into the module-level MongoDB ``collection``.

    Args:
        data: List of dict documents to insert.

    Returns:
        None. A summary line is printed to stdout.

    Notes:
        * An empty batch is skipped: ``insert_many`` rejects an empty document
          list, which would otherwise abort the whole pipeline.
        * pymongo adds an ``_id`` key to each inserted dict in place when the
          document does not already have one, so ``data`` is mutated by this
          call.
    """
    if not data:
        print("No records to insert into MongoDB; skipping.")
        return

    collection.insert_many(data)
    print(f"{len(data)} records inserted into MongoDB.")

# Extract and transform the data stored in MongoDB
def extract_and_transform():
    """Dump the MongoDB collection to a local JSON-lines file.

    Reads every document from the module-level ``collection`` (without the
    ``_id`` field), adds a constant ``source`` column, and writes the result
    to ``transformed_data.json`` in the current working directory.

    Returns:
        The path of the file that was written, as a string.
    """
    output_file = "transformed_data.json"

    # The projection {"_id": 0} drops the "_id" field from every document.
    cursor = collection.find({}, {"_id": 0})
    documents = pd.DataFrame(list(cursor))

    # Example transformation: tag each row with the system it came from.
    tagged = documents.assign(source="MongoDB")

    # orient="records" with lines=True writes one JSON object per line.
    tagged.to_json(output_file, orient="records", lines=True)
    return output_file

In [None]:
# Load the data into the Data Lake
def load_to_datalake(file_path):
    """Upload a local file to Azure Blob Storage as ``BLOB_NAME``.

    The target container (``CONTAINER_NAME``) is created on first use. An
    existing blob with the same name is overwritten.

    Args:
        file_path: Path of the local file to upload.

    Raises:
        OSError: If ``file_path`` cannot be opened.
        Exception: Any Azure SDK error other than "container already exists"
            (authentication, network, invalid names, ...) now propagates
            instead of being reported as an existing container.
    """
    # azure-core is installed as a dependency of azure-storage-blob; imported
    # here so this function is self-contained.
    from azure.core.exceptions import ResourceExistsError

    blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
    blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=BLOB_NAME)

    # Create the container if it does not exist yet. Only the "already exists"
    # outcome is treated as benign; every other failure is a real error.
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    try:
        container_client.create_container()
    except ResourceExistsError:
        print("Container already exists.")

    # Upload the JSON file, replacing any previous blob with the same name.
    with open(file_path, "rb") as data:
        blob_client.upload_blob(data, overwrite=True)
    print(f"Data uploaded to Azure Data Lake: {BLOB_NAME}")

In [None]:
# Full ETL pipeline
def etl_pipeline(num_records=100):
    """Run the end-to-end demo pipeline: generate -> MongoDB -> transform -> Azure.

    Args:
        num_records: Number of fake records to generate and insert
            (default 100, matching the previous hard-coded value).

    The intermediate local file produced by ``extract_and_transform`` is
    removed even when the upload step raises, so failed runs do not leave
    stale files behind. Any exception from the individual steps propagates
    to the caller.
    """
    # Step 1: generate fake data
    print("Generating fake data...")
    fake_data = generate_fake_data(num_records)

    # Step 2: insert into MongoDB
    print("Loading data into MongoDB...")
    load_data_to_mongo(fake_data)

    # Step 3: extract and transform the data
    print("Extracting and transforming data...")
    transformed_data_path = extract_and_transform()

    # Step 4: upload to Azure Data Lake
    print("Loading data into Azure Data Lake...")
    try:
        load_to_datalake(transformed_data_path)
    finally:
        # Clean up the local file whether or not the upload succeeded.
        if os.path.exists(transformed_data_path):
            os.remove(transformed_data_path)

    print("ETL pipeline completed.")

# Entry point: run the full pipeline only when this code executes as the main
# module, not when it is imported from elsewhere.
if __name__ == "__main__":
    etl_pipeline()