### Cleaning and preprocessing the given CSV files

In [None]:
# imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
import pandas as pd
import json

In [None]:
# obtain the Spark session for this notebook (an already running session is reused by getOrCreate)
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("MovieAnalysis")
    .getOrCreate()
)

In [None]:
# defining the absolute path to the directory where the files will be stored
# (the parquet outputs below are written to "<root_path>filtered_data/...")
# NOTE(review): the user folder is spelled "Ibele@..." here, while the CSV input paths
# further down use "ibele@..." - confirm that the difference in casing is intended
root_path = "dbfs:/FileStore/shared_uploads/Ibele@stud.dhbw-ravensburg.de/"

In [None]:
# schema used when reading credits.csv: every column is nullable,
# cast and crew are kept as raw text and decoded later
credit_schema = t.StructType([
    t.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('movie_id', t.IntegerType()),
        ('title', t.StringType()),
        ('cast', t.StringType()),
        ('crew', t.StringType()),
    )
])

# defining a schema for reading in the movies.csv (every column is nullable)
# - genres holds JSON text (it is decoded with json.loads during normalization),
#   so it has to be a string column like the other nested columns
# - budget and revenue use 64-bit integers because money totals can exceed
#   the 32-bit signed maximum of 2,147,483,647
movie_schema = t.StructType([
    t.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('budget', t.LongType()),
        ('genres', t.StringType()),
        ('homepage', t.StringType()),
        ('id', t.IntegerType()),
        ('keywords', t.StringType()),
        ('original_language', t.StringType()),
        ('original_title', t.StringType()),
        ('overview', t.StringType()),
        ('popularity', t.FloatType()),
        ('production_companies', t.StringType()),
        ('production_countries', t.StringType()),
        ('release_date', t.TimestampType()),
        ('revenue', t.LongType()),
        ('runtime', t.FloatType()),
        ('spoken_languages', t.StringType()),
        ('status', t.StringType()),
        ('tagline', t.StringType()),
        ('title', t.StringType()),
    )
])

# schema used when reading recommendations.csv: three nullable integer columns
recom_schema = t.StructType([
    t.StructField(field_name, t.IntegerType(), True)
    for field_name in ('movie_id', 'user_id', 'vote')
])

In [None]:
# load the three given csv files into pyspark dataframes
# every read expects a header row and uses mode DROPMALFORMED
credits = (spark.read.format("csv")
            .schema(credit_schema)
            .options(header="true", escape='"', mode="DROPMALFORMED")
            .load("dbfs:/FileStore/shared_uploads/ibele@stud.dhbw-ravensburg.de/credits_groupA-1.csv"))

# NOTE(review): no schema is passed for the movies file (movie_schema is not applied here),
# so the column types are whatever the reader produces by default - confirm this is intended
movies = (spark.read.format("csv")
            .options(header="true", escape='"', mode="DROPMALFORMED")
            .load("dbfs:/FileStore/shared_uploads/ibele@stud.dhbw-ravensburg.de/movies_groupA-1.csv"))

# the recommendations file is read without the escape option
recoms = (spark.read.format("csv")
            .schema(recom_schema)
            .options(header="true", mode="DROPMALFORMED")
            .load("dbfs:/FileStore/shared_uploads/ibele@stud.dhbw-ravensburg.de/recommendations_groupA-1.csv"))

In [None]:
# collect the movies and credits Spark dataframes as pandas dataframes (movies first),
# because the cleaning step below works on pandas
df_movies, df_credits = (spark_df.toPandas() for spark_df in (movies, credits))

In [None]:
def get_normalized_df(df: pd.DataFrame, col: str, key_attribute: str = "movie_id") -> pd.DataFrame:
    """
    Flatten a column of JSON-encoded records into one long dataframe.

    Every non-missing cell of ``col`` is decoded with ``json.loads`` and
    flattened with ``pd.json_normalize``. Each resulting row is tagged with the
    value of ``key_attribute`` from its source row, stored in a column named
    ``id``. The given dataframe is not modified.

    This function doesn't check the completeness and validity of rows in the
    dataset. Cells that are missing (None/NaN) or that decode to an empty
    collection are skipped, because they contribute no records.

    Note: the key is always written to the column ``id``, so an ``id`` field
    inside the JSON records themselves is overwritten by the key of the row.

    Args:
        df: dataframe to be normalized
        col: column of the dataframe holding the JSON text
        key_attribute: column of the dataframe that identifies the source row

    Returns:
        A dataframe with one row per decoded record. If nothing could be
        decoded, an empty dataframe with only the ``id`` column is returned.

    Raises:
        KeyError: if ``col`` or ``key_attribute`` is not a column of ``df``.
        json.JSONDecodeError: if a non-missing cell is not valid JSON.
    """
    # only the two needed columns are read, so no copy of the whole frame is required
    present = df[col].notna()
    keys = df.loc[present, key_attribute]
    payloads = df.loc[present, col]

    frames = []
    for key, payload in zip(keys, payloads):
        records = json.loads(payload)
        if not records:
            # e.g. "[]": nothing to flatten for this row
            continue
        # assign key of the row to the newly created normalized dataframe
        frames.append(pd.json_normalize(records).assign(id=key))

    if not frames:
        # pd.concat raises on an empty list, so return an empty result explicitly
        return pd.DataFrame(columns=["id"])

    # join normalized dataframes
    return pd.concat(frames, ignore_index=True)

# list of the to be created dataframes: (column, source dataframe, key attribute)
new_dataframes = [
    ("production_companies", df_movies, "id"),
    ("spoken_languages", df_movies, "id"),
    ("genres", df_movies, "id"),
    ("cast", df_credits, "movie_id"),
    ("crew", df_credits, "movie_id"),
]

# looping over new_dataframes in order to normalize and save as parquet file for group and individual tasks
for column, source_df, key_attribute in new_dataframes:
    # flatten the JSON column in pandas, then hand the result back to Spark for writing
    normalized_pdf = get_normalized_df(source_df, column, key_attribute)
    normalized_df = spark.createDataFrame(normalized_pdf)
    # no pandas-style `index` argument here: Spark's save() would only forward it
    # to the data source as an option named "index"
    (normalized_df.write
        .format("parquet")
        .mode("overwrite")
        .save(f"{root_path}filtered_data/{column}.parquet"))

In [None]:
# recoms is treated as already clean, so it is written to parquet directly
# (no detour through pandas), replacing any previous output
recoms.write.mode("overwrite").parquet(f"{root_path}filtered_data/recom.parquet")

In [None]:
# keep only the movie columns that are still needed and store them as parquet,
# replacing any previous output
(movies
    .select("title", "id", "budget", "revenue", "release_date", "runtime", "popularity")
    .write
    .parquet(f"{root_path}filtered_data/movies.parquet", mode="overwrite"))