In [1]:
# PySpark Pearson correlation function for distributed data processing

# Access configuration to GCP Cloud Storage

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sqrt
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, DoubleType
from scipy.stats import pearsonr
from itertools import combinations

# --- GCP / Cloud Storage configuration -------------------------------------
# Project ID (NOT the human-readable project name) of the GCP Dataproc setup.
project_id = 'amishr96-cis415-2025springc'

# Name of the Google Cloud Storage bucket that holds the lab data.
bucket = 'amishr96_data_for_gcp_labs'

# Folder (inside the bucket) and file name of the movie ratings data.
path_to_data_files = "/data_for_assignment/"
movie_reviews_file_name = "movie_ratings (1).csv"

# Object paths inside a bucket carry no leading "/", so the first character
# of the folder is dropped before the file name is appended.
relative_path_to_file = f"{path_to_data_files[1:]}{movie_reviews_file_name}"
full_file_path = f"gs://{bucket}/{relative_path_to_file}"

print(f"ProjectID (and not the Project Name) is: {project_id}")
print(f"Bucket name is: {bucket}")

ProjectID (and not the Project Name) is: amishr96-cis415-2025springc
Bucket name is: amishr96_data_for_gcp_labs


In [2]:
# Typically, any big data platform (like GCP Dataproc) will have PySpark pre-installed
# This might not be the case in other platforms.
# This paragraph is to check if PySpark is available in the system and install if it's not available
# You should expect this paragraph to RUN the PySpark installation in Google Colab
# You should expect this paragraph NOT TO RUN the PySpark installation in GCP Dataproc

try:
  from pyspark.sql import SparkSession
  pyspark_available = 'Y'
except:
  pyspark_available = 'N'

# If PySpark is not installed, then go through all these steps

if pyspark_available == 'N':
  # Update Installer
  !apt-get update

  # Intsall Java
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null

  # install spark (change the version number if needed)
  !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

  # unzip the spark file to the current folder
  !tar xf spark-3.0.0-bin-hadoop3.2.tgz

  # set your spark folder to your system path environment.
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

  # install findspark using pip
  !pip install -q findspark

  import findspark
  findspark.init()

else:
    # Spark / PySpark already pre-installed in the environment
    print("PySpark already pre-installed!")


PySpark already pre-installed!


In [6]:
# Detect whether this notebook is executing inside Google Colab by looking for
# the Colab marker in the string form of the active IPython shell.
RunningInColab = 'google.colab' in str(get_ipython())
print('Running on CoLab' if RunningInColab else 'Not running on CoLab')

# Authorize access from Colab to GCP Cloud Storage
from google.cloud import storage
client = storage.Client()
print("Package imports done")

if RunningInColab == True:
  # To access Google Cloud Storage
  from google.cloud import storage
  import google.auth

  !pip install gcsfs
  import gcsfs

  from google.colab import auth
  auth.authenticate_user()

  credentials, default_project_id = google.auth.default()
  !gcloud config set project {project_id}# Creat a Spark session if in Google Colab
from pyspark.sql import SparkSession

# Only Colab needs a session built here; it runs Spark locally on all cores.
if RunningInColab:
    local_builder = SparkSession.builder.master("local[*]")
    spark = local_builder.getOrCreate()

# READ DATA

# Load the movie ratings CSV into spark_df, by a route that depends on the environment.
if not RunningInColab:
  # In GCP, we can read the file directly from Google Storage
  print("Reading the file using spark directly from GCS")
  spark_df = spark.read.csv(full_file_path, sep=",", header=True, inferSchema=True)
else:
  # Reading gs:// paths with spark.read.csv is not straightforward in Colab, so
  # the object is first copied from the bucket to the Colab runtime's local
  # "/content" folder and Spark then reads that local copy.
  temp_file_path = "/content/" + movie_reviews_file_name

  # Google Cloud Storage client used for the download
  storage_client = storage.Client()

  print(f"bucket_name={bucket}")
  print(f"relative_path_to_file = {relative_path_to_file}")
  print("Downloading the file from GCS to a local file in Colab")
  bucket_object = storage_client.bucket(bucket)
  print(bucket_object.name)
  blob = bucket_object.blob(relative_path_to_file)
  print(blob)
  blob.download_to_filename(temp_file_path)

  # Read the file using the local temporary path
  print("Reading the local file using spark")
  spark_df = spark.read.csv(temp_file_path, sep=",", header=True)

# Summary statistics for every column
spark_df.describe().show()

