In [1]:
// Obtain the notebook's SparkSession (getOrCreate hands back an already
// running session when there is one) under the app name "Average_rating".
val spark = SparkSession.builder().appName("Average_rating").getOrCreate()

spark = org.apache.spark.sql.SparkSession@1b342d26


org.apache.spark.sql.SparkSession@1b342d26

In [2]:
// Locations of the MovieLens CSV inputs in Google Cloud Storage.
// Both files live directly under the same bucket.
val bucketName = "movielens_dataset"
val bucketRoot = s"gs://$bucketName"
val moviesPath = s"$bucketRoot/movie.csv"
val ratingsPath = s"$bucketRoot/rating.csv"

bucketName = movielens_dataset
moviesPath = gs://movielens_dataset/movie.csv
ratingsPath = gs://movielens_dataset/rating.csv


gs://movielens_dataset/rating.csv

In [3]:
// Read the movies and ratings CSV files into DataFrames.
// Both files have a header row, and column types are inferred from the data.
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")

val moviesDF = spark.read.options(csvOptions).csv(moviesPath)
val ratingsDF = spark.read.options(csvOptions).csv(ratingsPath)

// Print the first five rows of each DataFrame as a quick sanity check.
Seq(moviesDF, ratingsDF).foreach(_.show(5))

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



moviesDF = [movieId: int, title: string ... 1 more field]
ratingsDF = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [35]:
// Print the number of rows in each input DataFrame (each count triggers a Spark job).
Seq("Movies" -> moviesDF, "Ratings" -> ratingsDF).foreach { case (label, df) =>
  println(s"$label: ${df.count()}")
}

Movies: 27278
Ratings: 20000263


In [4]:
import org.apache.spark.sql.functions._ // Import explode, split, and col


In [5]:
// Cast the 'rating' column to Double and discard rows whose rating is null
// afterwards. Under Spark's non-ANSI cast semantics an unparseable value
// becomes null on cast (NOTE(review): confirm the cluster is not running with
// ANSI mode enabled, where the cast would raise instead).
//
// The drop is restricted to the "rating" column: a bare na.drop() removes a
// row when ANY column is null, which would also throw away valid ratings that
// merely lack, for example, a timestamp.
val ratingsDFCleaned = ratingsDF
  .withColumn("rating", col("rating").cast("double"))
  .na.drop(Seq("rating")) // Drop only rows where 'rating' is null after casting

ratingsDFCleaned = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [6]:
// Join movies with the CLEANED ratings on movieId.
// The join previously used the raw ratingsDF, which left ratingsDFCleaned
// unused and let rows with a null/invalid rating flow into the aggregation.
val joinedDF = moviesDF.join(ratingsDFCleaned, "movieId")

// The 'genres' column holds a pipe-separated list (e.g. "Comedy|Romance").
// Split it on the literal '|' (escaped because split takes a regex) and emit
// one output row per genre, keeping only the columns needed downstream.
val explodedDF = joinedDF
  .withColumn("genre", explode(split(col("genres"), "\\|")))
  .select("movieId", "title", "genre", "rating")

joinedDF = [movieId: int, title: string ... 4 more fields]
explodedDF = [movieId: int, title: string ... 2 more fields]


[movieId: int, title: string ... 2 more fields]

In [7]:
// Preview the first 10 rows of the per-genre (exploded) DataFrame.
explodedDF.show(10)


