In [5]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import scala.util.parsing.json.JSON
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import scala.util.Random


val spark = SparkSession.builder()
      .appName("EnrichMovieMetadata")
      .master("local[*]")
      .getOrCreate()

 val moviesDF = spark.read.option("header", "true").csv("gs://gcs_bucket_rupal/movies.csv")

val extractYear = udf((title: String) => {
  val yearPattern = "\\((\\d{4})\\)".r
  yearPattern.findFirstMatchIn(title).map(_.group(1)).getOrElse {
    (1980 + Random.nextInt(2024 - 1980 + 1)).toString
  }
})

val metadataDF = moviesDF
  .select("movieId", "title")
  .withColumn("releaseYear", extractYear(col("title")))

metadataDF.coalesce(1) // Ensures a single output file
  .write
  .mode("overwrite")
  .json("gs://gcs_bucket_rupal/metadata.json")

spark = org.apache.spark.sql.SparkSession@65f807a4
moviesDF = [movieId: string, title: string ... 1 more field]
extractYear = SparkUserDefinedFunction($Lambda$4932/0x0000000801bf3840@3b0a429b,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),None,true,true)
metadataDF = [movieId: string, title: string ... 1 more field]


[movieId: string, title: string ... 1 more field]

In [7]:

val metadataPath = s"gs://gcs_bucket_rupal/metadata.json"
val metadataRDD: RDD[String] = spark.sparkContext.textFile(metadataPath)

val parsedMetadataRDD: RDD[(Int, Int)] = metadataRDD.map { line =>
  JSON.parseFull(line) match {
    case Some(json: Map[String, Any]) =>
      val movieId = json.get("movieId").map(_.toString.toInt)
      val releaseYear = json.get("releaseYear").map(_.toString.toInt)
      (movieId.get, releaseYear.get)
  }
}

val metadataDF = parsedMetadataRDD.toDF("movieId", "releaseYear")

val moviesRDD: RDD[(Int, (String, String))] = moviesDF.rdd.map(row => {
  val movieId = row.getAs[Int]("movieId")
  val title = row.getAs[String]("title")
  val genres = row.getAs[String]("genres")
  (movieId, (title, genres))
})

metadataPath = gs://gcs_bucket_rupal/metadata.json
metadataRDD = gs://gcs_bucket_rupal/metadata.json MapPartitionsRDD[24] at textFile at <console>:39
parsedMetadataRDD = MapPartitionsRDD[25] at map at <console>:42
metadataDF = [movieId: int, releaseYear: int]
moviesRDD = MapPartitionsRDD[26] at map at <console>:54


           case Some(json: Map[String, Any]) =>
                           ^
It would fail on the following inputs: None, Some((x: Any forSome x not in scala.collection.immutable.Map[?,?]))
         JSON.parseFull(line) match {
                       ^


MapPartitionsRDD[26] at map at <console>:54

In [11]:
val enrichedRDD: RDD[(Int, (String, String))] = moviesRDD.leftOuterJoin(parsedMetadataRDD).mapValues { 
    case ((title, genres), releaseYear) =>
        var enrichedTitled = title
        if (!title.matches(".*\\(\\d{4}\\)$")) {
            enrichedTitled = s"$title (${releaseYear.get})"
        }
        (enrichedTitled, genres)
}

val moviesDF: DataFrame = enrichedRDD.map {
  case (movieId, (title, genres)) =>
    (movieId, title, genres)
}.toDF("movieId", "title", "genres")

val missingYearsCount = moviesDF.filter(!col("title").rlike("\\(\\d{4}\\)$")).count()
if (missingYearsCount > 0) {
  println(s"Warning: $missingYearsCount movies missing releaseYear.")
} else {
  println("All movies have a releaseYear.")
}

val outputParquetPath = "hdfs://gcs_bucket_rupal/user//enriched-movies.parquet"
moviesDF.write.mode("overwrite").parquet(outputParquetPath)

spark.stop()

lastException = null


org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 9) (cluster-a693-m.us-central1-f.c.thermal-slice-441104-a0.internal executor driver): java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$moviesRDD$1(<console>:55)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:173)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace: