In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType


ModuleNotFoundError: No module named 'pyspark'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

import os

# Spark configuration.
# NOTE(review): the Kafka source and the JDBC sink both need extra jars on the
# classpath (spark-sql-kafka-0-10 and the PostgreSQL JDBC driver); presumably
# they are supplied via spark-submit / --packages — confirm for this environment.
spark = SparkSession.builder \
    .appName("KafkaBatchProcessing") \
    .getOrCreate()

# Kafka settings; environment variables override the defaults.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "kafka:9092")
TOPIC_NAME = os.environ.get("KAFKA_TOPIC", "test_topic")

# PostgreSQL (DWH) connection settings. Credentials are taken from the
# environment so they do not have to live in the notebook; the defaults
# reproduce the previously hard-coded values.
DWH_JDBC_URL = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://postgres_dwh:5432/dwh_db")
DWH_TABLE = os.environ.get("DWH_TABLE", "kafka_data")
DWH_USER = os.environ.get("DWH_USER", "dwh_user")
DWH_PASSWORD = os.environ.get("DWH_PASSWORD", "dwh_pass")

# Schema of the JSON messages: all three fields are kept as strings.
schema = StructType().add("user_id", StringType()).add("event", StringType()).add("timestamp", StringType())

# Batch-read the topic from the earliest to the latest available offset.
# NOTE(review): every run re-reads the whole topic, so together with
# mode("append") below, re-running this cell inserts duplicate rows — confirm
# that is acceptable or track processed offsets.
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Kafka delivers the message payload as binary; decode it to a string.
df = df.selectExpr("CAST(value AS STRING)")

# Parse the JSON payload and flatten the struct fields into top-level columns.
# Null (tombstone) values and payloads from_json cannot parse end up as rows in
# which every column is null; they carry no data, so drop them before writing.
df = (
    df.withColumn("data", from_json(col("value"), schema))
    .select("data.*")
    .dropna(how="all")
)

# Append the parsed rows to PostgreSQL over JDBC. The driver class is given
# explicitly so Spark does not fail with "No suitable driver".
df.write \
    .format("jdbc") \
    .option("url", DWH_JDBC_URL) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", DWH_TABLE) \
    .option("user", DWH_USER) \
    .option("password", DWH_PASSWORD) \
    .mode("append") \
    .save()

print("✅ Batch processing completed!")