+-------+--------------------+---------+------+
|movieId|               title|    genre|rating|
+-------+--------------------+---------+------+
|      2|      Jumanji (1995)|Adventure|   3.5|
|      2|      Jumanji (1995)| Children|   3.5|
|      2|      Jumanji (1995)|  Fantasy|   3.5|
|     29|City of Lost Chil...|Adventure|   3.5|
|     29|City of Lost Chil...|    Drama|   3.5|
|     29|City of Lost Chil...|  Fantasy|   3.5|
|     29|City of Lost Chil...|  Mystery|   3.5|
|     29|City of Lost Chil...|   Sci-Fi|   3.5|
|     32|Twelve Monkeys (a...|  Mystery|   3.5|
|     32|Twelve Monkeys (a...|   Sci-Fi|   3.5|
+-------+--------------------+---------+------+
only showing top 10 rows



In [8]:
// Lookup table that rewrites selected genre labels to a canonical name.
// Genres not listed here are left untouched.
val genreMapping = Map(
  "Sci-Fi" -> "Science Fiction",
  "Rom-Com" -> "Romantic Comedy",
  "Thriller" -> "Suspense"
)

// Broadcast the lookup table so tasks read it through a broadcast variable
// rather than capturing the map in every closure.
val broadcastMapping = spark.sparkContext.broadcast(genreMapping)

// Turn each exploded row into a (genre, (rating, 1)) pair; the trailing 1 is
// a per-row counter used later to compute the average.
val genreRatingRDD = explodedDF.rdd.map { record =>
  val rawGenre = record.getAs[String]("genre")
  // Fall back to the raw label when the table has no entry for it.
  val canonicalGenre = broadcastMapping.value.getOrElse(rawGenre, rawGenre)
  canonicalGenre -> (record.getAs[Double]("rating"), 1)
}

genreMapping = Map(Sci-Fi -> Science Fiction, Rom-Com -> Romantic Comedy, Thriller -> Suspense)
broadcastMapping = Broadcast(17)
genreRatingRDD = MapPartitionsRDD[46] at map at <console>:38


MapPartitionsRDD[46] at map at <console>:38

In [9]:
// Collect up to five sample records from every partition of genreRatingRDD,
// tagged with the partition index, and print them one partition per line.
val elements = genreRatingRDD
  .mapPartitionsWithIndex { (partitionId, records) =>
    Iterator.single(partitionId -> records.take(5).toList)
  }
  .collect()

for ((partitionId, sample) <- elements) {
  println(s"Partition $partitionId: ${sample.mkString(", ")}")
}

Partition 0: (Adventure,(3.5,1)), (Children,(3.5,1)), (Fantasy,(3.5,1)), (Adventure,(3.5,1)), (Drama,(3.5,1))
Partition 1: (Action,(4.0,1)), (Crime,(4.0,1)), (Drama,(4.0,1)), (Suspense,(4.0,1)), (Comedy,(4.5,1))
Partition 2: (Action,(5.0,1)), (Fantasy,(5.0,1)), (Horror,(5.0,1)), (Suspense,(5.0,1)), (Adventure,(5.0,1))
Partition 3: (Comedy,(4.0,1)), (Action,(4.5,1)), (Crime,(4.5,1)), (Drama,(4.5,1)), (Suspense,(4.5,1))
Partition 4: (Documentary,(5.0,1)), (Documentary,(4.5,1)), (Documentary,(5.0,1)), (Drama,(5.0,1)), (Drama,(5.0,1))
Partition 5: (Comedy,(3.5,1)), (Romance,(3.5,1)), (Drama,(3.5,1)), (Fantasy,(3.5,1)), (Mystery,(3.5,1))


elements = Array((0,List((Adventure,(3.5,1)), (Children,(3.5,1)), (Fantasy,(3.5,1)), (Adventure,(3.5,1)), (Drama,(3.5,1)))), (1,List((Action,(4.0,1)), (Crime,(4.0,1)), (Drama,(4.0,1)), (Suspense,(4.0,1)), (Comedy,(4.5,1)))), (2,List((Action,(5.0,1)), (Fantasy,(5.0,1)), (Horror,(5.0,1)), (Suspense,(5.0,1)), (Adventure,(5.0,1)))), (3,List((Comedy,(4.0,1)), (Action,(4.5,1)), (Crime,(4.5,1)), (Drama,(4.5,1)), (Suspense,(4.5,1)))), (4,List((Documentary,(5.0,1)), (Documentary,(4.5,1)), (Documentary,(5.0,1)), (Drama,(5.0,1)), (Drama,(5.0,1)))), (5,List((Comedy,(3.5,1)), (Romance,(3.5,1)), (Drama,(3.5,1)), (Fantasy,(3.5,1)), (Mystery,(3.5,1)))))


Array((0,List((Adventure,(3.5,1)), (Children,(3.5,1)), (Fantasy,(3.5,1)), (Adventure,(3.5,1)), (Drama,(3.5,1)))), (1,List((Action,(4.0,1)), (Crime,(4.0,1)), (Drama,(4.0,1)), (Suspense,(4.0,1)), (Comedy,(4.5,1)))), (2,List((Action,(5.0,1)), (Fantasy,(5.0,1)), (Horror,(5.0,1)), (Suspense,(5.0,1)), (Adventure,(5.0,1)))), (3,List((Comedy,(4.0,1)), (Action,(4.5,1)), (Crime,(4.5,1)), (Drama,(4.5,1)), (Suspense,(4.5,1)))), (4,List((Documentary,(5.0,1)), (Documentary,(4.5,1)), (Documentary,(5.0,1)), (Drama,(5.0,1)), (Drama,(5.0,1)))), (5,List((Comedy,(3.5,1)), (Romance,(3.5,1)), (Drama,(3.5,1)), (Fantasy,(3.5,1)), (Mystery,(3.5,1)))))

In [10]:
// Per genre, add up the ratings and the row counters in a single reduceByKey pass.
val genreRatingSumCount = genreRatingRDD.reduceByKey {
  case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB)
}

// Average rating per genre = total of ratings / number of ratings.
val genreAverageRDD = genreRatingSumCount.mapValues(totals => totals._1 / totals._2)

// Expose the result as a two-column DataFrame and print it.
val genreAverageDF = genreAverageRDD.toDF("genre", "average_rating")

genreAverageDF.show()

+------------------+------------------+
|             genre|    average_rating|
+------------------+------------------+
|           Fantasy|3.5059453358738244|
|          Suspense|  3.50711121809216|
|            Action|  3.44386376493354|
|          Children|3.4081137685270444|
|           Mystery| 3.663508921312903|
|           Romance| 3.541802581902903|
|         Film-Noir|  3.96538126070082|
|           Western|3.5704980246109406|
|           Musical| 3.558090628821412|
|            Horror|3.2772238097518307|
|         Adventure|3.5018926565473865|
|              IMAX| 3.655945983272606|
|            Comedy|3.4260113054324886|
|               War|3.8095307347384844|
|   Science Fiction|3.4367726714455005|
|             Crime|3.6745276025631113|
|         Animation|3.6174939235897994|
|       Documentary|3.7397176834178865|
|(no genres listed)|3.0069252077562325|
|             Drama|3.6742955093068264|
+------------------+------------------+



genreRatingSumCount = ShuffledRDD[48] at reduceByKey at <console>:28
genreAverageRDD = MapPartitionsRDD[49] at mapValues at <console>:33
genreAverageDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [12]:
// Persist the per-genre averages as Parquet at the HDFS path below,
// replacing any output already present at that location.
val outputPath = "hdfs:///user/caseStudy1"
genreAverageDF.write
  .format("parquet")
  .mode("overwrite")
  .save(outputPath)

lastException = null
outputPath = hdfs:///user/caseStudy1


hdfs:///user/caseStudy1

In [13]:
// Stop the SparkSession now that the results have been written.
spark.stop()