In [None]:
# Importieren der erforderlichen Bibliotheken
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import col, isnan
from functools import reduce

In [None]:
# SparkSession initialisieren und Speicher konfigurieren
spark = SparkSession.builder \
    .appName("RecommenderSystem") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "32g") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.memory.fraction", "0.75") \
    .config("spark.executor.heartbeatInterval", "30s") \
    .config("spark.network.timeout", "300s") \
    .config("spark.sql.pivotMaxValues", "80000000") \
    .getOrCreate()

In [None]:
# Benutzereingaben für Datensatz, Angriffstyp und Angriffsgröße
datensatz = input("Bitte den Datensatznamen eingeben: ")
angriffstyp = input("Bitte den Angriffstyp eingeben: ")
angriffsgröße = input("Bitte die Angriffsgröße eingeben: ")

In [None]:
# Daten einlesen (CSV-Datei mit den entsprechenden Parametern)
dateipfad = f"all_ratings_{datensatz}_{angriffstyp}_{angriffsgröße}.csv"
df = spark.read.csv(dateipfad, header=True, inferSchema=True)

In [None]:
df.show()

In [None]:
# Umbenennung der Item-Spalte in movieId -> Items werden der Einfachheit wegen konstant als Movies bezeichnet; Führe nur aus bei BookCrossing Datensatz
df = df.withColumnRenamed("ISBN", "movieId")
df.show()

In [None]:
# Umbenennung der Item-Spalte in movieId -> Items werden der Einfachheit wegen konstant als Movies bezeichnet; Führe nur aus bei Yelp Datensatz
df = df.withColumnRenamed("businessId", "movieId")
df.show()

In [None]:
# Berechnung der Durchschnittsbewertungen und RatingCount für jeden Film
movie_mean = df.groupBy("movieId").agg(
    F.avg("rating").alias("RatingMean"),
    F.count("rating").alias("RatingCount")
)

In [None]:
# 1. Berechnung von RDMA und WDMA
def calculate_RDMA_WDMA(df, movie_mean):
    # Join der Filmbewertungen mit dem Durchschnitt und der Anzahl der Bewertungen pro Film
    df_with_mean = df.join(movie_mean, on="movieId")
    
    # Berechnung von RDMA und WDMA pro Film-Bewertung
    df_with_rdma = df_with_mean.withColumn(
        "RDMA",
        F.abs(F.col("rating") - F.col("RatingMean")) / F.col("RatingCount")
    )

    df_with_wdma = df_with_rdma.withColumn(
        "WDMA",
        F.abs(F.col("rating") - F.col("RatingMean")) / (F.col("RatingCount") ** 2)
    )
    
    return df_with_wdma

# Anwendung der Berechnungen
df_rdma_wdma = calculate_RDMA_WDMA(df, movie_mean)

# Aggregation auf Benutzerebene
user_profiles = df_rdma_wdma.groupBy("userId").agg(
    F.first("Label").alias("Label"),  # Behalte das Label bei
    F.sum("RDMA").alias("RDMA_sum"),  # Summe der RDMA-Werte für alle bewerteten Filme
    F.sum("WDMA").alias("WDMA_sum"),  # Summe der WDMA-Werte für alle bewerteten Filme
    F.count("movieId").alias("NumRatings")  # Anzahl der bewerteten Filme pro Benutzer
)

# Berechnung des Durchschnitts für RDMA und WDMA
user_profiles = user_profiles.withColumn(
    "RDMA", F.col("RDMA_sum") / F.col("NumRatings")  # Durchschnittlicher RDMA
).withColumn(
    "WDMA", F.col("WDMA_sum") / F.col("NumRatings")  # Durchschnittlicher WDMA
).drop("RDMA_sum", "WDMA_sum")  # Entferne die Zwischensummen, um die Ausgabe sauber zu halten

In [None]:
# 1. Berechnung der durchschnittlichen Profillänge
avg_profile_length = df_rdma_wdma.groupBy("userId").agg(F.count("movieId").alias("NumRatings")) \
    .agg(F.avg("NumRatings").alias("AvgProfileLength")).collect()[0][0]

# 2. Berechnung der Profillängen für jeden Benutzer
user_profile_lengths = df_rdma_wdma.groupBy("userId").agg(F.count("movieId").alias("ProfileLength"))

# 3. Berechnung der quadrierten Differenzen zur durchschnittlichen Profillänge
length_variance_df = user_profile_lengths.withColumn(
    "SquaredDifference",
    (F.col("ProfileLength") - avg_profile_length) ** 2
)

