<a href="https://colab.research.google.com/github/PranavShashidhara/Netflix-Recommendation-system/blob/main/User_data_Preprcoessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import boto3
import os
import pyarrow as pa
import pyarrow.parquet as pq
import io
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, lag, col, when, last, lit, udf, min, max
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

In [None]:
movie_titles_path = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/movie_titles.csv"
user_data_path_1 = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/combined_data_1.txt"
user_data_path_2 = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/combined_data_2.txt"
user_data_path_3 = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/combined_data_3.txt"
user_data_path_4 = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/combined_data_4.txt"
parquet_file_path = "dbfs:/FileStore/shared_uploads/pranavmay22@gmail.com/"

raw_movie_titles = spark.read.format("csv").option("header", "true").load(movie_titles_path)
raw_user_data_1 = spark.read.text(user_data_path_1)
raw_user_data_2 = spark.read.text(user_data_path_2)
raw_user_data_3 = spark.read.text(user_data_path_3)
raw_user_data_4 = spark.read.text(user_data_path_4)

raw_user_data = raw_user_data_1.union(raw_user_data_2).union(raw_user_data_3).union(raw_user_data_4)
raw_user_data.show()

+--------------------+
|               value|
+--------------------+
|                  1:|
|1488844,3,2005-09-06|
| 822109,5,2005-05-13|
| 885013,4,2005-10-19|
|  30878,4,2005-12-26|
| 823519,3,2004-05-03|
| 893988,3,2005-11-17|
| 124105,4,2004-08-05|
|1248029,3,2004-04-22|
|1842128,4,2004-05-09|
|2238063,3,2005-05-11|
|1503895,4,2005-05-19|
|2207774,5,2005-06-06|
|2590061,3,2004-08-12|
|   2442,3,2004-04-14|
| 543865,4,2004-05-28|
|1209119,4,2004-03-23|
| 804919,4,2004-06-10|
|1086807,3,2004-12-28|
|1711859,4,2005-05-08|
+--------------------+
only showing top 20 rows



In [None]:
movie_id_col = regexp_extract('value', r'^(\d+):$', 1)
user_id_col = regexp_extract('value', r"^(\d+),", 1)
ratings_col = regexp_extract('value', r',(\d+),', 1)
date_col = regexp_extract('value', r',(\d{4}-\d{2}-\d{2})$', 1)

temp_data_df = raw_user_data.withColumn("Movie_id",when(movie_id_col!="", movie_id_col))\
                            .withColumn("User_id", when(user_id_col!="", user_id_col))\
                            .withColumn("Ratings", when(ratings_col!="", ratings_col))\
                            .withColumn("Date", when(date_col!="", date_col))

#user_data_df.head(5)
window_spec = Window.orderBy(lit(1)).rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled_data = temp_data_df.withColumn("Movie_id", when(col("Movie_id").isNotNull(), col("Movie_id")) \
                                           .otherwise(last("Movie_id", True).over(window_spec)))
final_data = filled_data.filter(col("User_id").isNotNull())

final_data = final_data.select("Movie_id", "User_id", "Ratings", "Date")
final_data.show(20, truncate=False)

+--------+-------+-------+----------+
|Movie_id|User_id|Ratings|Date      |
+--------+-------+-------+----------+
|1       |997243 |4      |2004-04-09|
|1       |1488844|3      |2005-09-06|
|1       |1067749|3      |2004-08-08|
|1       |822109 |5      |2005-05-13|
|1       |951682 |4      |2004-08-24|
|1       |885013 |4      |2005-10-19|
|1       |1665978|3      |2004-06-15|
|1       |30878  |4      |2005-12-26|
|1       |1454822|5      |2004-05-04|
|1       |823519 |3      |2004-05-03|
|1       |184199 |3      |2004-05-03|
|1       |893988 |3      |2005-11-17|
|1       |789966 |3      |2004-06-05|
|1       |124105 |4      |2004-08-05|
|1       |2357356|4      |2004-08-10|
|1       |1248029|3      |2004-04-22|
|1       |759549 |4      |2004-08-14|
|1       |1842128|4      |2004-05-09|
|1       |1710762|3      |2004-08-19|
|1       |2238063|3      |2005-05-11|
+--------+-------+-------+----------+
only showing top 20 rows



In [None]:
unique_movie_ids = final_data.select(F.countDistinct("Movie_id")).collect()[0][0]
print(unique_movie_ids)

17768
0


In [None]:
null_count_movie_id = final_data.filter(col("Movie_id").isNull()).count()
null_count_user_id = final_data.filter(col("User_id").isNull()).count()
null_count_ratings = final_data.filter(col("Ratings").isNull()).count()

print("Null count for User Id: ", null_count_user_id, ", Null count for Movie Id:",null_count_movie_id, ", Null count for Ratings", null_count_ratings)

Null count for User Id:  0 , Null count for Movie Id: 0 , Null count for Ratings 0


