In [1]:
!pip install faker sqlalchemy

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import pandas as pd
from faker import Faker
import random
import time
from datetime import datetime, timedelta
from sqlalchemy import create_engine

# --- Configurações do Banco de Dados RDS ---
DB_HOST = 'seu_host_rds.amazonaws.com'
DB_PORT = '5432'
DB_NAME = 'nome_do_seu_banco'
DB_USER = 'seu_usuario'
DB_PASSWORD = 'sua_senha'

# Configuração do Faker para o idioma português do Brasil
fake = Faker('pt_BR')

# Função para obter todos os cliente_id do banco de dados
def get_all_cliente_ids(db_host, db_port, db_name, db_user, db_password):
    try:
        engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
        query = "SELECT cliente_id FROM clientes"
        df_clientes_ids = pd.read_sql(query, engine)
        return df_clientes_ids['cliente_id'].tolist()
    except Exception as e:
        print(f"Erro ao obter cliente_ids do RDS: {e}")
        return []
    finally:
        if 'engine' in locals():
            engine.dispose()

# Função para gerar um novo cliente
def generate_new_cliente():
    cliente_id = fake.unique.random_number(digits=8)
    nome = fake.name()
    cpf = fake.cpf()
    data_nascimento = fake.date_of_birth(minimum_age=18, maximum_age=70)
    sexo = random.choice(['Masculino', 'Feminino'])
    endereco = fake.address()
    email = fake.email()
    telefone = fake.phone_number()
    return {
        'cliente_id': cliente_id,
        'nome': nome,
        'cpf': cpf,
        'data_nascimento': data_nascimento,
        'sexo': sexo,
        'endereco': endereco,
        'email': email,
        'telefone': telefone
    }

# Função para inserir um novo cliente no RDS
def insert_new_cliente(cliente_data, db_host, db_port, db_name, db_user, db_password):
    try:
        engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
        df_cliente = pd.DataFrame([cliente_data])
        df_cliente.to_sql('clientes', engine, if_exists='append', index=False)
        print(f"Novo cliente inserido com cliente_id: {cliente_data['cliente_id']}")
        return cliente_data['cliente_id']
    except Exception as e:
        print(f"Erro ao inserir novo cliente no RDS: {e}")
        return None
    finally:
        if 'engine' in locals():
            engine.dispose()

# Função para gerar um novo evento de transação
def generate_realtime_transaction(cliente_ids):
    if not cliente_ids:
        return None
    transacao_id = fake.unique.random_number(digits=10)
    cliente_id = random.choice(cliente_ids)
    data_transacao = datetime.now()
    tipos_transacao = ['Compra', 'Pagamento', 'Depósito', 'Retirada', 'Transferência']
    tipo_transacao = random.choice(tipos_transacao)
    valor = round(random.uniform(5, 500), 2)
    descricao = fake.sentence(nb_words=3)
    return {
        'transacao_id': transacao_id,
        'cliente_id': cliente_id,
        'data_transacao': data_transacao,
        'tipo_transacao': tipo_transacao,
        'valor': valor,
        'descricao': descricao
    }

# Função para inserir o evento de transação na tabela 'transacoes'
def insert_realtime_transaction(transaction_data, db_host, db_port, db_name, db_user, db_password):
    if not transaction_data:
        return
    try:
        engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
        df_transaction = pd.DataFrame([transaction_data])
        df_transaction.to_sql('transacoes', engine, if_exists='append', index=False)
        print(f"Transação inserida para cliente_id: {transaction_data['cliente_id']} às {transaction_data['data_transacao']}")
    except Exception as e:
        print(f"Erro ao inserir transação no RDS: {e}")
    finally:
        if 'engine' in locals():
            engine.dispose()

# Função para gerar um novo empréstimo para um cliente
def generate_new_loan(cliente_id):
    emprestimo_id = fake.unique.random_number(digits=9)
    data_solicitacao = datetime.now() - timedelta(days=random.randint(0, 30))
    tipos_emprestimo = ['Pessoal', 'Automotivo', 'Consignado']
    tipo_emprestimo = random.choice(tipos_emprestimo)
    valor_emprestimo = round(random.uniform(1000, 20000), 2)
    taxa_juros = round(random.uniform(0.01, 0.03), 4)
    prazo_meses = random.randint(12, 36)
    data_aprovacao = datetime.now() if random.random() < 0.7 else None # 70% de chance de aprovação imediata
    status = 'Aprovado' if data_aprovacao else 'Pendente'
    return {
        'emprestimo_id': emprestimo_id,
        'cliente_id': cliente_id,
        'data_solicitacao': data_solicitacao,
        'tipo_emprestimo': tipo_emprestimo,
        'valor_emprestimo': valor_emprestimo,
        'taxa_juros': taxa_juros,
        'prazo_meses': prazo_meses,
        'data_aprovacao': data_aprovacao,
        'status': status
    }