# 4. Berechnung des Nenners (Summe der quadrierten Differenzen)
denominator = length_variance_df.agg(F.sum("SquaredDifference")).collect()[0][0]

# 5. Berechnung von LengthVariance für jeden Benutzer
user_profiles = user_profiles.join(user_profile_lengths, on="userId") \
    .withColumn(
        "LengthVariance",
        F.abs(F.col("ProfileLength") - avg_profile_length) / denominator
    ).drop("ProfileLength")  # Entferne die ProfileLength Spalte, um Mehrdeutigkeit zu vermeiden

In [None]:
# 1. Berechne den Durchschnitt der Bewertungen für jeden Nutzer
ratings_self = df.withColumnRenamed("userId", "user1").withColumnRenamed("rating", "rating1")
ratings_other = df.withColumnRenamed("userId", "user2").withColumnRenamed("rating", "rating2")

# 2. Join: Paarbildung basierend auf gemeinsamen Filmen
paired_ratings = ratings_self.join(
    ratings_other,
    (ratings_self.movieId == ratings_other.movieId) & (ratings_self.user1 < ratings_other.user2)
)

# 3. Durchschnittsbewertungen der Nutzer berechnen
user_avg_ratings = df.groupBy("userId").agg(F.avg("rating").alias("avg_rating"))

# 4. Join mit den Durchschnittswerten
paired_ratings = paired_ratings.join(
    user_avg_ratings.withColumnRenamed("userId", "user1").withColumnRenamed("avg_rating", "avg_rating1"),
    "user1"
).join(
    user_avg_ratings.withColumnRenamed("userId", "user2").withColumnRenamed("avg_rating", "avg_rating2"),
    "user2"
)

# 5. Berechne Zähler (Summe der Produkte der Abweichungen der Bewertungen)
paired_ratings = paired_ratings.withColumn(
    "numerator",
    (paired_ratings["rating1"] - paired_ratings["avg_rating1"]) * 
    (paired_ratings["rating2"] - paired_ratings["avg_rating2"])
)

# 6. Berechne Nenner (Produkt der Wurzeln der quadratischen Abweichungen)
paired_ratings = paired_ratings.withColumn(
    "denominator",
    F.sqrt(
        F.pow(paired_ratings["rating1"] - paired_ratings["avg_rating1"], 2) *
        F.pow(paired_ratings["rating2"] - paired_ratings["avg_rating2"], 2)
    )
)

# 7. Berechne Ähnlichkeit (inkl. negativer Werte)
correlations = paired_ratings.withColumn(
    "similarity",
    F.when(paired_ratings["denominator"] != 0, paired_ratings["numerator"] / paired_ratings["denominator"]).otherwise(0)
)

# 8. Finde die Top-10-Nachbarn basierend auf Ähnlichkeit
window_spec = Window.partitionBy("user1").orderBy(F.col("similarity").desc())
top_neighbors = correlations.withColumn("rank", F.row_number().over(window_spec)).filter(F.col("rank") <= 10)

# 9. Berechne DegSim für jeden Nutzer basierend auf den Top-10-Nachbarn
deg_sim = top_neighbors.groupBy("user1").agg(F.avg("similarity").alias("DegSim"))

# 10. Join mit dem bestehenden user_profiles DataFrame
# Stelle sicher, dass beide DataFrames eine Spalte 'userId' haben.
user_profiles = user_profiles.join(deg_sim, user_profiles.userId == deg_sim.user1, "left_outer") \
                             .drop("user1") \
                             .fillna({"DegSim": 0.0})  # Falls ein Nutzer keinen DegSim hat, setze auf 0

In [None]:
# Zeige die Top-Nachbarn
top_neighbors.show()

# Anzahl der Nachbarn pro Nutzer
top_neighbors.groupBy("user1").count().show()

user_profiles.groupBy("DegSim").count().show()

# Häufigkeiten der DegSim-Werte
user_profiles.groupBy("DegSim").count().orderBy("DegSim").show()

# Zähle die Anzahl der verschiedenen DegSim-Werte
unique_deg_sim_count = user_profiles.select("DegSim").distinct().count()

print(f"Anzahl der verschiedenen DegSim-Werte: {unique_deg_sim_count}")

user_profiles.count()

In [None]:
# Zeige das Schema von user_profiles
user_profiles.printSchema()

