In [None]:
from confluent_kafka import Producer
import json
import random
import time

# Kafka producer client pointed at the broker on localhost:9092.
producer = Producer(
    {
        'bootstrap.servers': 'localhost:9092',
    }
)

def generate_transaction(
    users=(1, 2, 3, 4, 5),
    transaction_types=("deposit", "withdrawal", "transfer"),
):
    """Build one random synthetic transaction record.

    Args:
        users: Non-empty sequence of user ids to draw ``user_id`` from.
            Defaults to the five ids the notebook originally hard-coded.
        transaction_types: Non-empty sequence of values to draw
            ``transaction_type`` from.

    Returns:
        dict with the keys ``user_id``, ``amount`` (uniform in [10, 1000],
        rounded to 2 decimals), ``transaction_type`` and ``timestamp``
        (current Unix time truncated to whole seconds).

    Raises:
        IndexError: if ``users`` or ``transaction_types`` is empty
            (raised by ``random.choice``).
    """
    # The random calls are made in the same order as the dict keys so that a
    # seeded `random` yields the same records as the original implementation.
    return {
        "user_id": random.choice(users),
        "amount": round(random.uniform(10, 1000), 2),
        "transaction_type": random.choice(transaction_types),
        "timestamp": int(time.time()),
    }

def _report_delivery(err, msg):
    """Delivery callback: report messages the broker did not accept."""
    if err is not None:
        print(f"Delivery to topic {msg.topic()} failed: {err}")


# Publish one synthetic transaction per second until the cell is interrupted.
try:
    while True:
        transaction = generate_transaction()
        # produce() is asynchronous; without a callback a failed delivery
        # would go unnoticed.
        producer.produce(
            'transactions',
            json.dumps(transaction),
            callback=_report_delivery,
        )
        # flush() blocks until the message is delivered (or fails) and
        # invokes the delivery callback.
        producer.flush()
        time.sleep(1)
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop this endless loop.
    print("Producer stopped.")
finally:
    # Make sure nothing is left in the local queue if we were interrupted
    # between produce() and flush().
    producer.flush()


In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, lit
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)


# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Suspicious Transactions")
    .getOrCreate()
)


# Schema of the JSON payload carried in each Kafka message value.
# `amount` is read as a 64-bit double rather than a 32-bit float: a float is
# widened when compared with the account balance, and the widened value can
# land slightly above the true decimal amount, which would flag a withdrawal
# that exactly equals the balance.
transaction_schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("amount", DoubleType()),
    StructField("transaction_type", StringType()),
    StructField("timestamp", LongType()),  # Unix time in whole seconds
])


# Streaming source: subscribe to the `transactions` Kafka topic and keep only
# the message value, cast from bytes to a string.
transactions = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "transactions")
    .load()
    .selectExpr("CAST(value AS STRING)")
)


# Decode the JSON text with the transaction schema, then flatten the resulting
# struct so each field becomes a top-level column.
decoded_payload = from_json(col("value"), transaction_schema).alias("data")
parsed_transactions = transactions.select(decoded_payload).select("data.*")

# Read the customer reference table from the database (static batch read).
# The password is taken from the CUSTOMER_DB_PASSWORD environment variable so
# the credential does not have to live in the notebook; the literal fallback
# only preserves the original local-development behaviour.
customer_info = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/hw_12")
    .option("dbtable", "customer_info")
    .option("user", "postgres")
    .option("password", os.environ.get("CUSTOMER_DB_PASSWORD", "1111"))
    .load()
)

# Join the transaction stream with the customer table on the shared user id.
joined_data = parsed_transactions.join(customer_info, "user_id")

# Suspicion rule: a withdrawal whose amount exceeds the customer's balance.
# Assumes `account_balance` is a column of customer_info — TODO confirm.
exceeds_balance = col("amount") > col("account_balance")
is_withdrawal = col("transaction_type") == "withdrawal"
suspicious_transactions = joined_data.where(exceeds_balance & is_withdrawal)

# Output the result: print newly detected suspicious transactions to the
# console as they arrive.
query = (
    suspicious_transactions.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

try:
    # Block this cell until the streaming query ends or the kernel is interrupted.
    query.awaitTermination()
finally:
    # Stop the query so it does not keep running in the background of the
    # kernel after an interrupt or an error.
    query.stop()
