In [0]:
from pyspark.sql.functions import udf, col, when, split, mean, stddev, explode
from pyspark.sql.types import IntegerType, StringType
import random
import uuid

def generate_runtime(mean_val, std_val):
    """Draw a synthetic runtime in whole minutes from a normal distribution.

    Args:
        mean_val: Mean of the distribution, or None when unknown.
        std_val: Standard deviation of the distribution, or None when unknown.

    Returns:
        An int >= 1, or None when either argument is None or the mean is not
        a finite number.
    """
    import math  # local import keeps this function self-contained

    if mean_val is None or std_val is None:
        return None
    if not math.isfinite(mean_val):
        # int(nan) / int(inf) would raise; there is nothing sensible to sample.
        return None
    if not math.isfinite(std_val):
        # An undefined spread (e.g. NaN from a single-sample group) degrades
        # to "no spread": fall back to the mean itself instead of crashing.
        std_val = 0.0
    # int() truncates toward zero; the result is clamped so it is never < 1.
    val = int(random.normalvariate(mean_val, std_val))
    return max(val, 1)
# Spark UDF wrapper around generate_runtime (returns IntegerType).
# Marked non-deterministic because every call draws a new random value;
# UDFs are assumed deterministic by default, which lets the optimizer
# duplicate or eliminate invocations.
generate_runtime_udf = udf(generate_runtime, IntegerType()).asNondeterministic()
                           
def generate_uuid():
    """Return a newly generated random (version 4) UUID as a string."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
# Spark UDF wrapper around generate_uuid (returns StringType).
# Marked non-deterministic because every call yields a different UUID;
# UDFs are assumed deterministic by default, which lets the optimizer
# duplicate or eliminate invocations.
uuid_udf = udf(generate_uuid, StringType()).asNondeterministic()

# Load the raw title_basics table from the bronze layer.
df = spark.read.table("imdb_dev.bronze.title_basics")

# Rows whose isAdult is 0 or 1 are treated as well-formed; any other value is
# taken as a sign that the row's columns are misaligned.
# NOTE(review): rows where isAdult is NULL (or cannot be compared with a
# number) satisfy neither filter and are dropped here - confirm that is intended.
df_not_shifted = df.filter((col("isAdult") == 0) | (col("isAdult") == 1))
df_shifted = df.filter((col("isAdult") != 0) & (col("isAdult") != 1))  # 600 records were shifted by 1 column

# Realign the shifted rows: each column below takes the value currently held
# by its neighbour. The order of the chain matters - every source column is
# read before a later step overwrites it. primaryTitle itself is left as-is,
# so after this step originalTitle and primaryTitle hold the same value.
df_shifted = (
    df_shifted.withColumn("genres", col("runtimeMinutes"))
    .withColumn("runtimeMinutes", col("endYear"))
    .withColumn("endYear", col("startYear"))
    .withColumn("startYear", col("isAdult"))
    .withColumn("isAdult", col("originalTitle"))
    .withColumn("originalTitle", col("primaryTitle"))
)

# A genre is "missing" when it is either the literal \N marker or NULL. Both
# map to "Other"; otherwise split() would return NULL for a NULL genre and a
# later explode() would silently drop the title.
genre_is_missing = col("genres").isNull() | (col("genres") == "\\N")

df_title = (
    df_not_shifted.unionByName(df_shifted)
    .withColumnRenamed("tconst", "titleId")
    .withColumn("genres", when(genre_is_missing, "Other").otherwise(col("genres")))
    # Turn the comma-separated genre string into an array of genre names.
    .withColumn("genres", split("genres", ","))
)
# 12% of rows have no start year, 4% have no genre, 67% have no runtime, and 98% have no end year (end year is empty by default for anything that is not a TV series).
# Runtime would normally be skipped, but for research purposes the missing values are filled from a normal distribution of the existing values.

In [0]:
# Build the Genre dimension: one row per distinct genre name found in
# df_title, each paired with a generated id.
genre_names = df_title.select(explode(col("genres")).alias("genre")).distinct()

# The select fixes the column order (genreId, genre) used for the insert.
# NOTE(review): assumed to match the target table's column order - confirm.
df_genre = genre_names.select(uuid_udf().alias("genreId"), col("genre"))

genre_writer = df_genre.write.format("delta").mode("overwrite")
genre_writer.insertInto("imdb_dev.silver.Genre")

In [0]:
# Read the Genre table back from the silver layer; the lookup below uses its
# genreId values.
df_genre = spark.read.table("imdb_dev.silver.Genre")

# One (titleId, genre) row per genre listed on each title.
title_genre_pairs = df_title.select(col("titleId"), explode(col("genres")).alias("genre"))

# Left join keeps every pair; the genre name is then replaced by its id by
# projecting only (genreId, titleId).
genre_links = title_genre_pairs.join(df_genre, on="genre", how="left")
df_genre_of_title = genre_links.select("genreId", "titleId")

genre_of_title_writer = df_genre_of_title.write.format("delta").mode("overwrite")
genre_of_title_writer.insertInto("imdb_dev.silver.GenreOfTitle")

In [0]:
# Build the TitleType dimension: one row per distinct titleType value found
# in df_title, each paired with a generated id.
title_type_names = df_title.select(col("titleType")).distinct()

# The select fixes the column order (titleTypeId, titleType) used for the insert.
# NOTE(review): assumed to match the target table's column order - confirm.
df_title_type = title_type_names.select(uuid_udf().alias("titleTypeId"), col("titleType"))

title_type_writer = df_title_type.write.format("delta").mode("overwrite")
title_type_writer.insertInto("imdb_dev.silver.TitleType")

In [0]:
# Read the TitleType table back from the silver layer; it supplies the
# titleTypeId for each titleType in the join below.
df_title_type = spark.read.table("imdb_dev.silver.TitleType")

# Mean and standard deviation of the known runtimes, per titleType.
rows_with_runtime = df_title.where(col("runtimeMinutes").isNotNull())
runtime_stats = rows_with_runtime.groupBy("titleType").agg(
    mean("runtimeMinutes").alias("mean_runtime"),
    stddev("runtimeMinutes").alias("stddev_runtime"),
)
df_title_with_stats = df_title.join(runtime_stats, on="titleType", how="left")

# Keep an existing runtime; otherwise sample one from the titleType's stats.
runtime_or_sample = when(
    col("runtimeMinutes").isNull(),
    generate_runtime_udf(col("mean_runtime"), col("stddev_runtime")),
).otherwise(col("runtimeMinutes"))

# Keep an existing end year; otherwise fall back to the start year. This
# expression is applied after startYear has been cast to int below.
end_year_or_start = when(col("endYear").isNull(), col("startYear")).otherwise(col("endYear"))

title_enriched = df_title_with_stats.withColumn("runtimeMinutes", runtime_or_sample)
title_enriched = title_enriched.withColumn("startYear", col("startYear").cast("int"))
title_enriched = title_enriched.withColumn("endYear", end_year_or_start)
title_enriched = title_enriched.join(df_title_type, on="titleType", how="left")

# The final projection keeps only the output columns, in this order; helper
# columns (runtime stats, genres, titleType) are left out by not selecting them.
df_title = title_enriched.select(
    "titleId",
    "titleTypeId",
    "primaryTitle",
    "originalTitle",
    "isAdult",
    "startYear",
    "endYear",
    "runtimeMinutes",
)
title_writer = df_title.write.format("delta").mode("overwrite")
title_writer.insertInto("imdb_dev.silver.Title")