# Silver Data Transformation

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
df = spark.read.format("delta")\
    .option("header", True)\
    .option("inferSchema", True)\
    .load("abfss://bronze@netflixprojectdlbianca.dfs.core.windows.net/netflix_titles")

In [0]:
display(df)

In [0]:
df = df.fillna({"duration_minutes" : 0, "duration_seasons" : 0})

In [0]:
display(df)

In [0]:
df = df.withColumn("duration_minutes", col("duration_minutes").cast(IntegerType()))\
    .withColumn("duration_seasons", col("duration_seasons").cast(IntegerType()))

In [0]:
df.printSchema()

In [0]:
display(df)

In [0]:
df = df.withColumn("short_title", split(col("title"),':')[0])

In [0]:
display(df)

In [0]:
df = df.withColumn("short_rating", split(col("rating"),'-')[0])

In [0]:
display(df)

In [0]:
df = df.withColumn("type_flag",when(col("type")=="Movie", 1)
                            .when(col("type") == "TV Show", 0)
                                  .otherwise(2))

In [0]:
display(df)

In [0]:
df = df.withColumn("duration_ranking", dense_rank().over(Window.orderBy(col("duration_minutes").desc())))

In [0]:
display(df)

In [0]:
df.createOrReplaceGlobalTempView("global_view")

In [0]:
df = spark.sql("""
               
               SELECT * FROM global_temp.global_view
""")

In [0]:
display(df)

In [0]:
df_vis = df.groupBy("type").agg(count("*").alias("total_count"))

In [0]:
display(df_vis)

Databricks visualization. Run in Databricks to view.

In [0]:
df.write.format("delta")\
        .mode("overwrite")\
        .option("path", "abfss://silver@netflixprojectdlbianca.dfs.core.windows.net/netflix_titles")\
        .save()