In [15]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Obtain the SparkSession used by every cell below. `getOrCreate()` hands back
// the session that is already running in this kernel if there is one, and only
// otherwise starts a new one under the application name given here.
val spark = {
  val namedBuilder = SparkSession.builder().appName("Genre-Specific Data Aggregation Pipeline")
  namedBuilder.getOrCreate()
}

spark = org.apache.spark.sql.SparkSession@2ee6f812


org.apache.spark.sql.SparkSession@2ee6f812

In [16]:

// Locations: the two input CSV files and the Parquet destination for the result.
val moviesPath = "gs://bhargav-assignments/Day16And17Task/movie.csv"
val ratingsPath = "gs://bhargav-assignments/Day16And17Task/rating.csv"
val outputPath = "hdfs:///user/Day16AndDay17/case_study_1/genre_avg_ratings.parquet"

// Load both inputs, taking column names from the first line of each file.
// No schema is supplied or inferred, so every column is read as a string.
val moviesDF = spark.read
  .option("header", "true")
  .csv(moviesPath)
val ratingsDF = spark.read
  .option("header", "true")
  .csv(ratingsPath)

moviesPath = gs://bhargav-assignments/Day16And17Task/movie.csv
ratingsPath = gs://bhargav-assignments/Day16And17Task/rating.csv
outputPath = hdfs:///user/Day16AndDay17/case_study_1/genre_avg_ratings.parquet
moviesDF = [movieId: string, title: string ... 1 more field]
ratingsDF = [userId: string, movieId: string ... 2 more fields]


[userId: string, movieId: string ... 2 more fields]

In [17]:
// Turn the pipe-separated `genres` column into one row per (movie, genre).
//
// The genre label is trimmed *before* it is stored, so that padded labels such as
// " Drama" and "Drama" end up as the same value for the later mapping and groupBy.
// (Previously only the comparison was trimmed; the stored value kept its padding.)
// Empty tokens produced by inputs like "Action||Drama" or a trailing "|" are dropped,
// as is the "(no genres listed)" placeholder.
val explodedMoviesDF = moviesDF
  // Skip movies whose genres value is missing, empty, or whitespace-only.
  .filter(col("genres").isNotNull && trim(col("genres")) =!= "")
  // One output row per pipe-separated token.
  .withColumn("genre", explode(split(col("genres"), "\\|")))
  // Trim in a separate step: a generator such as explode cannot be nested inside
  // another expression.
  .withColumn("genre", trim(col("genre")))
  // Drop empty tokens and the placeholder label.
  .filter(col("genre") =!= "" && col("genre") =!= "(no genres listed)")

// Debugging step: Check the exploded data
println("Exploded Movies DataFrame:")
explodedMoviesDF.show(10, truncate = false)


Exploded Movies DataFrame:
+-------+-----------------------+-------------------------------------------+---------+
|movieId|title                  |genres                                     |genre    |
+-------+-----------------------+-------------------------------------------+---------+
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Adventure|
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Animation|
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Children |
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Comedy   |
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Fantasy  |
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Adventure|
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Children |
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Fantasy  |
|3   

explodedMoviesDF = [movieId: string, title: string ... 2 more fields]


[movieId: string, title: string ... 2 more fields]

In [18]:
// Canonical names for genre labels that appear under an abbreviated spelling.
val genreMapping = Map("Sci-Fi" -> "Science Fiction", "Rom-Com" -> "Romantic Comedy")

// UDF form of the same lookup. This cell no longer applies it (see below); the
// definition is kept only so that any other cell referencing `mappingUDF` still works.
val mappingUDF = udf((genre: String) => genreMapping.getOrElse(genre, genre))

// Normalize genre names with Spark's built-in value replacement instead of the UDF.
// The semantics are the same: a value equal to a key of `genreMapping` is replaced by
// the mapped name, and every other value (including null) is left untouched. Unlike a
// Scala UDF, the replacement is a native expression that the optimizer can see through.
val mappedMoviesDF = explodedMoviesDF.na.replace("genre", genreMapping)

// Debugging step: Check the mapped genres
println("Mapped Movies DataFrame:")
mappedMoviesDF.show(10, truncate = false)

Mapped Movies DataFrame:
+-------+-----------------------+-------------------------------------------+---------+
|movieId|title                  |genres                                     |genre    |
+-------+-----------------------+-------------------------------------------+---------+
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Adventure|
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Animation|
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Children |
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Comedy   |
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|Fantasy  |
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Adventure|
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Children |
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |Fantasy  |
|3     

