In [6]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, col, split}
import scala.util.Random

In [1]:
// Build (or reuse) the local Spark session used by every cell below.
// Kept as a `var` so the binding stays reassignable, as in the original cell.
var spark = {
  val sessionBuilder = SparkSession.builder()
  val configuredBuilder = sessionBuilder
    .appName("metadata using additional JSON files.")
    .master("local[*]")
  configuredBuilder.getOrCreate()
}

spark = org.apache.spark.sql.SparkSession@7713cdba


org.apache.spark.sql.SparkSession@7713cdba

In [4]:
// Location of the movie CSV in GCS.
val movie_gcs_path = "gs://artifacts_spark_jobs/movie.csv"
// Load the CSV, treating the first row as a header and letting Spark infer column types.
val movie_df = {
  val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
  spark.read.format("csv").options(csvOptions).load(movie_gcs_path)
}

movie_gcs_path = gs://artifacts_spark_jobs/movie.csv
movie_df = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [5]:
// Project each row of movie_df into a (movieId, title, genres) tuple.
// getAs[Int]("movieId") relies on the CSV schema inference having typed movieId as an int.
val moviesRdd = movie_df.rdd.map(row =>
  (
    row.getAs[Int]("movieId"),
    row.getAs[String]("title"),
    row.getAs[String]("genres")
  )
)

moviesRdd = MapPartitionsRDD[15] at map at <console>:29


MapPartitionsRDD[15] at map at <console>:29

In [7]:
/** Pairs a movie id with a release year taken from its title.
  *
  * The text between the last "(" and the last ")" of `title` is used as the
  * year when it consists of exactly four digits. In every other case (no
  * parentheses, parentheses in the wrong order, or content that is not a
  * four-digit number) a random year in the inclusive range 1950..2030 is used.
  *
  * @param movieId identifier returned unchanged as the first tuple element
  * @param title   movie title, e.g. "Toy Story (1995)"
  * @return (movieId, year)
  */
def extractYear(movieId: Int, title: String): (Int, Int) = {
  val openIdx = title.lastIndexOf("(")
  val closeIdx = title.lastIndexOf(")")

  // Require ")" to come after "(": otherwise substring would throw
  // StringIndexOutOfBoundsException on titles such as "Foo ) (bar".
  val yearFromTitle: Option[Int] =
    if (openIdx != -1 && closeIdx > openIdx) {
      val candidate = title.substring(openIdx + 1, closeIdx)
      // Only all-digit content is parsed, so toInt cannot throw on e.g. "(abcd)".
      if (candidate.length == 4 && candidate.forall(_.isDigit)) Some(candidate.toInt)
      else None
    } else {
      None
    }

  // The fallback is drawn only when needed; nextInt(81) yields 0..80.
  (movieId, yearFromTitle.getOrElse(1950 + Random.nextInt(2030 - 1950 + 1)))
}


// Derive a (movieId, releaseYear) pair for every movie; the genres field is not needed here.
val dataRdd = moviesRdd.map {
  case (movieId, title, _) => extractYear(movieId, title)
}

dataRdd = MapPartitionsRDD[16] at map at <console>:51


extractYear: (movieId: Int, title: String)(Int, Int)


MapPartitionsRDD[16] at map at <console>:51

In [8]:
// Turn the (movieId, releaseYear) pairs into a two-column DataFrame.
val movieMetadataDF = {
  val columnNames = Seq("movieId", "releaseYear")
  dataRdd.toDF(columnNames: _*)
}

movieMetadataDF = [movieId: int, releaseYear: int]


[movieId: int, releaseYear: int]

In [10]:
// Destination directory for the generated movie metadata JSON.
val target_storage_path = "gs://artifacts_spark_jobs/movie_metadata/yaseen"

// Collapse to one partition before writing, replacing any previous output at the target.
val singlePartitionMetadata = movieMetadataDF.coalesce(1)
singlePartitionMetadata.write.mode("overwrite").json(target_storage_path)

target_storage_path = gs://artifacts_spark_jobs/movie_metadata/yaseen


gs://artifacts_spark_jobs/movie_metadata/yaseen

In [11]:
// Read the metadata JSON back from storage.
// The JSON reader infers the schema from the data itself, so the CSV-only
// "header" option and the misspelled "inderSchema" option are dropped.
// Integer fields are inferred as bigint.
val movieMetadataJson = spark.read.format("json").load(target_storage_path)

movieMetadataJson = [movieId: bigint, releaseYear: bigint]


[movieId: bigint, releaseYear: bigint]

In [12]:
// Inner equi-join of the movies with their metadata on the shared movieId column;
// the join column appears once in the result.
val joinedDf = movie_df.join(movieMetadataJson, Seq("movieId"))

joinedDf = [movieId: int, title: string ... 2 more fields]


[movieId: int, title: string ... 2 more fields]

In [13]:
/** Reports whether `title` already ends its last parenthesised group with a year.
  *
  * The text between the last "(" and the last ")" must be exactly four digits.
  * Titles without parentheses, with ")" appearing before the last "(", or with
  * non-numeric or differently sized content yield false.
  *
  * @param title movie title, e.g. "Toy Story (1995)"
  * @return true when a four-digit year is found, false otherwise
  */
def checkYearExists(title: String): Boolean = {
  val leftParenIndex = title.lastIndexOf("(")
  val rightParenIndex = title.lastIndexOf(")")

  // rightParenIndex > leftParenIndex also guards substring against
  // StringIndexOutOfBoundsException on titles such as "Foo ) (bar".
  leftParenIndex != -1 && rightParenIndex > leftParenIndex && {
    val candidate = title.substring(leftParenIndex + 1, rightParenIndex)
    // Four arbitrary characters such as "abcd" are not a year; require digits.
    candidate.length == 4 && candidate.forall(_.isDigit)
  }
}

checkYearExists: (title: String)Boolean


In [14]:
// Build (movieId, title, genre, releaseYear) tuples from the joined rows,
// appending " (<releaseYear>)" to any title that does not already carry a year.
// Columns are read by position: 0 = movieId, 1 = title, 2 = genres, 3 = releaseYear.
val structuredData = joinedDf.rdd.map { row =>
  val id = row.get(0).toString.toInt
  val rawTitle = row.get(1).toString
  val genres = row.get(2).toString
  val year = row.get(3).toString.toInt
  val finalTitle = if (checkYearExists(rawTitle)) rawTitle else s"${rawTitle} (${year})"
  (id, finalTitle, genres, year)
}

structuredData = MapPartitionsRDD[39] at map at <console>:34


MapPartitionsRDD[39] at map at <console>:34

In [15]:
// HDFS directory that receives the final dataset.
val hdfsPath = "hdfs:/user/hdfs/CaseStudies"
// Name the columns, then write as parquet, replacing any existing output.
val structuredDF = structuredData.toDF("movieId", "title", "genre", "releaseYear")
structuredDF.write.mode("overwrite").format("parquet").save(hdfsPath)

hdfsPath = hdfs:/user/hdfs/CaseStudies


hdfs:/user/hdfs/CaseStudies