### Import Dependencies

In [0]:
from pyspark.sql.functions import *

### Read Data Using Dataframe

In [0]:
# Root folder of the raw CSV exports; change it here if the volume moves.
RAW_DATA_DIR = "/Volumes/netflix_data/raw_data/nf_raw_data"


def read_raw_csv(file_name):
    """Load one raw CSV from RAW_DATA_DIR.

    The first row is treated as the header and column types are inferred
    from the data.

    Args:
        file_name: Name of the CSV file inside RAW_DATA_DIR, e.g. "movies.csv".

    Returns:
        A Spark DataFrame with the file's contents.
    """
    return (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(f"{RAW_DATA_DIR}/{file_name}")
    )


movies_df = read_raw_csv("movies.csv")
ratings_df = read_raw_csv("ratings.csv")
tags_df = read_raw_csv("tags.csv")
links_df = read_raw_csv("links.csv")
gtag_df = read_raw_csv("genome-tags.csv")
gscore_df = read_raw_csv("genome-scores.csv")

### Work With Ratings DataFrame

In [0]:
# Clean the ratings: keep ratings within [1.0, 5.0], turn the epoch-seconds
# `timestamp` column into a "yyyy-MM-dd HH:mm:ss" string column named
# `ratings_timestamp`, and remove fully duplicated rows.
#
# from_unixtime already renders the requested pattern as a string, so the
# former cast-to-timestamp followed by date_format with the same pattern was a
# redundant round trip and has been folded into a single call.
#
# NOTE(review): if the source allows half-star ratings (0.5), the lower bound
# of 1.0 silently discards them — confirm this is intended.
ratings_df = (
    ratings_df
    .filter((col("rating") >= 1.0) & (col("rating") <= 5.0))
    .withColumn("ratings_timestamp", from_unixtime(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    .drop("timestamp")
    .dropDuplicates()
)
                       

In [0]:
# Count the missing values in every ratings column. `sum` is the Spark
# aggregate brought in by the wildcard import, not Python's builtin.
ratings_null_counts = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(f"{name}_nulls")
    for name in ratings_df.columns
]
ratings_df.select(ratings_null_counts).display()

In [0]:
# Save the cleaned ratings as a Delta table, replacing both the existing rows
# and the existing schema.
ratings_writer = ratings_df.write.format("delta")
ratings_writer = ratings_writer.mode("overwrite").option("overwriteSchema", "true")
ratings_writer.saveAsTable("netflix_data.cleandata.c_ratings_table")

### Work With Movie DataFrame

In [0]:
# Per-column count of null values in the movies data (Spark `sum`, via the
# wildcard import).
movies_null_counts = []
for column_name in movies_df.columns:
    is_missing = when(col(column_name).isNull(), 1).otherwise(0)
    movies_null_counts.append(sum(is_missing).alias(column_name + "_nulls"))

movies_df.select(movies_null_counts).display()

In [0]:
# Write the movies data to a Delta table; overwrite mode plus overwriteSchema
# replaces whatever table contents and schema were there before.
movies_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("netflix_data.cleandata.c_movies_table")

### Work With Tags Dataframe

In [0]:
# Clean the tags: turn the epoch-seconds `timestamp` column into a
# "yyyy-MM-dd HH:mm:ss" string column named `tags_timestamp`, then remove
# fully duplicated rows.
#
# from_unixtime already renders the requested pattern as a string, so the
# former cast-to-timestamp followed by date_format with the same pattern was a
# redundant round trip and has been folded into a single call.
tags_df = (
    tags_df
    .withColumn("tags_timestamp", from_unixtime(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    .drop("timestamp")
    .dropDuplicates()
)

In [0]:
# Report how many nulls each tags column contains. `sum` is Spark's aggregate
# (wildcard import), applied to a 1/0 "is null" flag per row.
tags_null_counts = [
    sum(when(col(field).isNull(), 1).otherwise(0)).alias(field + "_nulls")
    for field in tags_df.columns
]
tags_df.select(*tags_null_counts).display()

In [0]:
# Store the cleaned tags as a Delta table, overwriting existing data and schema.
tags_writer = (
    tags_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
tags_writer.saveAsTable("netflix_data.cleandata.c_tags_table")

### Work With links Dataframe

In [0]:
# Null count for each links column, computed with Spark's `sum` over a 1/0
# flag that marks missing values.
links_null_counts = []
for link_column in links_df.columns:
    null_flag = when(col(link_column).isNull(), 1).otherwise(0)
    links_null_counts.append(sum(null_flag).alias(f"{link_column}_nulls"))

links_df.select(links_null_counts).display()

In [0]:
# Keep existing tmdbId values and substitute 0 wherever the id is missing.
# NOTE(review): 0 acts as a "no TMDB id" sentinel here — confirm downstream
# consumers do not treat it as a real identifier.
links_df = links_df.withColumn(
    "tmdbId",
    when(col("tmdbId").isNotNull(), col("tmdbId")).otherwise(0),
)

In [0]:
# Persist the links data to Delta, replacing the previous table contents and
# schema.
links_writer = links_df.write.format("delta").mode("overwrite")
links_writer.option("overwriteSchema", "true").saveAsTable(
    "netflix_data.cleandata.c_links_table"
)

### Work With G-Tag Dataframe

In [0]:
# Count nulls in every genome-tag column (Spark `sum` from the wildcard import).
gtag_null_counts = [
    sum(when(col(column).isNull(), 1).otherwise(0)).alias(f"{column}_nulls")
    for column in gtag_df.columns
]
gtag_df.select(gtag_null_counts).display()

In [0]:
# Prefix the genome-tag columns with "g": tagId -> gtag_Id, tag -> gtag.
# The renames are applied in the listed order.
for current_name, renamed_to in (("tagId", "gtag_Id"), ("tag", "gtag")):
    gtag_df = gtag_df.withColumnRenamed(current_name, renamed_to)

In [0]:
# Save the renamed genome tags as a Delta table, overwriting data and schema.
(
    gtag_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("netflix_data.cleandata.c_gtag_table")
)

### Work With G-Score Dataframe

In [0]:
# Tally null values per genome-score column; each row contributes 1 when the
# value is missing and 0 otherwise, summed with Spark's `sum`.
gscore_null_counts = []
for score_column in gscore_df.columns:
    missing_flag = when(col(score_column).isNull(), 1).otherwise(0)
    gscore_null_counts.append(sum(missing_flag).alias(score_column + "_nulls"))

gscore_df.select(gscore_null_counts).display()

In [0]:
# Rename the tag key to gtag_Id, the same name the genome-tag frame uses.
gscore_df = gscore_df.withColumnRenamed(
    "tagId",
    "gtag_Id",
)

In [0]:
# Save the genome scores as a Delta table, overwriting existing data and schema.
# The overwriteSchema value is passed as the string "true" so this writer
# matches the other table writes in this notebook, which all pass a string
# rather than a Python bool.
(
    gscore_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("netflix_data.cleandata.c_gscore_table")
)