In [20]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Step 1: Initialize SparkSession
val spark = SparkSession.builder()
  .appName("Genre-Specific Data Aggregation with RDD")
  .getOrCreate()

// Step 2: Load DataFrames from GCP
val moviesPath = "gs://deva_vasadi/movie_lens_data/movies.csv"
val ratingsPath = "gs://deva_vasadi/movie_lens_data/ratings.csv"

val moviesDF = spark.read.option("header", "true").csv(moviesPath)
val ratingsDF = spark.read.option("header", "true").csv(ratingsPath)

// Step 3: Explode genres
val moviesGenresDF = moviesDF
  .withColumn("genre", explode(split(col("genres"), "\\|")))
  .drop("genres")

// Step 3.b: Normalize genres and filter out "(no genres listed)"
def normalizeGenre(genre: String): String = genre match {
  case "Sci-Fi" => "Science Fiction"
  case "Thrillers" => "Thriller"
  case "(no genres listed)" => null // Mark as null for filtering
  case other => other
}

val normalizeGenreUDF = udf(normalizeGenre _)

val filteredMoviesGenresDF = moviesGenresDF
  .withColumn("genre", normalizeGenreUDF(col("genre")))
  .filter(col("genre").isNotNull) // Remove null rows

// Step 4: Convert to RDDs for Transformations

// Convert DataFrames to RDDs
val moviesGenresRDD = filteredMoviesGenresDF.rdd.map(row => (row.getString(0), row.getString(2))) // (movieId, genre)
val ratingsRDD = ratingsDF.rdd.map(row => (row.getString(1), row.getString(2).toDouble)) // (movieId, rating)

// Join RDDs on movieId
val joinedRDD = moviesGenresRDD.join(ratingsRDD) // (movieId, (genre, rating))


+--------------------+------------------+
|               genre|    average_rating|
+--------------------+------------------+
|                IMAX| 3.593312447839248|
| We're Comin' To ...|               2.0|
|               Crime|3.6917711184948736|
|             Fantasy| 3.512174705402107|
|             Western|3.6001753109842554|
|     Science Fiction|3.4916991949223912|
|           Animation|3.6153322869262636|
|            Thriller|3.5317020152396505|
|           Film-Noir| 3.915774014636868|
|              Horror|3.3071549944529486|
|           Adventure|3.5234385724723545|
|             Romance|3.5450028644529983|
|         Documentary|3.6911815290871948|
|                 War|3.7916994435766664|
|              Comedy|3.4323858239436777|
|              Action| 3.476407141777424|
|             Musical| 3.554276956937205|
|            Children|3.4392409733948646|
|             Mystery| 3.673102967818112|
|               Drama|3.6824540581800784|
+--------------------+------------

spark = org.apache.spark.sql.SparkSession@663c3598
moviesPath = gs://deva_vasadi/movie_lens_data/movies.csv
ratingsPath = gs://deva_vasadi/movie_lens_data/ratings.csv
moviesDF = [movieId: string, title: string ... 1 more field]
ratingsDF = [userId: string, movieId: string ... 2 more fields]
moviesGenresDF = [movieId: string, title: string ... 1 more field]
normalizeGenreUDF = SparkUserDefinedFunction($Lambda$6598/0x0000000802007040@af8e415,StringType,List(Some(class[value[...


normalizeGenre: (genre: String)String


SparkUserDefinedFunction($Lambda$6598/0x0000000802007040@af8e415,StringType,List(Some(class[value[...

In [None]:
// Step 5: Map and Reduce by Key
// Map to (genre, (rating, 1)) for aggregation
val genreRatingsRDD = joinedRDD.map {
  case (_, (genre, rating)) => (genre, (rating, 1))
}

// Reduce by key to calculate total ratings and counts for each genre
val genreTotalsRDD = genreRatingsRDD.reduceByKey {
  case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2)
}

// Calculate average rating for each genre
val genreAveragesRDD = genreTotalsRDD.mapValues {
  case (totalRating, count) => totalRating / count
}

// Step 6: Save to HDFS in Parquet Format

// Convert to DataFrame for saving
val genreAveragesDF = genreAveragesRDD.toDF("genre", "average_rating")

val outputPath = "hdfs:///user/casestudies/casestudy1/average_genre_ratings"
genreAveragesDF.write.mode("overwrite").parquet(outputPath)

// Show results
genreAveragesDF.show()