In [None]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.rdd.RDD
import scala.util.parsing.json.JSON
import org.apache.spark.sql.DataFrame

In [None]:
// Read the movie catalogue (movie.csv) from GCP Cloud Storage into a DataFrame.
// The first CSV row supplies the column names and column types are inferred from the data;
// the resulting DataFrame is marked for caching.
val moviePath = "gs://priyanshi-spark-bucket-2/movie.csv"
val moviesDF = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv(moviePath)
  .cache()

In [None]:
// Show what was loaded: first the inferred schema, then the first five rows.
println("Movies DataFrame Schema:")
moviesDF.printSchema()

println("Movies DataFrame Preview:")
moviesDF.show(numRows = 5)

In [None]:
import scala.util.Random
// Step 3: Extract releaseYear from the title, or assign a random year.
//
// The UDF returns the first four-digit group found in parentheses in the title, e.g.
// "Toy Story (1995)" -> "1995". When the title is null or contains no such group, a random
// year between 1980 and 2024 (inclusive) is returned instead. The result is a String.
val extractYear = {
  // Compiled once and captured by the closure instead of being recompiled for every row.
  val yearPattern = "\\((\\d{4})\\)".r
  val firstFallbackYear = 1980
  val lastFallbackYear = 2024

  udf((title: String) =>
    // Option(...) guards against null titles, which would otherwise throw inside the matcher.
    Option(title)
      .flatMap(text => yearPattern.findFirstMatchIn(text))
      .map(_.group(1))
      .getOrElse {
        (firstFallbackYear + Random.nextInt(lastFallbackYear - firstFallbackYear + 1)).toString
      }
  // The random fallback makes the result non-deterministic; declaring that stops the
  // optimizer from assuming repeated evaluations yield the same value.
  ).asNondeterministic()
}

In [None]:
// Build the metadata DataFrame: movieId and title from the movie catalogue, plus a
// releaseYear column computed from the title by the extractYear UDF.
val metadataDF = moviesDF.select(
  col("movieId"),
  col("title"),
  extractYear(col("title")).as("releaseYear")
)

In [None]:
// Preview the first five metadata rows.
metadataDF.show(numRows = 5)

In [None]:
// Destination of the metadata JSON output in Cloud Storage.
val outputPath = "gs://priyanshi-spark-bucket-2/metadata.json"

// Collapse to a single partition before writing, replacing any previous output at the path.
metadataDF
  .coalesce(1)
  .write
  .mode("overwrite")
  .format("json")
  .save(outputPath)

println(s"Metadata written at $outputPath!")

In [None]:
// Step 1: Read `movie.csv` from Cloud Storage as a DataFrame
// (header row supplies column names, column types are inferred).
val moviesDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("gs://priyanshi-spark-bucket-2/movie.csv")

In [None]:
// Step 2: Read the `metadata.json` output as plain text, one RDD element per line
val metadataPath = "gs://priyanshi-spark-bucket-2/metadata.json"
val metadataRDD: RDD[String] = spark.sparkContext.textFile(metadataPath)

In [None]:
// Parse each JSON line into a (movieId, releaseYear) pair.
// Lines that are not JSON objects, or that lack a usable integer in either field, are
// dropped instead of failing the whole job with a MatchError or NumberFormatException.
val parsedMetadataRDD: RDD[(Int, Int)] = metadataRDD.flatMap { line =>
  // scala.util.parsing.json.JSON returns every JSON number as a Double and every quoted
  // value as a String, so `_.toString.toInt` fails on numeric fields ("1.0".toInt throws).
  def asInt(value: Any): Option[Int] = value match {
    case number: Double if number == number.toInt => Some(number.toInt)
    case text: String => scala.util.Try(text.trim.toInt).toOption
    case _ => None
  }

  val parsedPair: Option[(Int, Int)] = JSON.parseFull(line) match {
    case Some(fields: Map[_, _]) =>
      // Keys of a parsed JSON object are strings; the cast only restores the erased type.
      val json = fields.asInstanceOf[Map[String, Any]]
      for {
        movieId <- json.get("movieId").flatMap(asInt)
        releaseYear <- json.get("releaseYear").flatMap(asInt)
      } yield (movieId, releaseYear)
    case _ => None
  }

  // Zero or one element per input line.
  parsedPair.toList
}

In [None]:
// Turn the parsed (movieId, releaseYear) pairs into a DataFrame with named columns.
val metadataFromJsonDF = {
  val columnNames = Seq("movieId", "releaseYear")
  parsedMetadataRDD.toDF(columnNames: _*)
}

In [None]:
// Re-shape the movies DataFrame into a pair RDD keyed by movieId so it can be joined:
// key = movieId, value = (title, genres).
val moviesRDD: RDD[(Int, (String, String))] = moviesDF.rdd.map { row =>
  val key = row.getAs[Int]("movieId")
  val titleAndGenres = (row.getAs[String]("title"), row.getAs[String]("genres"))
  key -> titleAndGenres
}

In [None]:
// Join movies with the parsed metadata and append " (YYYY)" to titles that do not already
// end with a four-digit year in parentheses.
// The join is a left outer join, so the year is an Option: a movie without a metadata entry
// keeps its original title instead of failing on an empty Option. Null titles are passed
// through unchanged.
val enrichedRDD: RDD[(Int, (String, String))] = moviesRDD.leftOuterJoin(parsedMetadataRDD).mapValues {
  case ((title, genres), releaseYear) =>
    val needsYearSuffix = title != null && !title.matches(".*\\(\\d{4}\\)$")
    val enrichedTitle = releaseYear match {
      case Some(year) if needsYearSuffix => s"$title ($year)"
      case _ => title
    }
    (enrichedTitle, genres)
}

In [None]:
// Flatten the (movieId, (title, genres)) pairs into three-column rows and expose them
// as a DataFrame again.
val moviesDF: DataFrame = enrichedRDD
  .map { case (id, (enrichedTitle, genreList)) => (id, enrichedTitle, genreList) }
  .toDF("movieId", "title", "genres")

In [None]:
// Validate that movie titles end with a "(YYYY)" release year.
// `moviesDF` is a val and cannot be reassigned, so the 1000-row sample gets its own name.
// Only this sample is inspected, not the whole dataset.
val validationSampleDF = moviesDF.limit(1000)
val missingYearsCount = validationSampleDF.filter(!col("title").rlike("\\(\\d{4}\\)$")).count()
if (missingYearsCount > 0) {
  println(s"Warning: $missingYearsCount movies still missing releaseYear.")
} else {
  println("All movies have a releaseYear.")
}

In [None]:
// Step 6: Save the enriched DataFrame as Parquet in HDFS
val outputParquetPath = "hdfs:///user/priyanshi/enriched-movies.parquet"
// The enriched data lives in `moviesDF` (rebuilt from `enrichedRDD`); the previously
// referenced `enrichedMoviesDF` is not defined anywhere in the visible notebook.
moviesDF.write.mode("overwrite").parquet(outputParquetPath)

println(s"Enriched movies data saved to $outputParquetPath")

// Stop Spark Session
spark.stop()