In [1]:
// Session setup for the pipeline.
// `sc` is not defined in this notebook; presumably the kernel provides it — confirm.
// getOrCreate() hands back an already-active session if one exists, otherwise builds a new one.
val context = sc
val sparkLib = SparkSession
  .builder()
  .appName("Case Study 1: Genre-Specific Data Aggregation Pipeline")
  .getOrCreate()

Waiting for a Spark session to start...

context = org.apache.spark.SparkContext@23bb08a3
sparkLib = org.apache.spark.sql.SparkSession@40ecb8f


org.apache.spark.sql.SparkSession@40ecb8f

In [2]:
// Both input CSVs live in the same directory of the task bucket.
val datasetDir = "gs://task-dataset-bucket/Day_16_17"
val moviesPath = s"$datasetDir/movie.csv"
val ratingsPath = s"$datasetDir/rating.csv"

moviesPath = gs://task-dataset-bucket/Day_16_17/movie.csv
ratingsPath = gs://task-dataset-bucket/Day_16_17/rating.csv


gs://task-dataset-bucket/Day_16_17/rating.csv

In [3]:
// Explicit schemas for the two CSV inputs, so column names and types are fixed up front.
// Every field is nullable, so a value that fails to parse is read as null.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// movie.csv: movieId, title, pipe-delimited genres
val moviesSchema = new StructType()
  .add("movieId", IntegerType, nullable = true)
  .add("title", StringType, nullable = true)
  .add("genres", StringType, nullable = true)

// rating.csv: userId, movieId, rating, timestamp
val ratingsSchema = new StructType()
  .add("userId", IntegerType, nullable = true)
  .add("movieId", IntegerType, nullable = true)
  .add("rating", DoubleType, nullable = true)
  .add("timestamp", LongType, nullable = true)

moviesSchema = StructType(StructField(movieId,IntegerType,true),StructField(title,StringType,true),StructField(genres,StringType,true))
ratingsSchema = StructType(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true),StructField(timestamp,LongType,true))


StructType(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true),StructField(timestamp,LongType,true))

In [4]:
// Load a header-prefixed CSV using the given explicit schema.
def readCsvWithSchema(path: String, schema: StructType): org.apache.spark.sql.DataFrame =
  sparkLib.read
    .option("header", "true")
    .schema(schema)
    .csv(path)

// DataFrames for the movie and rating tables, typed by the schemas above.
val moviesDF = readCsvWithSchema(moviesPath, moviesSchema)
val ratingsDF = readCsvWithSchema(ratingsPath, ratingsSchema)

moviesDF = [movieId: int, title: string ... 1 more field]
ratingsDF = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [5]:
// One row per (movie, genre): split the pipe-delimited "genres" string and explode it
// into a "genre" column, keeping movieId and title alongside.
val explodedMoviesDF = moviesDF.select(
  col("movieId"),
  col("title"),
  explode(split(col("genres"), "\\|")).as("genre")
)

explodedMoviesDF = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [6]:
// Preview the raw movie rows and the exploded (movie, genre) rows.
// show() prints the table itself and returns Unit; wrapping it in print(...) only
// appended a stray "()" after each table, so it is called directly.
moviesDF.show(5)
explodedMoviesDF.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

()+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
+-------+----------------+---------+
only showing top 5 rows

()

In [7]:
// Abbreviated genre labels and their long-form replacements; any label not listed here
// is passed through unchanged.
val genreMappings = Map(
  "Sci-Fi" -> "Science Fiction",
  "Rom-Com" -> "Romantic Comedy"
)

// (movieId, title, normalizedGenre) for every exploded movie/genre row.
// Rows whose movieId is null (possible under the nullable schema when the field does not
// parse) are dropped first: Row.getInt on a null field throws instead of returning a value.
// Fields are read by name so the mapping does not depend on column positions.
val moviesRDD = explodedMoviesDF
  .na.drop(Seq("movieId"))
  .rdd
  .map { row =>
    val movieId = row.getAs[Int]("movieId")
    val title = row.getAs[String]("title")
    val rawGenre = row.getAs[String]("genre")
    (movieId, title, genreMappings.getOrElse(rawGenre, rawGenre))
  }

genreMappings = Map(Sci-Fi -> Science Fiction, Rom-Com -> Romantic Comedy)
moviesRDD = MapPartitionsRDD[14] at map at <console>:35


MapPartitionsRDD[14] at map at <console>:35

In [8]:
// Peek at the first ten normalized (movieId, title, genre) tuples.
moviesRDD.take(num = 10)

Array((1,Toy Story (1995),Adventure), (1,Toy Story (1995),Animation), (1,Toy Story (1995),Children), (1,Toy Story (1995),Comedy), (1,Toy Story (1995),Fantasy), (2,Jumanji (1995),Adventure), (2,Jumanji (1995),Children), (2,Jumanji (1995),Fantasy), (3,Grumpier Old Men (1995),Comedy), (3,Grumpier Old Men (1995),Romance))

In [9]:
// (movieId, rating) pairs.
// Rows with a null (or NaN) movieId or rating are dropped before conversion. Under the nullable
// schema an unparsable field becomes null, and Row.getInt/getDouble throw on null values.
// Fields are read by name rather than by position.
val ratingsRDD = ratingsDF
  .na.drop(Seq("movieId", "rating"))
  .rdd
  .map(row => (row.getAs[Int]("movieId"), row.getAs[Double]("rating")))

// Key the movie side by movieId (one entry per genre) and inner-join with the ratings.
// Each rating is therefore emitted once per genre of its movie as a (genre, rating) pair.
// Ratings whose movieId has no match in the movie table are dropped by the inner join.
val joinedRDD = moviesRDD
  .map { case (movieId, _, genre) => (movieId, genre) }
  .join(ratingsRDD)
  .map { case (_, (genre, rating)) => (genre, rating) }

ratingsRDD = MapPartitionsRDD[20] at map at <console>:31
joinedRDD = MapPartitionsRDD[25] at map at <console>:35


MapPartitionsRDD[25] at map at <console>:35

In [10]:
// Mean rating per genre. For each key, accumulate (sumOfRatings, ratingCount) both within
// and across partitions, then divide.
val avgRatingsRDD = joinedRDD
  .aggregateByKey((0.0, 0))(
    (acc, rating) => (acc._1 + rating, acc._2 + 1),
    (left, right) => (left._1 + right._1, left._2 + right._2)
  )
  .mapValues { case (sum, n) => sum / n }

avgRatingsRDD = MapPartitionsRDD[28] at mapValues at <console>:33


MapPartitionsRDD[28] at mapValues at <console>:33

In [11]:
// Turn the (genre, average) pairs into a two-column DataFrame.
// NOTE(review): RDD.toDF needs the session implicits in scope; presumably the kernel
// auto-imports them — confirm.
val avgRatingsDF = avgRatingsRDD.toDF("genre", "average_rating")

// Persist the averages as Parquet on HDFS, replacing any output from a previous run.
val outputPath =
  "hdfs:///user/Navadeep/Day_16_17/Case_Study_1/average_ratings.parquet"

avgRatingsDF.write
  .format("parquet")
  .mode("overwrite")
  .save(outputPath)

avgRatingsDF = [genre: string, average_rating: double]
outputPath = hdfs:///user/Navadeep/Day_16_17/Case_Study_1/average_ratings.parquet


hdfs:///user/Navadeep/Day_16_17/Case_Study_1/average_ratings.parquet

In [12]:
// Read the Parquet output back and display it to verify the write.
// Uses sparkLib, the session this notebook creates in its first cell, rather than relying on
// a kernel-provided `spark` binding that is not defined in the notebook itself.
val resultDF = sparkLib.read.parquet(outputPath)
resultDF.show()

+------------------+------------------+
|             genre|    average_rating|
+------------------+------------------+
|           Western|3.5704980246109406|
|           Musical| 3.558090628821412|
|            Horror|3.2772238097518307|
|          Thriller|  3.50711121809216|
|         Adventure|3.5018926565473865|
|       Documentary|3.7397176834178865|
|(no genres listed)|3.0069252077562325|
|             Drama|3.6742955093068264|
|          Children|3.4081137685270444|
|           Mystery| 3.663508921312903|
|           Romance| 3.541802581902903|
|         Film-Noir|  3.96538126070082|
|               War|3.8095307347384844|
|   Science Fiction|3.4367726714455005|
|             Crime|3.6745276025631113|
|         Animation|3.6174939235897994|
|           Fantasy|3.5059453358738244|
|            Action|  3.44386376493354|
|              IMAX| 3.655945983272606|
|            Comedy|3.4260113054324886|
+------------------+------------------+



resultDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [13]:
// Load the per-genre averages from the Parquet output as the input to consolidation.
// Uses the notebook's own sparkLib session for consistency with the rest of the pipeline.
val consolidatedDF = sparkLib.read.parquet(outputPath)

consolidatedDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [14]:
// HDFS destination for the consolidated copy of the per-genre averages.
val consolidatedOutputPath: String =
  "hdfs:///user/Navadeep/Day_16_17/Case_Study_1/consolidated_average_ratings.parquet"

consolidatedOutputPath = hdfs:///user/Navadeep/Day_16_17/Case_Study_1/consolidated_average_ratings.parquet


hdfs:///user/Navadeep/Day_16_17/Case_Study_1/consolidated_average_ratings.parquet

In [15]:
// Collapse the averages to a single partition so the Parquet output is written by one task.
// Overwrite any previous consolidated output, then report the destination.
val finalDF = consolidatedDF.coalesce(1)

finalDF.write
  .format("parquet")
  .mode("overwrite")
  .save(consolidatedOutputPath)

println(s"Consolidated data written to: $consolidatedOutputPath")

Consolidated data written to: hdfs:///user/Navadeep/Day_16_17/Case_Study_1/consolidated_average_ratings.parquet


finalDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [16]:
// Read the consolidated Parquet output back and display it to verify the write.
// Uses the notebook's own sparkLib session instead of the kernel-provided `spark` binding.
val resultDF = sparkLib.read.parquet(consolidatedOutputPath)
resultDF.show()

+------------------+------------------+
|             genre|    average_rating|
+------------------+------------------+
|           Western|3.5704980246109406|
|           Musical| 3.558090628821412|
|            Horror|3.2772238097518307|
|          Thriller|  3.50711121809216|
|         Adventure|3.5018926565473865|
|       Documentary|3.7397176834178865|
|(no genres listed)|3.0069252077562325|
|             Drama|3.6742955093068264|
|          Children|3.4081137685270444|
|           Mystery| 3.663508921312903|
|           Romance| 3.541802581902903|
|         Film-Noir|  3.96538126070082|
|               War|3.8095307347384844|
|   Science Fiction|3.4367726714455005|
|             Crime|3.6745276025631113|
|         Animation|3.6174939235897994|
|           Fantasy|3.5059453358738244|
|            Action|  3.44386376493354|
|              IMAX| 3.655945983272606|
|            Comedy|3.4260113054324886|
+------------------+------------------+



resultDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]