In [1]:
import org.apache.spark.sql.{SparkSession, functions => F}

// Root folder of the Report_4 project; input tables and denormalized outputs live in sub-folders.
// These paths are never reassigned, so they are immutable vals.
val reportRoot = "C:/Thomas/Etudes/ESILV/A9/Structure_de_donnees_cloud/Projet/Report_4/"
val old_data_path = s"${reportRoot}startingTables/" // normalized CSV tables read below
val new_data_path = s"${reportRoot}denTables/"      // destination of the denormalized JSON output

Intitializing Scala interpreter ...

Spark Web UI available at http://10.1.170.112:4040
SparkContext available as 'sc' (version = 3.4.1, master = local[*], app id = local-1702999175995)
SparkSession available as 'spark'


import org.apache.spark.sql.{SparkSession, functions=>F}
old_data_path: String = C:/Thomas/Etudes/ESILV/A9/Structure_de_donnees_cloud/Projet/Report_4/startingTables/
new_data_path: String = C:/Thomas/Etudes/ESILV/A9/Structure_de_donnees_cloud/Projet/Report_4/denTables/


In [2]:
// Create the Spark session, or reuse the one the notebook kernel already started.
// NOTE: spark.driver.memory / spark.executor.memory are not runtime SQL configs. When a session
// already exists (as the kernel banner shows), getOrCreate() returns it and these settings are
// ignored. Set them via spark-submit or spark-defaults.conf instead. With master local[*] there
// are no separate executor JVMs anyway.
val spark = SparkSession.builder.appName("Denormalization")
  .config("spark.driver.memory", "8g")
  .config("spark.executor.memory", "8g")
  .getOrCreate()

// Read one normalized table from old_data_path and give its columns the listed names.
// No schema is supplied, so every column is a string. The CSV reader's `header` option defaults
// to false, so the first line is read as data (TODO confirm the CSV files have no header row).
def readOldTable(fileName: String, columns: String*): org.apache.spark.sql.DataFrame =
  spark.read.csv(old_data_path + fileName).toDF(columns: _*)

// Load the CSV data into DataFrames
val moviesDF = readOldTable("movies.csv", "id", "name", "year", "rank")
val oldMoviesGenresDF = readOldTable("movies_genres.csv", "movie_id", "genre")
val oldMoviesDirectorsDF = readOldTable("movies_directors.csv", "director_id", "movie_id")
val oldRolesDF = readOldTable("roles.csv", "actor_id", "movie_id", "role")

val directorsDF = readOldTable("directors.csv", "id", "first_name", "last_name")
val oldDirectorsGenresDF = readOldTable("directors_genres.csv", "director_id", "genre", "prob")

val actorsDF = readOldTable("actors.csv", "id", "first_name", "last_name", "gender")

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@53531bf9
moviesDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
oldMoviesGenresDF: org.apache.spark.sql.DataFrame = [movie_id: string, genre: string]
oldMoviesDirectorsDF: org.apache.spark.sql.DataFrame = [director_id: string, movie_id: string]
oldRolesDF: org.apache.spark.sql.DataFrame = [actor_id: string, movie_id: string ... 1 more field]
directorsDF: org.apache.spark.sql.DataFrame = [id: string, first_name: string ... 1 more field]
oldDirectorsGenresDF: org.apache.spark.sql.DataFrame = [director_id: string, genre: string ... 1 more field]
actorsDF: org.apache.spark.sql.DataFrame = [id: string, first_name: string ... 2 more fields]


In [3]:
// Denormalize movies.list_genres
// One row per movie carrying the list of its genres. The left outer join keeps movies without
// any movies_genres row; their null genre is skipped by collect_list, leaving an empty list.
val denormalizedMoviesDF = {
  val movieColumns = Seq("id", "name", "year", "rank")
  val hasGenre = moviesDF("id") === oldMoviesGenresDF("movie_id")
  moviesDF
    .join(oldMoviesGenresDF, hasGenre, "left_outer")
    .groupBy(movieColumns.head, movieColumns.tail: _*)
    .agg(F.collect_list(F.col("genre")).as("list_genres"))
}

denormalizedMoviesDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 3 more fields]


In [4]:
// Denormalize movies.list_directors_id
// Adds to every movie the ids of its directors (still strings at this point). Movies with no
// movies_directors row get an empty list, because collect_list drops the outer join's nulls.
val denormalizedMoviesWithDirectorsDF = {
  val keptColumns = Seq("id", "name", "year", "rank", "list_genres")
  val directedBy = denormalizedMoviesDF("id") === oldMoviesDirectorsDF("movie_id")
  denormalizedMoviesDF
    .join(oldMoviesDirectorsDF, directedBy, "left_outer")
    .groupBy(keptColumns.head, keptColumns.tail: _*)
    .agg(F.collect_list(F.col("director_id")).as("list_directors_id"))
}

denormalizedMoviesWithDirectorsDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 4 more fields]


In [5]:
// Denormalize movies.den_nb_actors
// den_nb_actors is the number of distinct actors per movie. roles.csv has a `role` column, so the
// same (actor_id, movie_id) pair can presumably appear once per role. collect_set de-duplicates
// those pairs, whereas collect_list counted role rows. Movies without roles get 0: the outer
// join's null actor_id is skipped and the size of an empty array is 0.
val denormalizedMoviesWithActorsDF = denormalizedMoviesWithDirectorsDF
  .join(oldRolesDF, denormalizedMoviesWithDirectorsDF("id") === oldRolesDF("movie_id"), "left_outer")
  .groupBy("id", "name", "year", "rank", "list_genres", "list_directors_id")
  .agg(F.size(F.collect_set("actor_id")).alias("den_nb_actors"))
  // Columns were read from CSV as strings; give them numeric types for the JSON output.
  .withColumn("id", F.col("id").cast("int"))
  .withColumn("list_directors_id", F.col("list_directors_id").cast("array<int>"))
  .withColumn("year", F.col("year").cast("int"))
  .withColumn("rank", F.col("rank").cast("float"))

denormalizedMoviesWithActorsDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 5 more fields]


In [6]:
// Denormalize directors.list_movies_id
// One row per director with the ids of the movies they directed (still strings here). Directors
// with no movies_directors row get an empty list, because collect_list drops the outer join's nulls.
val denormalizedDirectorsWithMoviesDF = {
  val directorColumns = Seq("id", "first_name", "last_name")
  val directedBy = directorsDF("id") === oldMoviesDirectorsDF("director_id")
  directorsDF
    .join(oldMoviesDirectorsDF, directedBy, "left_outer")
    .groupBy(directorColumns.head, directorColumns.tail: _*)
    .agg(F.collect_list(F.col("movie_id")).as("list_movies_id"))
}

denormalizedDirectorsWithMoviesDF: org.apache.spark.sql.DataFrame = [id: string, first_name: string ... 2 more fields]


In [7]:
// Denormalize directors.dict_genres_den_prob
// Builds, for each director, a genre -> prob map from directors_genres (both still strings here).
// Rows whose genre or prob is null become null entries, including the outer join's filler row
// for directors with no genres. collect_list skips those entries, so such directors get an empty map.
// NOTE(review): map_from_entries fails on duplicate keys under Spark's default
// spark.sql.mapKeyDedupPolicy=EXCEPTION; this assumes (director_id, genre) is unique — confirm.
val denormalizedDirectorsDF = {
  val genreProbEntry = F.when(
    F.col("genre").isNotNull && F.col("prob").isNotNull,
    F.struct("genre", "prob")
  )
  val hasGenre = denormalizedDirectorsWithMoviesDF("id") === oldDirectorsGenresDF("director_id")
  denormalizedDirectorsWithMoviesDF
    .join(oldDirectorsGenresDF, hasGenre, "left_outer")
    .groupBy("id", "first_name", "last_name", "list_movies_id")
    .agg(F.map_from_entries(F.collect_list(genreProbEntry)).alias("dict_genres_den_prob"))
    // ids were read as strings; the map's prob values are converted separately afterwards.
    .withColumn("id", F.col("id").cast("int"))
    .withColumn("list_movies_id", F.col("list_movies_id").cast("array<int>"))
}