# How many records got loaded?
record_count = spark_df.count()
print(f"Total number of records from data file = {record_count}")



Not running on CoLab
Package imports done
Reading the file using spark directly from GCS


25/04/11 22:58:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+---------+------------------+------------------+------------------+------------------+
|summary|     Name|         Inception|           Titanic|            Avatar|        The Matrix|
+-------+---------+------------------+------------------+------------------+------------------+
|  count|      100|               100|               100|               100|               100|
|   mean|     NULL| 3.113999999999999|2.9800000000000004|3.1839999999999997|2.9620000000000006|
| stddev|     NULL|1.1372499170091743|1.2192894105447922|1.1451893231507992|1.0880229498470788|
|    min|Alexander|               1.0|               1.0|               1.0|               1.0|
|    max|  William|               5.0|               5.0|               4.9|               4.9|
+-------+---------+------------------+------------------+------------------+------------------+

Total number of records from data file = 100


In [7]:
# Print the schema (column names, types, nullability) of the loaded ratings DataFrame
spark_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Inception: double (nullable = true)
 |-- Titanic: double (nullable = true)
 |-- Avatar: double (nullable = true)
 |-- The Matrix: double (nullable = true)



In [8]:
# Make sure every movie-rating column is a double, whatever type the CSV
# reader assigned to it, then show the resulting schema.
for rating_column in ("Inception", "Titanic", "Avatar", "The Matrix"):
    spark_df = spark_df.withColumn(rating_column, col(rating_column).cast(DoubleType()))
spark_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Inception: double (nullable = true)
 |-- Titanic: double (nullable = true)
 |-- Avatar: double (nullable = true)
 |-- The Matrix: double (nullable = true)



In [9]:
# Bring the ratings to the driver as {user name: [Inception, Titanic, Avatar, The Matrix]}
rating_columns = ["Inception", "Titanic", "Avatar", "The Matrix"]
ratings_dict = (
    spark_df.rdd
    .map(lambda row: (row["Name"], [row[movie] for movie in rating_columns]))
    .collectAsMap()
)


                                                                                

In [10]:
# Peek at the first five (user, ratings) entries of the dictionary
preview_entries = list(ratings_dict.items())[:5]
for key, value in preview_entries:
    print(f"{key}: {value}")

Rachel: [4.5, 2.3, 3.6, 3.0]
Larry: [2.1, 4.2, 2.9, 3.3]
Edward: [4.7, 1.1, 4.8, 3.4]
Gary: [4.2, 2.2, 4.4, 3.8]
Debra: [1.8, 4.2, 3.9, 2.7]


In [11]:
# Every unordered pair of distinct user names, in dictionary order
user_names = list(ratings_dict)
user_pairs = list(combinations(user_names, 2))

print(user_pairs[:5])


[('Rachel', 'Larry'), ('Rachel', 'Edward'), ('Rachel', 'Gary'), ('Rachel', 'Debra'), ('Rachel', 'Carol')]


In [13]:
import math

# Pearson correlation for every pair of users.
# results   : list of (user1, user2, correlation, p_value) tuples
# all_users : every user name that appears in at least one result
results = []
all_users = set()

for user1_name, user2_name in user_pairs:
    ratings_user1 = ratings_dict[user1_name]
    ratings_user2 = ratings_dict[user2_name]

    # Only the correlation call itself is guarded; a pair that cannot be
    # correlated is reported and skipped rather than aborting the whole run.
    try:
        correlation, p_value = pearsonr(ratings_user1, ratings_user2)
    except Exception as e:
        print(f"Skipped pair ({user1_name}, {user2_name}) — Error: {e}")
    else:
        # Negative correlations are clamped to 0 (treated as "not similar").
        # pearsonr yields NaN when a user's ratings are all identical; NaN
        # would slip past a plain "< 0" test and then poison the later sort
        # and weight sums, so it is clamped to 0 as well.
        if math.isnan(correlation) or correlation < 0:
            correlation = 0
        results.append((user1_name, user2_name, correlation, p_value))
        all_users.update([user1_name, user2_name])

print("All unique users involved in results:", sorted(all_users))
print(results[:5])