genreMapping = Map(Sci-Fi -> Science Fiction, Rom-Com -> Romantic Comedy)
mappingUDF = SparkUserDefinedFunction($Lambda$6414/0x000000010231a040@3636df9,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),None,true,true)
mappedMoviesDF = [movieId: string, title: string ... 2 more fields]


[movieId: string, title: string ... 2 more fields]

In [19]:
// Attach genre rows to every rating. The rating column arrives as a string, so it is
// converted to a numeric type first; the join on movieId then yields one row per
// (rating, genre of the rated movie).
val ratingsWithMoviesDF = {
  val numericRatingsDF = ratingsDF.withColumn("rating", col("rating").cast("float"))
  numericRatingsDF.join(mappedMoviesDF, Seq("movieId"))
}

// Debugging step: preview the joined rows
println("Ratings With Movies DataFrame:")
ratingsWithMoviesDF.show(10, truncate = false)

// Mean rating per genre, exposed as column `avg_rating`
val genreAvgRatingsDF = ratingsWithMoviesDF
  .groupBy(col("genre"))
  .agg(avg(col("rating")).as("avg_rating"))


Ratings With Movies DataFrame:
+-------+------+------+-------------------+---------------------------------------------------------------+--------------------------------------+---------------+
|movieId|userId|rating|timestamp          |title                                                          |genres                                |genre          |
+-------+------+------+-------------------+---------------------------------------------------------------+--------------------------------------+---------------+
|2      |1     |3.5   |2005-04-02 23:53:47|Jumanji (1995)                                                 |Adventure|Children|Fantasy            |Fantasy        |
|2      |1     |3.5   |2005-04-02 23:53:47|Jumanji (1995)                                                 |Adventure|Children|Fantasy            |Children       |
|2      |1     |3.5   |2005-04-02 23:53:47|Jumanji (1995)                                                 |Adventure|Children|Fantasy            |Adventur

ratingsWithMoviesDF = [movieId: string, userId: string ... 5 more fields]
genreAvgRatingsDF = [genre: string, avg_rating: double]


[genre: string, avg_rating: double]

In [20]:
// Preview the per-genre averages, persist them as Parquet, and read them back to verify.
//
// The preview and the write are two separate Spark actions on the same lazily evaluated
// DataFrame. Marking the (small) aggregated result as cached lets the write reuse what
// the preview already computed instead of re-running the join and aggregation over the
// ratings data a second time.
genreAvgRatingsDF.cache()
try {
  // Debugging step: Check the aggregated results
  println("Genre Average Ratings DataFrame:")
  genreAvgRatingsDF.show(10, truncate = false)

  // Save the results as Parquet to HDFS, replacing any previous output at that path
  genreAvgRatingsDF.write.mode("overwrite").parquet(outputPath)
} finally {
  // Release the cached data whether or not the write succeeded.
  genreAvgRatingsDF.unpersist()
}

// Validation step: Load the results back and display
val resultDF = spark.read.parquet(outputPath)
println("Final Output from Parquet:")
resultDF.show(10, truncate = false)

Genre Average Ratings DataFrame:
+---------+------------------+
|genre    |avg_rating        |
+---------+------------------+
|Mystery  |3.663508921312903 |
|Musical  |3.558090628821412 |
|Action   |3.44386376493354  |
|Romance  |3.541802581902903 |
|Thriller |3.50711121809216  |
|Fantasy  |3.5059453358738244|
|Animation|3.6174939235897994|
|Film-Noir|3.96538126070082  |
|IMAX     |3.655945983272606 |
|Drama    |3.6742955093068264|
+---------+------------------+
only showing top 10 rows

Final Output from Parquet:
+---------+------------------+
|genre    |avg_rating        |
+---------+------------------+
|Mystery  |3.663508921312903 |
|Musical  |3.558090628821412 |
|Action   |3.44386376493354  |
|Romance  |3.541802581902903 |
|Thriller |3.50711121809216  |
|Fantasy  |3.5059453358738244|
|Animation|3.6174939235897994|
|Film-Noir|3.96538126070082  |
|IMAX     |3.655945983272606 |
|Drama    |3.6742955093068264|
+---------+------------------+
only showing top 10 rows



resultDF = [genre: string, avg_rating: double]


[genre: string, avg_rating: double]

In [21]:
// Stop the SparkSession now that the pipeline has finished. Run this cell last:
// the DataFrames defined in the cells above cannot be evaluated once the session is stopped.
spark.stop()