In [None]:
# Schritt 1: Berechnung der maximalen Bewertungen (Target-Items) pro Benutzer
user_max_ratings = df.groupBy("userId").agg(F.max("rating").alias("MaxRating"))

# Schritt 2: Berechnung der globalen Durchschnittsbewertungen für jedes Item (movieId)
global_avg_ratings = df.groupBy("movieId").agg(F.avg("rating").alias("GlobalAvgRating"))

# Schritt 3: Berechnung der Target-Items (Items mit MaxRating)
user_target_ratings = df.join(user_max_ratings, on="userId") \
    .filter(F.col("rating") == F.col("MaxRating")) \
    .groupBy("userId") \
    .agg(F.collect_list("rating").alias("TargetRatings"))

# Schritt 4: Filler-Items berechnen (Bewertungen niedriger als MaxRating)
user_filler_ratings_with_id = df.join(user_max_ratings, on="userId") \
    .withColumn("is_lower", F.col("rating") < F.col("MaxRating")) \
    .filter(F.col("is_lower")) \
    .groupBy("userId", "movieId") \
    .agg(F.collect_list("rating").alias("FillerRatings"))

# Schritt 5: Benutzer finden, die keine Bewertungen niedriger als MaxRating haben
users_with_no_lower_ratings = df.join(user_max_ratings, on="userId") \
    .withColumn("is_lower", F.col("rating") < F.col("MaxRating")) \
    .groupBy("userId") \
    .agg(F.sum(F.when(F.col("is_lower"), 1).otherwise(0)).alias("lower_count")) \
    .filter(F.col("lower_count") == 0) \
    .select("userId")

# Schritt 6: Filler-Ratings mit MaxRating als Fallback für Benutzer ohne niedrigere Bewertungen
user_filler_ratings_with_id_fallback = df.join(user_max_ratings, on="userId") \
    .join(users_with_no_lower_ratings, on="userId", how="inner") \
    .withColumn("is_max", F.col("rating") == F.col("MaxRating")) \
    .filter(F.col("is_max")) \
    .groupBy("userId", "movieId") \
    .agg(F.collect_list("rating").alias("FillerRatings"))

# Schritt 7: Zusammenführen der Filler-Ratings mit Fallback
user_filler_ratings_with_id = user_filler_ratings_with_id.unionByName(user_filler_ratings_with_id_fallback)

# Schritt 8: Berechnung der MeanVar (Abweichung von GlobalAvgRating)
user_filler_with_global_avg = user_filler_ratings_with_id.join(global_avg_ratings, on="movieId", how="left").withColumn(
    "SquaredDifference",
    F.expr("transform(FillerRatings, x -> POWER(x - GlobalAvgRating, 2))")
)

mean_var_per_user = user_filler_with_global_avg.groupBy("userId").agg(
    F.sum(F.expr("aggregate(SquaredDifference, 0D, (acc, x) -> acc + x)")).alias("TotalSquaredDifference"),
    F.sum(F.size("FillerRatings")).alias("TotalFillerRatingsCount")
).withColumn(
    "MeanVar",
    F.col("TotalSquaredDifference") / F.col("TotalFillerRatingsCount")
).select("userId", "MeanVar")

# Schritt 9: Berechnung des Zählers und Nenners für FAC
fac_numerator = user_filler_with_global_avg.groupBy("userId").agg(
    F.sum(F.expr("aggregate(FillerRatings, 0D, (acc, x) -> acc + x - GlobalAvgRating)")).alias("FACNumerator")
)

fac_denominator = user_filler_with_global_avg.groupBy("userId").agg(
    F.sqrt(F.sum(F.expr("aggregate(SquaredDifference, 0D, (acc, x) -> acc + x)"))).alias("FACDenominator")
)

fac_per_user = fac_numerator.join(fac_denominator, on="userId").withColumn(
    "FAC",
    F.when(F.col("FACDenominator") != 0, F.col("FACNumerator") / F.col("FACDenominator")).otherwise(0)
).fillna({"FAC": 0}).select("userId", "FAC")  # Falls FAC NULL ist, setze 0

# Schritt 10: Berechnung des globalen Mittelwerts des Systems für FMD
global_mean = df.select(F.avg("rating").alias("GlobalMean")).collect()[0]["GlobalMean"]

# Schritt 11: Berechnung der absoluten Differenz zur globalen Durchschnittsbewertung für FMD
fmd_per_user = user_filler_ratings_with_id.groupBy("userId").agg(
    F.avg(F.expr(f"aggregate(transform(FillerRatings, x -> ABS(x - {global_mean})), 0D, (acc, x) -> acc + x)")).alias("FMD")
)