All unique users involved in results: ['Alexander', 'Amanda', 'Amy', 'Andrew', 'Angela', 'Anna', 'Anthony', 'Ashley', 'Barbara', 'Benjamin', 'Betty', 'Brandon', 'Brenda', 'Brian', 'Carol', 'Carolyn', 'Catherine', 'Charles', 'Christine', 'Christopher', 'Cynthia', 'Daniel', 'David', 'Deborah', 'Debra', 'Dennis', 'Donald', 'Donna', 'Dorothy', 'Edward', 'Elizabeth', 'Emily', 'Emma', 'Eric', 'Frank', 'Gary', 'George', 'Gregory', 'Helen', 'Jack', 'Jacob', 'James', 'Janet', 'Jason', 'Jeffrey', 'Jennifer', 'Jerry', 'Jessica', 'John', 'Jonathan', 'Joseph', 'Joshua', 'Justin', 'Karen', 'Katherine', 'Kathleen', 'Kenneth', 'Kevin', 'Kimberly', 'Larry', 'Laura', 'Linda', 'Lisa', 'Margaret', 'Maria', 'Mark', 'Mary', 'Matthew', 'Melissa', 'Michael', 'Michelle', 'Nancy', 'Nicholas', 'Nicole', 'Pamela', 'Patricia', 'Patrick', 'Paul', 'Rachel', 'Raymond', 'Rebecca', 'Richard', 'Robert', 'Ronald', 'Ruth', 'Ryan', 'Samantha', 'Sandra', 'Sarah', 'Scott', 'Sharon', 'Shirley', 'Stephanie', 'Stephen', 'Steven

In [14]:
# Step 2: Obtain a list of correlation values for each user and sort them (the higher the correlation the greater the similarity)

from collections import defaultdict

# userDistances is a nested dictionary of pairwise correlations:
# {'key_userName': {'userName1': correlation between key_userName and userName1,
#                   'userName2': correlation between key_userName and userName2, ...}, ...}
userDistances = defaultdict(dict)

# Your Code Starts HERE #
# Each result is recorded in both directions; setdefault keeps the first
# value seen for a pair and never overwrites an existing entry.
for first_user, second_user, similarity, _p_value in results:
    userDistances[first_user].setdefault(second_user, similarity)
    userDistances[second_user].setdefault(first_user, similarity)

# Your Code Ends HERE#



In [17]:
# Step 3 Determine each user's k Nearest Neighbors.


# The k nearest neighbors of each user are the first k users in the inner dictionary once it is sorted in descending order (the
# higher the correlation, the more similar the users).

# Re-order each user's inner dictionary from the highest to the lowest correlation.
for key in userDistances:
    ranked_pairs = sorted(userDistances[key].items(), key=lambda item: item[1], reverse=True)
    userDistances[key] = dict(ranked_pairs)

# The print output below shows the inner dictionary for the key 'Edward'. Its values are
# sorted; they are the correlations between Edward and the other users.
print(userDistances["Edward"])

k = 3

# Task: keep only the first k users of each sorted inner dictionary, names AND correlation
# coefficients, because the coefficients are used to calculate weights later. The result is
# userKNNDistances: {user name: {nearest neighbor 1: coefficient, nearest neighbor 2: coefficient, ...}}
userKNNDistances = {}
# Your Code Starts HERE
for key, inner_dict in userDistances.items():
    nearest_names = list(inner_dict)[:k]
    userKNNDistances[key] = {neighbor: inner_dict[neighbor] for neighbor in nearest_names}
print("Edward's neighbors:", userKNNDistances["Edward"])

# Your Code Ends HERE

# Report the retained neighbors for Edward only
for key, inner_dict in userKNNDistances.items():
    if key != "Edward":
        continue
    print(f"The three nearest neighbors and their distances to {key} are {inner_dict.items()}")

{'Sarah': 0.9961078632130002, 'Gary': 0.9886431886948035, 'Kevin': 0.9663526844810868, 'George': 0.9449897664771802, 'Joshua': 0.9308308637436923, 'Benjamin': 0.8992607007433459, 'Andrew': 0.8939635544523927, 'Jerry': 0.8899349063134984, 'Rachel': 0.8838821066510862, 'Brenda': 0.845568645734488, 'Donald': 0.8434548437347882, 'Jessica': 0.8400107008414178, 'William': 0.8173539673956701, 'Sharon': 0.8138086039790637, 'Ronald': 0.810454081347384, 'James': 0.8065174884550839, 'Jennifer': 0.7848106364871035, 'Carolyn': 0.7780905296182808, 'Nancy': 0.7651052880275334, 'Elizabeth': 0.7395932274474748, 'Emily': 0.7265138939446737, 'David': 0.7107839379747571, 'Donna': 0.7070710723297837, 'Stephanie': 0.6896281675560765, 'Alexander': 0.6804225750007863, 'Kathleen': 0.6629984200623009, 'Timothy': 0.6625800588444851, 'Catherine': 0.6593026183333233, 'Jason': 0.6461300931386637, 'Thomas': 0.6322039809758101, 'Pamela': 0.5779344204000937, 'Christopher': 0.5739717912626814, 'Dennis': 0.5604124522860

In [18]:
# Step 4 Compute the weights for the nearest neighbors based on their distances (pearson correlation) for each user
# weight of one Nearest Neighbor (NN) = (distance of that NN with userX) / (sum of distances between each NN with the user)
# e.g., for Rachel, the weight of Andrew would be 0.9996960054843319/(0.9996960054843319+0.9916728130040291+0.9826757036739959)

# You will use the dictionary userWeightNeighbors to record the users' names (keys of this dictionary), and the three nearest neighbors and their weights (an inner dictionary, the neighbors' names are the keys)

from collections import defaultdict

# {user name: {neighbor name: weight}}, where a neighbor's weight is its
# correlation divided by the sum of the correlations of all retained neighbors.
userWeightNeighbors = defaultdict(dict)

for key, inner_dict in userKNNDistances.items():
    denominator = sum(inner_dict.values())

    if denominator > 0:
        userWeightNeighbors[key] = {
            neighbor_name: neighbor_similarity / denominator
            for neighbor_name, neighbor_similarity in inner_dict.items()
        }
    else:
        # No positive similarity at all: every neighbor gets weight 0
        userWeightNeighbors[key] = {neighbor_name: 0 for neighbor_name in inner_dict}

# Your Code ENDs HERE

# Report the weights for Edward only
for key, value in userWeightNeighbors.items():
    if key != "Edward":
        continue
    print(f"The three nearnest neighbors of {key} and their weights for calculating {key}'s predicted ratings are: {value}")




The three nearnest neighbors of Edward and their weights for calculating Edward's predicted ratings are: {'Sarah': 0.33753739352852596, 'Gary': 0.3350079417758977, 'Kevin': 0.32745466469557644}


In [19]:
# Step 5 Suppose the new Mission Impossible movie just came out; some users have watched it and some have not.
# Find out who you would recommend this movie to, assuming that the predicted rating should be at least 3 (on the 5-point rating scale)

# 5.1. READ THIS NEW DATA

movie_review_file_name = "missionImpossible_rating (1).csv"

# Use dedicated path variables for this file. Re-using relative_path_to_file /
# full_file_path would overwrite the values that point at the movie ratings
# file, so re-running the earlier data-loading cell afterwards would silently
# load the wrong CSV.
relative_path_to_file_update = path_to_data_files[1:] + movie_review_file_name
full_file_path_update = "gs://" + bucket + "/" + relative_path_to_file_update

if RunningInColab:
  # Download the file from GCS to a local file in the default folder "/content"
  # in the runtime machine of colab, then read that local copy with Spark.
  temp_file_path = "/content/" + movie_review_file_name

  # Initialize a Google Cloud Storage client
  storage_client = storage.Client()

  print(f"bucket_name={bucket}")
  print(f"relative_path_to_file = {relative_path_to_file_update}")
  print("Downloading the file from GCS to a local file in Colab")
  bucket_object = storage_client.bucket(bucket)
  blob = bucket_object.blob(relative_path_to_file_update)
  blob.download_to_filename(temp_file_path)

  print("Reading the local file using spark")
  # Read the file using the local temporary path
  spark_recommendation_df = spark.read.csv(temp_file_path, sep=",", header=True)
else:
  # In GCP, we can read the file directly from Google Storage
  print("Reading the file using spark directly from GCS")
  spark_recommendation_df = spark.read.csv(full_file_path_update, sep=",", header=True, inferSchema=True)

# Summary statistics for the new ratings
spark_recommendation_df.describe().show()


Reading the file using spark directly from GCS
+-------+---------+------------------+
|summary|     Name|Mission Impossible|
+-------+---------+------------------+
|  count|      100|               100|
|   mean|     NULL|1.6710000000000003|
| stddev|     NULL|1.6547632501785303|
|    min|Alexander|               0.0|
|    max|  William|               4.8|
+-------+---------+------------------+



In [20]:
# Print the schema (column names, types, nullability) of the Mission Impossible ratings DataFrame
spark_recommendation_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Mission Impossible: double (nullable = true)



In [21]:
# Task: If the value for Mission Impossible is a string type, we need to convert it to a Double Type
# (Hint: we have done this earlier in this code file for a different data file)

# Your Code Starts Here
from pyspark.sql.functions import col

# Column expression that re-types the Mission Impossible rating as a double
mission_rating_as_double = col("Mission Impossible").cast("double")
spark_recommendation_df = spark_recommendation_df.withColumn(
    "Mission Impossible", mission_rating_as_double
)

# Show the schema after the cast, then the first five rows
spark_recommendation_df.printSchema()
spark_recommendation_df.show(5)

# Your Code Ends Here

root
 |-- Name: string (nullable = true)
 |-- Mission Impossible: double (nullable = true)

+------+------------------+
|  Name|Mission Impossible|
+------+------------------+
|Rachel|               4.5|
| Larry|               2.1|
|Edward|               0.0|
|  Gary|               4.2|
| Debra|               0.0|
+------+------------------+
only showing top 5 rows



In [22]:
# 5.2. Find out who has not watched it yet



# First, bring the new ratings to the driver as {user name: [Mission Impossible rating]}

def _name_and_mission_rating(row):
    """Map one DataFrame row to a (name, [rating]) pair."""
    return (row["Name"], [row["Mission Impossible"]])

recommendation_dict = spark_recommendation_df.rdd.map(_name_and_mission_rating).collectAsMap()




In [23]:
# Next, find out those with values being 0 and put them in a list usersNotWatched

# Your Code Starts HERE
# A user counts as "not watched" when the rating is missing or exactly 0.
mission_rating = col("Mission Impossible")
not_watched_condition = mission_rating.isNull() | (mission_rating == 0.0)

not_watched_rows = (
    spark_recommendation_df
    .filter(not_watched_condition)
    .select("Name")
    .collect()
)
usersNotWatched = [row["Name"] for row in not_watched_rows]
# Your Code Ends HERE

print(usersNotWatched)

['Edward', 'Debra', 'Carol', 'Christine', 'Nancy', 'Richard', 'Patricia', 'Jason', 'Thomas', 'Alexander', 'Mark', 'Anthony', 'Brian', 'Benjamin', 'Anna', 'Angela', 'Janet', 'Kathleen', 'Shirley', 'Dennis', 'Dorothy', 'Joshua', 'Carolyn', 'Andrew', 'Jessica', 'Deborah', 'Katherine', 'Emily', 'Timothy', 'Elizabeth', 'Paul', 'James', 'Tyler', 'Emma', 'William', 'Maria', 'John', 'Jacob', 'Raymond', 'Nicole', 'Gregory', 'Melissa', 'Steven']


In [24]:
# Now, let's find out from this list who we should recommend Mission Impossible, based on the criterion that the predicted
# rating should be at least 3. Remember, predicted rating = sum (weight of each Nearest Neighbor * rating given by that neighbor)

# {user name: predicted Mission Impossible rating} for every user who has not watched it
predictedRatings = {}

# First, we calculate predicted ratings for all who have not watched the movie yet
for key in usersNotWatched:
    predicted = 0

    # Your Code Starts HERE
    for neighbor, weight in userWeightNeighbors[key].items():
        # Look the rating up in recommendation_dict, which already holds every
        # user's rating on the driver; this avoids launching a separate Spark
        # job for each (user, neighbor) pair. A neighbor absent from the new
        # file is skipped instead of raising.
        neighbor_rating = recommendation_dict.get(neighbor, [None])[0]
        if neighbor_rating is None:
            continue
        neighbor_rating = float(neighbor_rating)  # ensure it's numeric
        # NOTE(review): neighbors who have not watched the movie (rating 0)
        # contribute nothing and the remaining weights are not re-normalized,
        # so such predictions are scaled down — confirm this is intended.
        if neighbor_rating > 0:
            predicted += weight * neighbor_rating
    # Your Code Ends HERE

    predictedRatings[key] = predicted

#Let's print out one from predictedRatings dictionary using Edward as the key

print(predictedRatings["Edward"])


3.7793503003122257


In [25]:
# From this predictedRatings dictionary, we will select those users whose predicted rating is at least 3. We use a list forRecommendations to store
# these users

# Your Code STARTS HERE
# Keep, in dictionary order, every user whose predicted rating reaches the threshold of 3
forRecommendations = []
for user, rating in predictedRatings.items():
    if rating >= 3:
        forRecommendations.append(user)

# Your Code ENDS Here

print(f"This is the list of users we will recommend the new movie Mission Impossible: {forRecommendations}")

This is the list of users we will recommend the new movie Mission Impossible: ['Edward', 'Thomas', 'Andrew']
