In [9]:
from kafka import KafkaProducer
import json
import random
from datetime import datetime, timedelta

# Функция для генерации случайной транзакции
def generate_transaction(user_id):
    transaction_id = random.randint(1000, 9999)
    amount = round(random.uniform(50, 2000), 2)
    timestamp = datetime.now() - timedelta(days=random.randint(0, 30), hours=random.randint(0, 23), minutes=random.randint(0, 59))
    location = random.choice(["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"])
    
    return {
        "transaction_id": transaction_id,
        "user_id": user_id,
        "amount": amount,
        "timestamp": timestamp.isoformat(),
        "location": location
    }

# Создаем продюсера
producer = KafkaProducer(
    bootstrap_servers='localhost:9094',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Генерируем и отправляем транзакции в топик 'transactions'
for user_id in range(1, 101):  # Для 100 пользователей
    for _ in range(10):  # Каждому пользователю по 10 транзакций
        transaction = generate_transaction(user_id)
        producer.send('transactions', transaction)

producer.close()

In [10]:
import sqlite3

# Создаем базу данных и таблицу клиентов
conn = sqlite3.connect('banking_app.db')
cursor = conn.cursor()

# Создаем таблицу клиентов
cursor.execute('''
CREATE TABLE IF NOT EXISTS clients (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL,
    phone TEXT NOT NULL
)
''')

# Генерируем и вставляем данные о клиентах
names = ["Alice", "Bob", "Charlie", "David", "Eve"]
for i in range(1, 101):
    cursor.execute('''
    INSERT INTO clients (name, email, phone) VALUES (?, ?, ?)
    ''', (random.choice(names), f'user{i}@example.com', f'123-456-789{i%10}'))

conn.commit()
conn.close()

In [11]:
from kafka import KafkaConsumer
import sqlite3

def is_suspicious(transaction):
    # Пример простой логики проверки: если сумма транзакции больше 1000
    return transaction['amount'] > 1000

# Создаем консюмера
consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='localhost:9094',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='transaction_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Подключаемся к базе данных
conn = sqlite3.connect('banking_app.db')
cursor = conn.cursor()

# Обрабатываем транзакции
for message in consumer:
    transaction = message.value
    
    # Получаем информацию о клиенте из базы данных
    cursor.execute('SELECT * FROM clients WHERE id = ?', (transaction['user_id'],))
    client = cursor.fetchone()
    
    if client and is_suspicious(transaction):
        print(f'Suspicious transaction detected: {transaction} for client: {client}')

conn.close()

Suspicious transaction detected: {'transaction_id': 3658, 'user_id': 1, 'amount': 1179.12, 'timestamp': '2024-11-28T05:31:30.035280', 'location': 'Los Angeles'} for client: (1, 'Eve', 'user1@example.com', '123-456-7891')
Suspicious transaction detected: {'transaction_id': 5247, 'user_id': 1, 'amount': 1933.77, 'timestamp': '2024-12-14T01:17:30.035350', 'location': 'Chicago'} for client: (1, 'Eve', 'user1@example.com', '123-456-7891')
Suspicious transaction detected: {'transaction_id': 3747, 'user_id': 1, 'amount': 1407.48, 'timestamp': '2024-12-01T23:30:30.035418', 'location': 'Houston'} for client: (1, 'Eve', 'user1@example.com', '123-456-7891')
Suspicious transaction detected: {'transaction_id': 6865, 'user_id': 1, 'amount': 1528.89, 'timestamp': '2024-11-15T05:05:30.035485', 'location': 'Phoenix'} for client: (1, 'Eve', 'user1@example.com', '123-456-7891')
Suspicious transaction detected: {'transaction_id': 9603, 'user_id': 2, 'amount': 1529.77, 'timestamp': '2024-12-03T00:02:30.035

In [None]:
# Создаем продюсера для подозрительных транзакций
suspicious_producer = KafkaProducer(bootstrap_servers='localhost:9094',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Обрабатываем транзакции
for message in consumer:
    transaction = message.value
    
    # Получаем информацию о клиенте из базы данных
    cursor.execute('SELECT * FROM clients WHERE id = ?', (transaction['user_id'],))
    client = cursor.fetchone()
    
    if client and is_suspicious(transaction):
        print(f'Suspicious transaction detected: {transaction} for client: {client}')
        
        # Отправляем подозрительную транзакцию в новый поток 'suspicious_transactions'
        suspicious_producer.send('suspicious_transactions', transaction)

suspicious_producer.close()