In [None]:
from kafka import KafkaProducer
import pandas as pd
import json
import glob
import os

def get_latest_file(pattern):
    """Return the newest file matching a glob pattern.

    "Newest" is decided by ``os.path.getctime``: the match with the largest
    value wins.

    Args:
        pattern: Glob pattern passed unchanged to ``glob.glob``.

    Returns:
        The matching path with the highest ctime.

    Raises:
        FileNotFoundError: If no path matches ``pattern``.
    """
    candidates = glob.glob(pattern)
    if candidates:
        return max(candidates, key=os.path.getctime)
    raise FileNotFoundError(f"Keine Datei gefunden für Muster: {pattern}")

# Kafka producer shared by every topic below; each record is sent as UTF-8 JSON.
producer = KafkaProducer(
    bootstrap_servers='172.29.16.101:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Upper bound (seconds) for waiting on a single delivery report after flush().
DELIVERY_TIMEOUT_S = 10


def _to_json_records(df):
    """Return the rows of ``df`` as a list of dicts that serialise to valid JSON.

    Missing values are replaced by ``None`` so they are emitted as JSON
    ``null``; ``json.dumps`` would otherwise write the bare token ``NaN``,
    which is not valid JSON. The cast to ``object`` is needed so that the
    replacement also sticks in numeric columns.
    """
    return df.astype(object).where(df.notna(), None).to_dict(orient="records")


def send_dataframe(topic, df):
    """Queue one Kafka message per row of ``df`` on ``topic``.

    Args:
        topic: Name of the Kafka topic to publish to.
        df: DataFrame whose rows are sent as individual JSON objects.

    Returns:
        The list of futures returned by ``producer.send``, one per row, so the
        caller can verify delivery once the producer has been flushed.
    """
    return [producer.send(topic, value=record) for record in _to_json_records(df)]


# Futures of all queued messages; checked after flush() below.
pending = []

# GPU
gpu_gh_df = pd.read_csv(get_latest_file("geizhals_gpu_*.csv"))
pending += send_dataframe("geizhals-gpu", gpu_gh_df)

# RAM
ram_gh_df = pd.read_csv(get_latest_file("geizhals_ram_*.csv"))
pending += send_dataframe("geizhals-ram", ram_gh_df)

# SSD
ssd_gh_df = pd.read_csv(get_latest_file("geizhals_ssd_*.csv"))
pending += send_dataframe("geizhals-ssd", ssd_gh_df)

# CPU (Intel + AMD): both vendor files are published to the same topic.
amd_cpu_df = pd.read_csv(get_latest_file("geizhals_amd_cpus_*.csv"))
intel_cpu_df = pd.read_csv(get_latest_file("geizhals_intel_cpus_*.csv"))
cpu_gh_df = pd.concat([amd_cpu_df, intel_cpu_df], ignore_index=True)
pending += send_dataframe("geizhals-cpu", cpu_gh_df)

# Steam HW Summary: additionally expose the percentage string as a 0..1 fraction.
steam_summary_df = pd.read_csv(get_latest_file("steam_hwsurvey_summary_*.csv"))
steam_summary_df["share_numeric"] = (
    steam_summary_df["share"].str.replace("%", "", regex=False).astype(float) / 100.0
)
pending += send_dataframe("steam-hwsurvey-summary", steam_summary_df)

# flush() only waits for the queued messages to be processed; a failed send is
# reported through its future, so check each one before claiming success.
producer.flush()
for future in pending:
    future.get(timeout=DELIVERY_TIMEOUT_S)  # raises the delivery error, if any
print("Alle Daten erfolgreich an Kafka gesendet!")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, col, avg, regexp_replace

# Spark session: local mode using all available cores.
# NOTE(review): the "kafka" source used further down is not bundled with Spark
# itself; confirm how the spark-sql-kafka connector is provided to this kernel.
spark = (
    SparkSession.builder
    .appName("Geizhals-Steam-Streaming")
    .master("local[*]")
    .getOrCreate()
)

# Schemas used to parse the JSON payload of each Kafka topic.

# geizhals-gpu
gpu_schema = (
    StructType()
    .add("Produktname", StringType())
    .add("VRAM", StringType())
    .add("Preis_EUR", DoubleType())
)

# geizhals-ram
ram_schema = (
    StructType()
    .add("Produktname", StringType())
    .add("Kapazitaet", StringType())
    .add("Preis_EUR", DoubleType())
    .add("RAM_Typ", StringType())
    .add("CAS_Latency", StringType())
)

# geizhals-ssd
ssd_schema = (
    StructType()
    .add("Produktname", StringType())
    .add("Speicher", StringType())
    .add("Preis_EUR", DoubleType())
)

# geizhals-cpu
# NOTE(review): uses "Title"/"Price_EUR" where the other schemas use
# "Produktname"/"Preis_EUR" — presumably mirrors the CPU CSV headers; confirm.
cpu_schema = (
    StructType()
    .add("Title", StringType())
    .add("Price_EUR", DoubleType())
    .add("Kerne", IntegerType())
    .add("Turbotakt_GHz", DoubleType())
    .add("Basistakt_GHz", DoubleType())
)

# steam-hwsurvey-summary ("share" stays a string here; it is converted later)
steam_schema = (
    StructType()
    .add("category", StringType())
    .add("most_common", StringType())
    .add("share", StringType())
)

# Helper zum Streamlesen
def read_stream(topic, schema, bootstrap_servers="172.29.16.101:9092", starting_offsets="earliest"):
    """Open a Kafka topic as a streaming DataFrame with the JSON payload parsed.

    The message value is cast to a string, parsed with ``from_json`` against
    ``schema`` and the resulting struct is expanded, so the returned DataFrame
    has one column per field of ``schema``.

    Args:
        topic: Kafka topic to subscribe to.
        schema: ``StructType`` describing the JSON object in each message value.
        bootstrap_servers: Kafka broker address list. Defaults to the broker
            this notebook was written against.
        starting_offsets: Value for the Kafka source's ``startingOffsets``
            option. Defaults to ``"earliest"``.

    Returns:
        A streaming DataFrame with the columns defined by ``schema``.
    """
    kafka_messages = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", starting_offsets)
        .load()
    )
    # Kafka delivers the value as binary; decode it before parsing the JSON.
    parsed = kafka_messages.selectExpr("CAST(value AS STRING) as json_str").select(
        from_json(col("json_str"), schema).alias("data")
    )
    return parsed.select("data.*")