# Schritt 12: Berechnung von FMTD (Target Mean - Filler Mean)
# Zunächst werden die Target- und Filler-Ratings explodiert
user_filler_ratings_exploded = user_filler_ratings_with_id.withColumn("FillerRating", F.explode("FillerRatings"))
user_target_ratings_exploded = user_target_ratings.withColumn("TargetRating", F.explode("TargetRatings"))

# Gruppieren nach userId und Berechnen des Mittelwerts für beide Gruppen
fmt_per_user = user_target_ratings_exploded.groupBy("userId").agg(
    F.avg("TargetRating").alias("TargetMean")
).join(
    user_filler_ratings_exploded.groupBy("userId").agg(
        F.avg("FillerRating").alias("FillerMean")
    ),
    on="userId",
    how="inner"
).withColumn(
    "FMTD",
    F.abs(F.col("TargetMean") - F.col("FillerMean"))
).select("userId", "FMTD")


# Schritt 13: Zusammenführen aller Maße ins Benutzerprofil
user_profiles = user_profiles.join(mean_var_per_user, on="userId", how="left") \
    .join(fac_per_user, on="userId", how="left") \
    .join(fmd_per_user, on="userId", how="left") \
    .join(fmt_per_user, on="userId", how="left")

# Schritt 14: Sicherstellen, dass `user_profiles` eindeutig pro Benutzer ist durch Entfernung eventueller Duplikate
user_profiles = user_profiles.dropDuplicates(["userId"])

In [None]:
# Nicht mehr benötigte Spalten entfernen
user_profiles = user_profiles.drop("NumRatings")

In [None]:
# Zeige erneut das Schema von user_profiles
user_profiles.printSchema()

In [None]:
# Ausgabe des finalen DataFrames
user_profiles.show()

In [None]:
# Runden der Werte und Umwandlung in String, um wissenschaftliche Notation zu vermeiden
user_profiles = user_profiles.withColumn("RDMA", F.format_number(F.round("RDMA", 10), 10)) \
                             .withColumn("WDMA", F.format_number(F.round("WDMA", 10), 10)) \
                             .withColumn("LengthVariance", F.format_number(F.round("LengthVariance", 10), 10)) \
                             .withColumn("MeanVar", F.format_number(F.round("MeanVar", 10), 10)) \
                             .withColumn("FMTD", F.format_number(F.round("FMTD", 10), 10)) \
                             .withColumn("FAC", F.format_number(F.round("FAC", 10), 10)) \
                             .withColumn("FMD", F.format_number(F.round("FMD", 10), 10)) \
                             .withColumn("DegSim", F.format_number(F.round("DegSim", 10), 10))

# Sortieren nach userId
user_profiles = user_profiles.orderBy("userId", ascending=True)

# Ausgabe des finalen DataFrames
user_profiles.show(truncate=False)

In [None]:
# Letzte 20 UserIDs anzeigen
last_20_users = user_profiles.orderBy(F.col("UserID").desc()).limit(20)
last_20_users.show(truncate=False)

In [None]:
# Dynamische Bedingung für alle Spalten im DataFrame `user_profiles`
nan_or_null_filter = [isnan(col(c)) | col(c).isNull() for c in user_profiles.columns]
user_profiles_nan = user_profiles.filter(reduce(lambda x, y: x | y, nan_or_null_filter))

# Zeige die Zeilen mit mindestens einem NaN oder NULL-Wert
user_profiles_nan.show()

In [None]:
# Anzahl der NaN- oder Null- Zeilen im gesamten DataFrame
total_rows_nan = user_profiles_nan.count()
print(f"Gesamtanzahl der NaN-Zeilen: {total_rows_nan}")

In [None]:
# Anzahl der Zeilen im gesamten DataFrame
total_rows = user_profiles.count()
print(f"Gesamtanzahl der Zeilen: {total_rows}")

In [None]:
# Dateinamen erstellen (Basierend auf den Benutzereingaben)
dateiname = f"ratings_readyforclassification_{datensatz}_{angriffstyp}_{angriffsgröße}_Ordner.csv"

# DataFrame als CSV speichern
user_profiles.coalesce(1).write.csv(dateiname, header=True)

print(f"DataFrame wurde als {dateiname} gespeichert.")

In [None]:
# Beende die Spark-Session
spark.stop()