<a href="https://colab.research.google.com/github/Hanamoongit/Hanamoongit/blob/main/google_colab_notebooks/Spark_Ratings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Analyse der Top-20-Produkte nach durchschnittlicher Bewertung


### Spark Ratings – Google Colab Example

In [None]:
# --- 0. Google Drive mounten ---
from google.colab import drive
drive.mount('/content/drive')
# Bestätige den angezeigten Link und kopiere den Autorisierungscode hierher.

# Pfad zu deiner Datei in Google Drive:
# "Meine Ablage" entspricht "/content/drive/My Drive/"
# wenn dein Unterordner "BDS-Bibliotheken" heißt, nutze genau diesen Namen.
drive_path = "/content/drive/My Drive/BDS Bibliotheken/ratings.csv"


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# --- 1. Spark-Session initialisieren (lokal) ---
# Import der SparkSession-Klasse zur Initialisierung der Spark-Anwendung

from pyspark.sql import SparkSession
# Import der benötigten Funktionen für DataFrame-Transformationen
from pyspark.sql.functions import col, avg, from_unixtime, to_date

#-------------------------------------------------------------------
#original code:
#
# Erstellt oder erhält eine SparkSession mit dem Namen "RatingsAnalysis"
#spark = SparkSession.builder \
#    .appName("RatingsAnalysis") \
#    .getOrCreate()
#-------------------------------------------------------------------

spark = (
    SparkSession.builder
        .appName("RatingsAnalysis")
        .master("local[*]")                       # alle lokalen Kerne nutzen
        .config("spark.ui.showConsoleProgress", "true")
        .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")




Quelle:
* https://www.kaggle.com/datasets/skillsmuggler/amazon-ratings
* https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

In [None]:
# --- 2. CSV aus Google Drive einlesen ---
# Die erste Zeile enthält Spaltenüberschriften
# Ermittelt automatisch Datentypen

#----------------------------------------------------------
#original Code
#
# 2. CSV aus HDFS laden (Host und Port anpassen)
#df = spark.read \
#    .option("header", "true") \
#    .option("inferSchema", "true") \
#    .csv("ratings.csv")
#----------------------------------------------------------

df = (
    spark.read
         .option("header", "true")                # erste Zeile = Spaltennamen
         .option("inferSchema", "true")           # Typen automatisch ermitteln
         .csv(drive_path)                         # dein Drive-Pfad
)


In [None]:
# 3. Timestamp-Spalte in lesbares Datum umwandeln
# Neue Spalte "date"
# Konvertiert String in DateType
# Wandelt Unix-Zeit (Sekunden) in Timestamp um
# Zielformat Jahr-Monat-Tag (yyyy-MM-dd)
df = df.withColumn("date", to_date(from_unixtime(col("Timestamp")), "yyyy-MM-dd"))

In [None]:
# 4. Durchschnittliche Bewertung pro Produkt berechnen
# Gruppiert nach Produkt-ID
# Berechnet den Mittelwert der "Rating"-Spalte
avg_ratings = df.groupBy("ProductId") \
    .agg(
        avg(col("Rating")).alias("avg_rating")
    )

In [None]:
# 5. Ergebnisse absteigend nach Durchschnitt sortieren
sorted_ratings = avg_ratings.orderBy(col("avg_rating").desc())

In [None]:
# 6. Top 20 Produkte auswählen
top20 = sorted_ratings.limit(20)

In [None]:
# 7. Anzeige der Top-20 Ergebnisse in der Konsole
top20.show(truncate=False)

+----------+----------+
|ProductId |avg_rating|
+----------+----------+
|B00032OQBY|5.0       |
|B000A24T0A|5.0       |
|B000C1UB6U|5.0       |
|B000FL9EMY|5.0       |
|B0006LPC06|5.0       |
|B000A3XH3E|5.0       |
|B0002Z8XRU|5.0       |
|9790782950|5.0       |
|B0006PKEWS|5.0       |
|B000142VDE|5.0       |
|9790794207|5.0       |
|B0001G734Y|5.0       |
|B00076KR3W|5.0       |
|B000A5CEF4|5.0       |
|B00005NAPK|5.0       |
|B00020YY28|5.0       |
|B0007IMSSW|5.0       |
|B000A7MG96|5.0       |
|B0000X071O|5.0       |
|B0009R349I|5.0       |
+----------+----------+



In [None]:
# 8. Beenden der Spark-Session und Freigabe der Ressourcen
spark.stop()

Weitere Schritte, die möglich wären (für eine End-to-End-Pipeline):
* Erweiterte statistische Analyse (Varianz, Standardabweichung)
* Visualisierung der Ergebnisse
* Speicherung der Resultate im Data Warehouse
* Automatisierung des Workflows mit Airflow oder Oozie
* ...