# Open one streaming DataFrame per Kafka topic.
gpu_df = read_stream(topic="geizhals-gpu", schema=gpu_schema)
ram_df = read_stream(topic="geizhals-ram", schema=ram_schema)
ssd_df = read_stream(topic="geizhals-ssd", schema=ssd_schema)
cpu_df = read_stream(topic="geizhals-cpu", schema=cpu_schema)

# The survey share arrives as a percentage string; strip the "%" sign and
# scale it to a fraction in an extra column.
share_as_fraction = regexp_replace(col("share"), "%", "").cast("float") / 100
steam_df = read_stream(topic="steam-hwsurvey-summary", schema=steam_schema).withColumn(
    "share_float", share_as_fraction
)

# Persist every stream as Parquet. Each entry is
# (streaming DataFrame, output directory, checkpoint directory).
parquet_sinks = [
    (gpu_df, "/tmp/gpu_data", "/tmp/gpu_checkpoint"),
    (ram_df, "/tmp/ram_data", "/tmp/ram_checkpoint"),
    (ssd_df, "/tmp/ssd_data", "/tmp/ssd_checkpoint"),
    (cpu_df, "/tmp/cpu_data", "/tmp/cpu_checkpoint"),
    (steam_df, "/tmp/steam_data", "/tmp/steam_checkpoint"),
]

# Start one append-mode query per sink, in the order listed above.
for stream_frame, data_path, checkpoint_path in parquet_sinks:
    (
        stream_frame.writeStream
        .format("parquet")
        .option("path", data_path)
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .start()
    )

# Keep the cell running while the queries are active.
spark.streams.awaitAnyTermination()