denormalizedDirectorsDF: org.apache.spark.sql.DataFrame = [id: int, first_name: string ... 3 more fields]


In [8]:
// UDF converting the string probabilities of a genre -> prob map into floats.
// `.map` builds a strict Map. Scala 2.12's `mapValues` returns a lazy view that re-applies the
// conversion on every access, and it is deprecated in 2.13. A null input map yields null
// instead of throwing a NullPointerException.
// A value that is not a valid float still throws NumberFormatException, as before.
val convertToFloat = F.udf((data: Map[String, String]) =>
  Option(data).map(_.map { case (genre, prob) => genre -> prob.toFloat }).orNull
)

// Apply the conversion to column "dict_genres_den_prob"
val denormalizedDirectorsDFWithGoodTypes = denormalizedDirectorsDF.withColumn(
  "dict_genres_den_prob",
  convertToFloat(F.col("dict_genres_den_prob"))
)

convertToFloat: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4070/0x0000000801c5eca8@3a109b5c,MapType(StringType,FloatType,false),List(Some(class[value[0]: map<string,string>])),Some(class[value[0]: map<string,float>]),None,true,true)
denormalizedDirectorsDFWithGoodTypes: org.apache.spark.sql.DataFrame = [id: int, first_name: string ... 3 more fields]


In [9]:
// Denormalize actors.list_movies_id
// One row per actor with the ids of the movies they appear in. roles.csv has a `role` column,
// so an actor can presumably have several rows for the same movie. array_distinct keeps each
// movie id once. Actors with no roles get an empty list, because collect_list drops the outer
// join's nulls.
val denormalizedActorsDF = actorsDF
  .join(oldRolesDF, actorsDF("id") === oldRolesDF("actor_id"), "left_outer")
  .groupBy("id", "first_name", "last_name")
  .agg(F.array_distinct(F.collect_list("movie_id")).alias("list_movies_id"))
  // Columns were read from CSV as strings; give the ids numeric types for the JSON output.
  .withColumn("id", F.col("id").cast("int"))
  .withColumn("list_movies_id", F.col("list_movies_id").cast("array<int>"))

denormalizedActorsDF: org.apache.spark.sql.DataFrame = [id: int, first_name: string ... 2 more fields]


In [10]:
// Write the denormalized actors as JSON. Spark writes a directory named denActors.json.
// coalesce(1) puts everything in a single part file, and "overwrite" replaces a previous run's output.
denormalizedActorsDF
  .coalesce(1)
  .write
  .mode("overwrite")
  .json(new_data_path + "denActors.json")

In [11]:
// Write the denormalized directors (with float genre probabilities) as JSON. Spark writes a
// directory named denDirectors.json holding a single part file, because of coalesce(1).
// "overwrite" replaces earlier output; "append" would add to it instead.
val directorsWriter = denormalizedDirectorsDFWithGoodTypes.coalesce(1).write.mode("overwrite")
directorsWriter.json(new_data_path + "denDirectors.json")

In [12]:
// Write the denormalized movies as JSON. Spark writes a directory named denMovies.json holding
// a single part file, because of coalesce(1).
// "overwrite" replaces earlier output; "append" would add to it instead.
val moviesWriter = denormalizedMoviesWithActorsDF.coalesce(1).write.mode("overwrite")
moviesWriter.json(new_data_path + "denMovies.json")

In [13]:
// Shut down the Spark session and its underlying SparkContext now that all outputs are written
// (SparkSession.close() delegates to stop()).
spark.close()