# **Data Pulling, Cleaning and Merging**

## **File Type Conversion**

Upload Movies, Ratings, & Users DAT files from [MovieLens 1M Dataset](https://grouplens.org/datasets/movielens/1m/) for conversion to CSV

In [2]:
from google.colab import drive

# mount Google drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
!ls /content/drive/MyDrive

 718_movie_recommendation_system.ipynb			    'IDS Project'
 BDA-Labs						    'Linear Regression and Statistics'
 Big-Data-Presentation.gslides				     links.csv
'Big Data Project'					     movie_genome_joined.csv
 Cat-1.jpeg						     movie-recommendation-system
 Cat-2.jpeg						     Real-Cat-check.jpeg
 Cat-3.jpeg						     spectrograms.zip
'Colab Notebooks'					     Sports_Analytics
'Copy of Film Script Project Proposal by Slidesgo.gslides'   sst_dev.txt
 data.csv						     sst_test.txt
'Deep Learning Project'					     sst_train.txt
'DLP_Lab_05_CNN (1).ipynb'				    'Untitled document.gdoc'
'EDA (1).ipynb'						     User-Movie-data.parquet
 glove.6B.100d.txt					     whale-detection-challenge


# Convert Movies data into CSV
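Per the MovieLens 1M README, each `.dat` line separates its fields with `::` (for example `MovieID::Title::Genres` in `movies.dat`). The helper below is a minimal plain-Python sketch of the split that Spark's `split()` performs. It also checks the field count, which catches malformed lines. The name `parse_dat_line` and the sample lines are illustrative assumptions, not taken from the dataset files. Because only `::` separates fields, commas inside titles are left intact.

```python
# Minimal sketch: split one '::'-delimited MovieLens DAT line into named fields.
# parse_dat_line and the sample lines below are illustrative assumptions.
MOVIE_FIELDS = ["MovieID", "Title", "Genres"]
RATING_FIELDS = ["UserID", "MovieID", "Rating", "Timestamp"]


def parse_dat_line(line: str, fields: list) -> dict:
    """Return {field: value} for one DAT line, raising if the field count is wrong."""
    values = line.rstrip("\n").split("::")
    if len(values) != len(fields):
        raise ValueError(f"expected {len(fields)} fields, got {len(values)}: {line!r}")
    return dict(zip(fields, values))


sample_movie = parse_dat_line("11::American President, The (1995)::Comedy|Drama|Romance\n", MOVIE_FIELDS)
sample_rating = parse_dat_line("1::1193::5::978300760", RATING_FIELDS)
print(sample_movie["Title"])    # American President, The (1995)
print(sample_rating["Rating"])  # 5
```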

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

# start session
spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

# Google drive folder
drive_path = '/content/drive/MyDrive/movie-recommendation-system/read-in/'

# file path
dat_file_path = drive_path + 'movies.dat'

# read in DAT as text
df = spark.read.text(dat_file_path)

# field delimiter (per the README data description)
delimiter = "::"

# split into columns
df_split = df.withColumn("split_data", split(df["value"], delimiter))

# extract/name columns (based on README data description)
df_final = df_split.selectExpr(
    "split_data[0] as MovieID",
    "split_data[1] as Title",
    "split_data[2] as Genres"
)

# Google drive output folder
output_drive_path = '/content/drive/MyDrive/movie-recommendation-system/output-csv/'

# CSV name
output_csv_path_movies = output_drive_path + "movies.csv"

# write a single part file into the movies.csv/ output directory; overwrite on rerun
df_final.coalesce(1).write.mode("overwrite").csv(output_csv_path_movies, header=True)

print("Conversion complete!")

Conversion complete!
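Despite the `.csv` suffix, `DataFrameWriter.csv` creates a directory. With `coalesce(1)`, that directory holds one part file (typically named like `part-00000-<uuid>-c000.csv`) plus a `_SUCCESS` marker. Spark reads the directory path back directly, as the later cells do, but tools that expect a single file need the part file itself. Below is a minimal sketch, assuming a locally mounted path. The helper name `export_single_csv` and the demo file names are hypothetical.

```python
import glob
import os
import shutil
import tempfile


def export_single_csv(spark_output_dir: str, target_path: str) -> str:
    """Copy the single part-*.csv file written by coalesce(1) to a flat file path."""
    parts = sorted(glob.glob(os.path.join(spark_output_dir, "part-*.csv")))
    if len(parts) != 1:
        raise RuntimeError(f"expected one part file in {spark_output_dir}, found {len(parts)}")
    shutil.copyfile(parts[0], target_path)
    return target_path


# Demo on a throwaway directory laid out like Spark's output (file names are illustrative)
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "movies.csv")
    os.makedirs(out_dir)
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()
    with open(os.path.join(out_dir, "part-00000-example-c000.csv"), "w") as f:
        f.write("MovieID,Title,Genres\n")
    flat = export_single_csv(out_dir, os.path.join(tmp, "movies_flat.csv"))
    with open(flat) as f:
        header = f.readline().strip()
print(header)  # MovieID,Title,Genres
```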


# Convert Ratings data to CSV

In [7]:
dat_file_path = drive_path + 'ratings.dat'

df = spark.read.text(dat_file_path)

delimiter = "::"

df_split = df.withColumn("split_data", split(df["value"], delimiter))

df_final = df_split.selectExpr(
    "split_data[0] as UserID",
    "split_data[1] as MovieID",
    "split_data[2] as Rating",
    "split_data[3] as Timestamp"
)

output_csv_path_ratings = output_drive_path + "ratings.csv"

df_final.coalesce(1).write.mode("overwrite").csv(output_csv_path_ratings, header=True)

print("Conversion 2 complete!")

Conversion 2 complete!


# Convert Users data to CSV

In [8]:
dat_file_path = drive_path + 'users.dat'

df = spark.read.text(dat_file_path)

delimiter = "::"

df_split = df.withColumn("split_data", split(df["value"], delimiter))

df_final = df_split.selectExpr(
    "split_data[0] as UserID",
    "split_data[1] as Gender",
    "split_data[2] as Age",
    "split_data[3] as Occupation",
    "split_data[4] as Zip_Code"
)

output_csv_path_users = output_drive_path + "users.csv"

df_final.coalesce(1).write.mode("overwrite").csv(output_csv_path_users, header=True)

print("Conversion 3 complete!")

Conversion 3 complete!


## **Converted File Schema Checks**

## Movies Data

In [9]:
movies = spark.read.csv(output_csv_path_movies, header=True, inferSchema=True)

movies.printSchema()

rows = movies.count()
columns = len(movies.columns)
print(f"Movies DF Shape: ({rows}, {columns})")

root
 |-- MovieID: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)

Movies DF Shape: (3883, 3)


In [None]:
#movies.show(truncate=False)

## Ratings Data

In [10]:
ratings = spark.read.csv(output_csv_path_ratings, header=True, inferSchema=True)

ratings.printSchema()

rows = ratings.count()
columns = len(ratings.columns)
print(f"Ratings DF Shape: ({rows}, {columns})")

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)

Ratings DF Shape: (1000209, 4)


In [None]:
#ratings.show(truncate=False)

## Users Data

In [11]:
users = spark.read.csv(output_csv_path_users, header=True, inferSchema=True)

users.printSchema()

rows = users.count()
columns = len(users.columns)
print(f"Users DF Shape: ({rows}, {columns})")

root
 |-- UserID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zip_Code: string (nullable = true)

Users DF Shape: (6040, 5)


In [None]:
#users.show(truncate=False)

## **Timestamp Conversion to DateTime**

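Convert the Unix `Timestamp` (seconds since 1970-01-01 UTC) into a readable `yyyy-MM-dd HH:mm:ss` string with `from_unixtime`. The result is a string formatted in the Spark session time zone, not a timestamp type.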

In [12]:
from pyspark.sql.functions import col, from_unixtime

ratings_datetime = ratings.withColumn("date_time", from_unixtime(col("Timestamp")))

#ratings_datetime.show(truncate=False)
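`from_unixtime` formats seconds since the epoch using the default pattern `yyyy-MM-dd HH:mm:ss` in the session time zone. The standard-library sketch below reproduces that formatting in UTC, which is handy for spot-checking a single value. This assumes the Colab session time zone is UTC. For example, the `Timestamp` 977882471 that appears in the joined output later in the notebook maps to 2000-12-27 02:01:11.

```python
from datetime import datetime, timezone


def epoch_to_string(seconds: int) -> str:
    """Format Unix seconds as 'YYYY-MM-DD HH:MM:SS' in UTC, like from_unixtime's default pattern."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(epoch_to_string(977882471))  # 2000-12-27 02:01:11
```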

## **MovieLens 1M Data Joins**

Join movies & ratings on MovieID

In [13]:
movie_rating_joined = movies\
  .join(ratings_datetime,
        on = 'MovieID',
        how = 'inner')

In [None]:
#movie_rating_joined.show(truncate=False)

Join movie_rating_joined & users on UserID

In [14]:
movie_final_joined = movie_rating_joined\
  .join(users,
        on = 'UserID',
        how = 'inner')

In [None]:
#movie_final_joined.show(truncate=False)

In [None]:
movie_final_joined.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- date_time: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zip_Code: string (nullable = true)



Convert movie_final_joined to CSV

In [15]:
# CSV path
output_csv_path_moviefinal = output_drive_path + "movie_final_joined.csv"

# write a single part file into the movie_final_joined.csv/ output directory; overwrite on rerun
movie_final_joined.coalesce(1).write.mode("overwrite").csv(output_csv_path_moviefinal, header=True)

print("CSV conversion complete!")

CSV conversion complete!


## **Add Metadata to Final Dataset**

Convert [MovieLens Tag Genome Dataset 2021](https://grouplens.org/datasets/movielens/tag-genome-2021/) metadata_updated.json to CSV

In [16]:
json_file_path = drive_path + 'metadata_updated.json'

df = spark.read.json(json_file_path)

#df.show(truncate=False)

In [17]:
output_csv_path_metadata = output_drive_path + "metadata_updated.csv"

df.coalesce(1).write.mode("overwrite").csv(output_csv_path_metadata, header=True)

print("JSON to CSV conversion complete!")

JSON to CSV conversion complete!


Convert metadata to dataframe

In [18]:
metadata = spark.read.csv(output_csv_path_metadata, header=True, inferSchema=True)

metadata.printSchema()

rows = metadata.count()
columns = len(metadata.columns)
print(f"Metadata DF Shape: ({rows}, {columns})")

root
 |-- avgRating: double (nullable = true)
 |-- directedBy: string (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- starring: string (nullable = true)
 |-- title: string (nullable = true)

Metadata DF Shape: (84661, 6)


Rename title column for joining

In [19]:
metadata = metadata.withColumnRenamed("title", "Title")

In [None]:
#metadata.show(truncate=False)

Join metadata to final MovieLens 1M dataset, movie_final_joined, on Title

In [20]:
movie_genome_joined = movie_final_joined\
  .join(metadata,
        on = 'Title',
        how = 'inner')
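Joining on `Title` matches free-text strings exactly. A rating whose title is missing or spelled differently in the metadata is silently dropped. A title that appears more than once in the metadata duplicates every matching rating. Both effects can hide behind the row count (here, 1,000,209 ratings versus 917,402 joined rows). The toy hash join below uses plain Python with made-up rows and mirrors inner-join semantics. In this example one title is dropped and another is duplicated, yet the row count is unchanged.

```python
from collections import defaultdict


def inner_join(left, right, key):
    """Toy hash inner join of two lists of dicts on `key` (same semantics as how='inner')."""
    index = defaultdict(list)
    for right_row in right:
        index[right_row[key]].append(right_row)
    # each left row pairs with every right row sharing its key; no match -> no output row
    return [{**left_row, **right_row}
            for left_row in left
            for right_row in index.get(left_row[key], [])]


# Hypothetical rows, not taken from MovieLens
toy_ratings = [
    {"Title": "Movie A (1995)", "Rating": 4},
    {"Title": "Movie B (1996)", "Rating": 3},
    {"Title": "Movie C (1997)", "Rating": 5},
]
toy_metadata = [
    {"Title": "Movie A (1995)", "imdbId": 1},
    {"Title": "Movie A (1995)", "imdbId": 2},  # duplicated title -> rating row is repeated
    {"Title": "Movie C (1997)", "imdbId": 3},  # "Movie B (1996)" has no match -> dropped
]

toy_joined = inner_join(toy_ratings, toy_metadata, "Title")
unmatched = {r["Title"] for r in toy_ratings} - {m["Title"] for m in toy_metadata}
print(len(toy_ratings), len(toy_joined))  # 3 3
print(unmatched)                          # {'Movie B (1996)'}
```

On the real DataFrames, the same two checks could be written with standard PySpark calls. `movie_final_joined.join(metadata, on='Title', how='left_anti')` returns the ratings without a metadata match. `metadata.groupBy('Title').count().filter('count > 1')` lists duplicated titles.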

In [None]:
#movie_genome_joined.show(truncate=False)

In [21]:
movie_genome_joined.printSchema()

rows = movie_genome_joined.count()
columns = len(movie_genome_joined.columns)
print(f"Movie/Genome DF Shape: ({rows}, {columns})")

root
 |-- Title: string (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- date_time: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zip_Code: string (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- directedBy: string (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- starring: string (nullable = true)

Movie/Genome DF Shape: (917402, 16)


Save final dataset with metadata, movie_genome_joined, to CSV

In [22]:
# file path
output_csv_path_moviegenome = output_drive_path + "movie_genome_joined.csv"

# write a single part file into the movie_genome_joined.csv/ output directory; overwrite on rerun
movie_genome_joined.coalesce(1).write.mode("overwrite").csv(output_csv_path_moviegenome, header=True)

print("Final CSV conversion complete!")

Final CSV conversion complete!


In [23]:
%pip install IMDbPY

Collecting IMDbPY
  Downloading IMDbPY-2022.7.9-py3-none-any.whl.metadata (498 bytes)
Collecting cinemagoer (from IMDbPY)
  Downloading cinemagoer-2023.5.1-py3-none-any.whl.metadata (2.9 kB)
Downloading IMDbPY-2022.7.9-py3-none-any.whl (1.2 kB)
Downloading cinemagoer-2023.5.1-py3-none-any.whl (297 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m297.2/297.2 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: cinemagoer, IMDbPY
Successfully installed IMDbPY-2022.7.9 cinemagoer-2023.5.1


## This PySpark cell reuses the Spark session, reloads `movie_genome_joined.csv` into a DataFrame, and infers its schema. The schema lists attributes such as `Title`, `UserID`, `Genres`, and `Rating`, covering movie details, user demographics, and ratings. The cell also displays sample rows for inspection.

In [24]:
from pyspark.sql import SparkSession

# Reuse the Spark session (getOrCreate() returns the one already started above)
spark = SparkSession.builder.appName("IMDb Details").getOrCreate()

# Load the joined CSV written earlier (Spark reads the whole output directory)
file_path = output_drive_path + "movie_genome_joined.csv"
df_spark = spark.read.csv(file_path, header=True, inferSchema=True)

# Show DataFrame schema and sample rows
df_spark.printSchema()
df_spark.show(5)


root
 |-- Title: string (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zip_Code: string (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- directedBy: string (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- starring: string (nullable = true)

+--------------------+------+-------+-------------+------+---------+-------------------+------+---+----------+--------+---------+------------+------+-------+--------------------+
|               Title|UserID|MovieID|       Genres|Rating|Timestamp|          date_time|Gender|Age|Occupation|Zip_Code|avgRating|  directedBy|imdbId|item_id|            starring

## This script uses the IMDbPY package (now distributed as cinemagoer) to fetch detailed metadata for a subset of the unique IMDb IDs. After initializing an IMDb object, it defines a function that extracts movie details such as title, runtime, rating, plot outline, and director names. The next cell fetches these details in parallel with `ThreadPoolExecutor` to reduce total run time.

#### Progress is tracked with `tqdm`. Failed lookups are filtered out, and the remaining results are converted into a Pandas DataFrame for inspection and for later integration with the existing dataset.

In [25]:
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
from imdb import IMDb
from tqdm import tqdm

# Collect the unique IMDb IDs into a Python list
imdb_ids = df_spark.select('imdbId').distinct().rdd.flatMap(lambda x: x).collect()

# Initialize the IMDb (cinemagoer) access object
ia = IMDb()

# Fetch one movie and extract the required fields; return None on failure
def fetch_all_movie_details(imdb_id):
    try:
        movie = ia.get_movie(imdb_id)

        movie_details = {
            "localized_title": movie.get("localized title", "N/A"),
            # runtimes is a list of strings (minutes); join into one string
            "runtimes": ', '.join(movie.get("runtimes", [])),
            "rating": movie.get("rating", "N/A"),
            "plot_outline": movie.get("plot outline", "N/A"),
            # directors are Person objects; keep only their names
            "director": ', '.join(director["name"] for director in movie.get("director", [])),
            "imdbID": movie.get("imdbID", "N/A"),
        }
        return movie_details
    except Exception as e:
        print(f"Error fetching movie {imdb_id}: {e}")
        return None

In [None]:
# Fetch details in parallel with a thread pool (the calls are I/O-bound network requests)
# Limit to the first 100 IDs to keep the run time manageable
imdb_ids = imdb_ids[:100]
movie_details_list = []
with ThreadPoolExecutor(max_workers=10) as executor:  # Adjust max_workers as needed
    # Submit tasks to the executor and wrap in tqdm for progress tracking
    future_to_imdb = {executor.submit(fetch_all_movie_details, imdb_id): imdb_id for imdb_id in imdb_ids}

    for future in tqdm(as_completed(future_to_imdb), total=len(future_to_imdb), desc="Fetching Movie Details"):
        movie_details_list.append(future.result())
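Two details of this pattern are easy to miss. First, `as_completed` yields futures in completion order, so `movie_details_list` does not follow the order of `imdb_ids`. Second, a failed lookup appears as `None`, which is filtered out in the next cell. Below is a minimal offline sketch of the same fan-out, keyed by ID so the mapping is explicit. `fake_fetch` is a hypothetical stand-in for `fetch_all_movie_details` that simulates failures.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(imdb_id: int):
    """Hypothetical stand-in for a network lookup: odd IDs simulate a failed fetch."""
    if imdb_id % 2:
        return None
    return {"imdbID": f"{imdb_id:07d}", "title": f"Movie {imdb_id}"}


def fetch_parallel(ids, fetch, max_workers=4):
    """Run fetch(id) on a thread pool; return {id: result}, skipping None results."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_id = {executor.submit(fetch, i): i for i in ids}
        for future in as_completed(future_to_id):  # completion order, not input order
            details = future.result()
            if details is not None:
                results[future_to_id[future]] = details
    return results


fetched = fetch_parallel([2, 3, 4, 10], fake_fetch)
print(sorted(fetched))  # [2, 4, 10]
```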

#### The `movie_details_df` DataFrame stores detailed metadata for movies retrieved from the IMDb API. It includes six key columns: **localized_title** (movie title in its localized version), **runtimes** (movie duration in minutes), **rating** (IMDb user rating), **plot_outline** (a brief synopsis), **director** (names of directors), and **imdbID** (unique IMDb identifier).

In [42]:

# Drop failed lookups (fetch_all_movie_details returns None on error)
movie_details_list = [entry for entry in movie_details_list if entry is not None]

# Convert the list of dicts to a Pandas DataFrame
movie_details_df = pd.DataFrame(movie_details_list)

In [44]:
print(movie_details_df.head())

     localized_title runtimes  rating  \
0  The Cement Garden      105     7.0   
1     Bachelor Party      105     6.3   
2           Airheads       92     6.2   
3      Puppet Master       90     5.5   
4   Extreme Measures      118     6.2   

                                        plot_outline          director  \
0  After her husband's death, the mother of Julie...     Andrew Birkin   
1  Rick Gassko is about to marry Debbie Thompson....       Neal Israel   
2  Three would be rockers Chazz, Rex and Pip, kno...   Michael Lehmann   
3  Neil Gallagher found the secret to Toulon's pu...  David Schmoeller   
4  Thriller about Guy Luthan (Hugh Grant), a Brit...     Michael Apted   

    imdbID  
0  0106535  
1  0086927  
2  0109068  
3  0098143  
4  0116259  


In [45]:
import pandas as pd

# Show all rows when printing pandas objects (no truncation)
pd.set_option('display.max_rows', None)
# Print all data types
print(movie_details_df.dtypes)

localized_title     object
runtimes            object
rating             float64
plot_outline        object
director            object
imdbID              object
dtype: object


### Write the staged data to Google Drive

In [32]:
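# Stage the fetched IMDb details on Drive so the slow fetch need not be rerun
# (index=False avoids writing the pandas index as an extra unnamed column)
movie_details_df.to_csv(output_drive_path + 'data.csv', index=False)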


## Run this cell only when reloading the staged data from Drive (skips the slow IMDb fetch)

In [35]:
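'''# Reload the staged data with pandas; the cleaning cell below expects a pandas DataFrame
file_path = output_drive_path + 'data.csv'
# keep imdbID as a string to preserve leading zeros; drop the index column if one was saved
movie_details_df = pd.read_csv(file_path, dtype={'imdbID': str}).drop(columns=['Unnamed: 0'], errors='ignore')'''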

This cell cleans the fetched movie metadata in the Pandas DataFrame so that it converts cleanly to Spark. Missing values are filled with defaults: empty strings for text columns and 0.0 for the rating. Any list-valued fields are joined into comma-separated strings. A schema then declares all six columns as strings: localized_title, runtimes, rating_imdb, plot_outline, director, and imdbID. Because the schema is applied by position, the Pandas `rating` column becomes `rating_imdb` in Spark.

The cleaned Pandas DataFrame is then converted into a Spark DataFrame (df_imdb_spark) with this schema, so it can be joined with the Spark dataset built earlier. The output shows the first 5 rows: movie title, runtime, IMDb rating, plot synopsis, director names, and IMDb ID.

In [40]:
from pyspark.sql import types as T

# Fill missing values so every column converts cleanly to Spark
movie_details_df['localized_title'] = movie_details_df['localized_title'].fillna('')
movie_details_df['rating'] = movie_details_df['rating'].fillna(0.0)
movie_details_df['runtimes'] = movie_details_df['runtimes'].fillna('')
movie_details_df['plot_outline'] = movie_details_df['plot_outline'].fillna('')
movie_details_df['director'] = movie_details_df['director'].fillna('')
movie_details_df['imdbID'] = movie_details_df['imdbID'].fillna('')

# Join any list-valued entries into comma-separated strings
movie_details_df['runtimes'] = movie_details_df['runtimes'].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)
movie_details_df['director'] = movie_details_df['director'].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)

# Schema for the Spark DataFrame: every column is a string. Fields are matched to the
# pandas columns by position, so the pandas 'rating' column is named 'rating_imdb' here
schema = T.StructType([
    T.StructField("localized_title", T.StringType(), True),
    T.StructField("runtimes", T.StringType(), True),
    T.StructField("rating_imdb", T.StringType(), True),
    T.StructField("plot_outline", T.StringType(), True),
    T.StructField("director", T.StringType(), True),
    T.StructField("imdbID", T.StringType(), True),
])

# Convert the movie details DataFrame to a Spark DataFrame with the schema
df_imdb_spark = spark.createDataFrame(movie_details_df, schema=schema)

# Show the resulting Spark DataFrame
df_imdb_spark.show(5)

+-----------------+--------+-----------+--------------------+----------------+-------+
|  localized_title|runtimes|rating_imdb|        plot_outline|        director| imdbID|
+-----------------+--------+-----------+--------------------+----------------+-------+
|The Cement Garden|     105|        7.0|After her husband...|   Andrew Birkin|0106535|
|   Bachelor Party|     105|        6.3|Rick Gassko is ab...|     Neal Israel|0086927|
|         Airheads|      92|        6.2|Three would be ro...| Michael Lehmann|0109068|
|    Puppet Master|      90|        5.5|Neil Gallagher fo...|David Schmoeller|0098143|
| Extreme Measures|     118|        6.2|Thriller about Gu...|   Michael Apted|0116259|
+-----------------+--------+-----------+--------------------+----------------+-------+
only showing top 5 rows



#### The code merges two Spark DataFrames (`df_spark` and `df_imdb_spark`) with an inner join on the IMDb ID. The resulting DataFrame (`df_joined`) combines user ratings and demographics with IMDb details such as `localized_title`, `runtimes`, and `plot_outline`, enriching the dataset for deeper analysis or recommendations. Only rows with a matching ID are kept, so with 100 fetched IDs the result covers at most those 100 movies.
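The two ID columns use different representations. `imdbId` in the MovieLens metadata is an integer (e.g. 19729), while IMDbPY returns a zero-padded string such as `0106535`, and IMDb URLs add a `tt` prefix. Normalizing both sides to one form before joining avoids relying on implicit type coercion; in Spark, that means applying `.cast("int")` to the string column. Below is a small plain-Python sketch of the normalization. The helper names are hypothetical.

```python
def imdb_id_to_int(value) -> int:
    """Normalize an IMDb ID given as 19729, '0019729', or 'tt0019729' to an int."""
    text = str(value).strip()
    if text.startswith("tt"):
        text = text[2:]
    return int(text)


def imdb_id_to_padded(value) -> str:
    """Format an IMDb ID as a zero-padded (at least 7-digit) string, as IMDbPY returns it."""
    return f"{imdb_id_to_int(value):07d}"


print(imdb_id_to_int("0106535"))  # 106535
print(imdb_id_to_padded(19729))   # 0019729
```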

In [41]:
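from pyspark.sql import functions as F

# df_spark.imdbId is an integer, while IMDbPY returns zero-padded strings (e.g. "0106535");
# cast to int so the join does not rely on implicit type coercion
df_imdb_spark = df_imdb_spark.withColumn("imdbID", F.col("imdbID").cast("int"))

# Inner join on the IMDb ID (Spark matches "imdbID" to "imdbId" case-insensitively by default)
df_joined = df_spark.join(df_imdb_spark, on="imdbID", how="inner")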

# Show the resulting joined DataFrame
df_joined.show(5)

+------+--------------------+------+-------+-------+------+---------+-------------------+------+---+----------+--------+---------+--------------+-------+--------------------+-------------------+--------+-----------+--------------------+--------------+
|imdbId|               Title|UserID|MovieID| Genres|Rating|Timestamp|          date_time|Gender|Age|Occupation|Zip_Code|avgRating|    directedBy|item_id|            starring|    localized_title|runtimes|rating_imdb|        plot_outline|      director|
+------+--------------------+------+-------+-------+------+---------+-------------------+------+---+----------+--------+---------+--------------+-------+--------------------+-------------------+--------+-----------+--------------------+--------------+
| 19729|Broadway Melody, ...|    68|   1926|Musical|     3|977882471|2000-12-27 02:01:11|     M| 18|         4|   53706|  3.07674|Harry Beaumont|   1926|Charles King, Bes...|The Broadway Melody|     100|        5.6|Hank and Queenie ...|Harry Be