# Função para inserir um novo empréstimo no RDS
def insert_new_loan(loan_data, db_host, db_port, db_name, db_user, db_password):
    try:
        engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
        df_loan = pd.DataFrame([loan_data])
        df_loan.to_sql('emprestimos', engine, if_exists='append', index=False)
        print(f"Novo empréstimo inserido para cliente_id: {loan_data['cliente_id']} com emprestimo_id: {loan_data['emprestimo_id']}")
    except Exception as e:
        print(f"Erro ao inserir novo empréstimo no RDS: {e}")
    finally:
        if 'engine' in locals():
            engine.dispose()

# Função para gerar um pagamento para um empréstimo
def generate_payment(emprestimo_id, valor_emprestimo, prazo_meses):
    pagamento_id = fake.unique.random_number(digits=10)
    data_pagamento = datetime.now() - timedelta(days=random.randint(0, 5))
    valor_parcela_estimado = valor_emprestimo / prazo_meses if prazo_meses > 0 else valor_emprestimo
    valor_pago = round(random.uniform(valor_parcela_estimado * 0.8, valor_parcela_estimado * 1.2), 2)
    status_pagamento = random.choice(['Pago', 'Pendente', 'Em Atraso'] if random.random() < 0.3 else ['Pago']) # Mais chances de estar pago
    return {
        'pagamento_id': pagamento_id,
        'emprestimo_id': emprestimo_id,
        'data_pagamento': data_pagamento,
        'valor_pago': valor_pago,
        'status_pagamento': status_pagamento
    }

# Função para inserir um novo pagamento no RDS
def insert_payment(payment_data, db_host, db_port, db_name, db_user, db_password):
    try:
        engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
        df_payment = pd.DataFrame([payment_data])
        df_payment.to_sql('pagamentos', engine, if_exists='append', index=False)
        print(f"Novo pagamento inserido para emprestimo_id: {payment_data['emprestimo_id']} com pagamento_id: {payment_data['pagamento_id']}")
    except Exception as e:
        print(f"Erro ao inserir novo pagamento no RDS: {e}")
    finally:
        if 'engine' in locals():
            engine.dispose()

if __name__ == "__main__":
    cliente_ids = get_all_cliente_ids(DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)
    if not cliente_ids:
        print("Não foram encontrados cliente_ids no banco de dados. Iniciando com a criação de um novo cliente.")
        new_client = generate_new_cliente()
        new_client_id = insert_new_cliente(new_client, DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)
        cliente_ids = [new_client_id] if new_client_id else []

    print("Iniciando a simulação de ingestão de dados em tempo real...")
    while True:
        # Chance de criar um novo cliente
        if random.random() < 0.1: # 10% de chance de criar um novo cliente a cada iteração
            new_client = generate_new_cliente()
            new_client_id = insert_new_cliente(new_client, DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)
            if new_client_id:
                cliente_ids.append(new_client_id)

        if cliente_ids:
            # Gerar e inserir transação para um cliente existente
            new_transaction = generate_realtime_transaction(cliente_ids)
            insert_realtime_transaction(new_transaction, DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)

            # Chance de gerar um novo empréstimo para um cliente existente
            cliente_para_emprestimo = random.choice(cliente_ids)
            if random.random() < 0.05: # 5% de chance de gerar um novo empréstimo
                new_loan = generate_new_loan(cliente_para_emprestimo)
                insert_new_loan(new_loan, DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)
                if new_loan['data_aprovacao']: # Se o empréstimo foi aprovado, gerar um pagamento inicial
                    new_payment = generate_payment(new_loan['emprestimo_id'], new_loan['valor_emprestimo'], new_loan['prazo_meses'])
                    insert_payment(new_payment, DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD)

        time.sleep(random.uniform(1, 5)) # Simula um evento ocorrendo a cada 1 a 5 segundos