In [None]:
# Since CustomerIDs range from 1 to 2649429, with gaps. There are 480189 users. They will have to be made sequential for better memory efficiency and speed.
unique_user_id = final_data.select("User_id").distinct().rdd.map(lambda row: row[0]).collect()

user_id_map = {cid: idx + 1 for idx, cid in enumerate(unique_user_id)}

broadcast_mapping = spark.sparkContext.broadcast(user_id_map)

def remap_user_id(user_id):
    return broadcast_mapping.value.get(user_id, -1)

remap_user_udf = udf(remap_user_id, IntegerType())

final_data_with_remapped_user_id = final_data.withColumn("RemappedUserId", remap_user_udf(final_data["User_id"]))


In [None]:
# since Movie Id's are sequential from from 1 to 17770 we can subtract it by 1 to start from from 0 to 17769 for better memory efficiency.
final_data = final_data.withColumn("Movie_id", (col("Movie_id")-1).cast(IntegerType()))


In [None]:
final_data.agg(max("Movie_id").alias("max_value"), min("Movie_id").alias("min_value")).show()


+---------+---------+
|max_value|min_value|
+---------+---------+
|    17769|        0|
+---------+---------+



In [None]:
final_data.agg(max("User_id").alias("max_value"), min("Movie_id").alias("min_value")).show()

+---------+---------+
|max_value|min_value|
+---------+---------+
|   999988|        0|
+---------+---------+



In [None]:
mapping_df.count()

Out[19]: 480189

In [None]:
mapping_df = spark.createDataFrame(user_id_map.items(), ["OriginalUserId", "RemappedUserId"])
mapping_df.write.mode("overwrite").parquet("dbfs:/path_to_save/user_id_mapping")

In [None]:
total_records = final_data.count()

print(f"Total records: {total_records}")

Total records: 100480507


In [None]:
N = final_data.select("Movie_id").distinct().count()
M = final_data.select("User_id").distinct().count()

movie_id_count = final_data.groupBy("Movie_id").count().collect()
user_id_count = final_data.groupBy("User_id").count().collect()

movie_ids_count = {row['Movie_id']: row['count'] for row in movie_id_count}
user_ids_count = {row['User_id']: row['count'] for row in user_id_count}

n = 10000  # Top N users
m = 2000   # Top M movies

# Get the top N most frequent users and M most frequent movies
top_users = [user for user, _ in sorted(user_ids_count.items(), key=lambda item: item[1], reverse=True)[:n]]
top_movies = [movie for movie, _ in sorted(movie_ids_count.items(), key=lambda item: item[1], reverse=True)[:m]]

filtered_data = final_data.filter(final_data["User_id"].isin(top_users) & final_data["Movie_id"].isin(top_movies))

print(f"Filtered data size: {filtered_data.count()}")

filtered_data.write.csv("path_to_save_filtered_data.csv", header=True)

Filtered data size: 9357999


In [None]:
filtered_data.write.parquet("dbfs:/path_to_save/small_filtered_data.parquet")

In [None]:
final_data.write.parquet("dbfs:/path_to_save/final_data.parquet")

In [None]:
aws_access_key_id = ''
aws_secret_access_key = ''
region_name = 'us-east-1'  # Adjust based on your bucket region

s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id,
                         aws_secret_access_key=aws_secret_access_key, region_name=region_name)

bucket_name = 'netflix-project-data'
parquet_s3_path = 'Preprocessed data/Small_filtered_data.parquet'

df_spark = spark.read.parquet("dbfs:/path_to_save/small_filtered_data.parquet/")

df_pandas = df_spark.toPandas()

table = pa.Table.from_pandas(df_pandas)

local_file_path = '/tmp/pandas_data.parquet'
pq.write_table(table, local_file_path)

try:
    s3_client.upload_file(local_file_path, bucket_name, parquet_s3_path)
    print(f"File uploaded successfully to s3://{bucket_name}/{parquet_s3_path}")
except Exception as e:
    print(f"Error uploading file: {e}")

os.remove(local_file_path)


File uploaded successfully to s3://netflix-project-data/Preprocessed data/Small_filtered_data.parquet


In [None]:
df_spark = spark.read.parquet("dbfs:/path_to_save/final_data.parquet/")
parquet_s3_path = 'Preprocessed data/Final_processed_data.parquet'
local_file_path = '/tmp/pandas_data.parquet'

df_pandas = df_spark.toPandas()

table = pa.Table.from_pandas(df_pandas)

pq.write_table(table, local_file_path)

try:
    s3_client.upload_file(local_file_path, bucket_name, parquet_s3_path)
    print(f"File uploaded successfully to s3://{bucket_name}/{parquet_s3_path}")
except Exception as e:
    print(f"Error uploading file: {e}")

File uploaded successfully to s3://netflix-project-data/Preprocessed data/Final_processed_data.parquet
