### Silver Data Transformations

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Register the storage-account access key for the netflixprojectadls ADLS
# endpoint. The key is pulled from a Databricks secret scope rather than
# being written into the notebook.
adls_key_conf = "fs.azure.account.key.netflixprojectadls.dfs.core.windows.net"
spark.conf.set(
    adls_key_conf,
    dbutils.secrets.get(scope="AccessforADLS", key="StorageAccessKey"),
)

In [0]:
# Load the netflix_titles Delta table from the bronze container.
# A Delta table supplies its own schema, so the CSV reader options
# "header" and "inferSchema" have no meaning for this source and are omitted.
df = (
    spark.read
    .format("delta")
    .load("abfss://bronze@netflixprojectadls.dfs.core.windows.net/netflix_titles")
)

In [0]:
# Preview the DataFrame as loaded.
display(df)

In [0]:
# Fill missing durations with per-column defaults:
# duration_minutes -> "0", duration_seasons -> "1".
null_defaults = {"duration_minutes": "0", "duration_seasons": "1"}
df = df.na.fill(null_defaults)

In [0]:
# Preview the DataFrame after the null fill.
display(df)

In [0]:
# Cast the two duration columns and show_id to IntegerType, one column at a time.
# NOTE(review): presumably any value that is not a valid integer becomes null
# after this cast — confirm show_id is purely numeric in the source data.
for int_col in ("duration_minutes", "duration_seasons", "show_id"):
    df = df.withColumn(int_col, col(int_col).cast(IntegerType()))

In [0]:
# title: keep only the text before the first ':'.
# date_added: parse the 'M/d/yyyy' text into a date value.
df = (
    df
    .withColumn("title", split(col("title"), ':').getItem(0))
    .withColumn('date_added', to_date(col('date_added'), 'M/d/yyyy'))
)

display(df)

In [0]:
# Keep only the part of rating before the first '-' (e.g. 'TV-MA' -> 'TV').
rating_prefix = split(col('rating'), '-').getItem(0)
df = df.withColumn("rating", rating_prefix)

display(df)

In [0]:
# Encode the content type as a numeric flag:
# 'Movie' -> 1, 'TV Show' -> 2, anything else -> 0.
is_movie = col('type') == 'Movie'
is_tv_show = col('type') == 'TV Show'
type_flag_expr = when(is_movie, 1).when(is_tv_show, 2).otherwise(0)
df = df.withColumn("type_flag", type_flag_expr)

display(df)

In [0]:
from pyspark.sql import Window

# Rank every row by duration_minutes, longest first. No partition keys are
# given, so the dense rank is computed over the whole DataFrame.
# NOTE(review): an unpartitioned window is typically evaluated on a single
# partition — fine for a small dataset, verify before using on large data.
longest_first = col('duration_minutes').desc()
WindowSpec = Window.orderBy(longest_first)

df = df.withColumn("duration_ranking", dense_rank().over(WindowSpec))

display(df)

In [0]:
from pyspark.sql.functions import count

# Number of rows per content type, shown as a table/visualization.
type_totals = df.groupBy('type').agg(count('*').alias("total_count"))
display(type_totals)

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the transformed DataFrame as a Delta table in the silver container,
# replacing whatever was previously stored at that path.
silver_path = "abfss://silver@netflixprojectadls.dfs.core.windows.net/netflix_titles"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)