# MLSD Assignment 2 - User Movie ratings

### We want to recommend new movies to an user based on users with similar ratings that watched the movie.

In [2]:
!pip install pyspark
!pip install pandas
!pip install numpy
!pip install sympy
!pip install tqdm




[notice] A new release of pip is available: 23.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
import pandas as pd 
import numpy as np
from tqdm import tqdm # progress bar
from sympy import nextprime

In [4]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(appName="exercise1")
spark = SparkSession.builder.appName("BFR").getOrCreate()

Read the dataset

In [5]:
df = spark.read.option("sep", "\t").csv("data/movielens/u.data") 

Rename columns

In [6]:
new_columns = ["user id", "item id", "rating","timestamp" ]

df_rdd = df.rdd
df = df_rdd.toDF(new_columns)
df.show()

+-------+-------+------+---------+
|user id|item id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
|     22|    377|     1|878887116|
|    244|     51|     2|880606923|
|    166|    346|     1|886397596|
|    298|    474|     4|884182806|
|    115|    265|     2|881171488|
|    253|    465|     5|891628467|
|    305|    451|     3|886324817|
|      6|     86|     3|883603013|
|     62|    257|     2|879372434|
|    286|   1014|     5|879781125|
|    200|    222|     5|876042340|
|    210|     40|     3|891035994|
|    224|     29|     3|888104457|
|    303|    785|     3|879485318|
|    122|    387|     5|879270459|
|    194|    274|     2|879539794|
|    291|   1042|     4|874834944|
|    234|   1184|     2|892079237|
+-------+-------+------+---------+
only showing top 20 rows



In [7]:
from pyspark.sql.functions import asc
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

df = df.select('user id', 'item id', 'rating')

# loop through columns and cast to float
for col_name in df.columns:
    df = df.withColumn(col_name, col(col_name).cast("int")) # Convert string values to int values
# assuming df is your dataframe
sorted_df = df.orderBy(asc("user id"), asc("item id"), asc("rating"))

In [8]:
sorted_df.show()

+-------+-------+------+
|user id|item id|rating|
+-------+-------+------+
|      1|      1|     5|
|      1|      2|     3|
|      1|      3|     4|
|      1|      4|     3|
|      1|      5|     3|
|      1|      6|     5|
|      1|      7|     4|
|      1|      8|     1|
|      1|      9|     5|
|      1|     10|     3|
|      1|     11|     2|
|      1|     12|     5|
|      1|     13|     5|
|      1|     14|     5|
|      1|     15|     5|
|      1|     16|     5|
|      1|     17|     3|
|      1|     18|     4|
|      1|     19|     5|
|      1|     20|     4|
+-------+-------+------+
only showing top 20 rows



Create a item-user table with the ratings

In [9]:
# pivot the data on item id and aggregate the rating by user id
df = sorted_df.groupBy('item id').pivot('user id').agg({'rating': 'first'}).na.fill(int(0))

In [10]:
# order the index
df = df.orderBy(asc("item id"))
new_column_name = "item/ user"
old_column_name = "item id"
df = df.withColumnRenamed(old_column_name,new_column_name)

df.show()

+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

Function of the Pearson correlation

In [11]:
def  N(row):
  "Returns the number of values that are different from 'null' in order to compute its mean"
  
  N = 0
  for i in row:
    if i != 0:
      N += 1
  return N 

In [13]:
def pearson_corr(row, N):
  "Returns a row of the Pearson correlation"
  mean = np.sum(row)/N
  new_row = [] 
  
  for i in row:
    if i != 0:
      new_number = i - mean
      new_row.append(new_number) 
    else:
      new_row.append(i)
  return new_row


In [14]:
# count the number of non-zero ratings
user_item_matrix = df.rdd.map(lambda x: x[1:])

user_item_matrix = user_item_matrix.map(lambda x: (x, N(x)))

In [15]:
# apply the pearson correlation to the user-item matrix
matrix_scaled = user_item_matrix.map(lambda x: pearson_corr(x[0],x[1]))

Add the index again

In [16]:
matrix_scaled = matrix_scaled.zipWithIndex().map(lambda x: (x[1], x[0])).cache()
new_matrix_scaled = matrix_scaled.collect()

In [17]:
num_items = matrix_scaled.count()
num_items

1682

## Apply LSH to find candidate pairs

The same way it was done in the first assignment

In [18]:
num_users = 943
num_items = 1682 

In [19]:
num_permutations = 13 * 11 # bands x rows obtained from b*r=n; n is the number of permutations

p = nextprime(num_users)
a = np.random.randint(1, p, size = (num_permutations))
b = np.random.randint(0, p, size = (num_permutations))

a_bc = sc.broadcast(a)
b_bc = sc.broadcast(b)

def hash_function(x, i):
  return ((a_bc.value[i] * hash(x) + b_bc.value[i]) % p) % num_users

In [20]:
def minhash(item, permutations):
    sign_values = []
    for i in range(permutations):
      hash_value = [hash_function(user,i) for user in item]
      sign_values.append(min(hash_value))
    return sign_values


In [21]:
def signature_matrix(user_item_mat, permutations, number_of_items , block_size):
    # Create a set for each hash_set
    signature_matrix_blocks = []
    # Shingles with an index
    user_item_mat_with_ids = user_item_mat.zipWithIndex().map(lambda x: (x[1], x[0])).cache()

    # Process the data in blocks
    for i in tqdm(range(0, number_of_items , block_size)):
        # Get the next block of data and compute the MinHash values for each document in the block
        block = user_item_mat_with_ids.filter(lambda x: x[0] >= i and x[0] < i + block_size)\
        .map(lambda x: (minhash(x[1][1], permutations)))

        # Construct the signature matrix for the block
        signature_matrix_block = np.array(block.collect()).T
        
        # Add the block's signature matrix to the list of blocks
        signature_matrix_blocks.append(signature_matrix_block)
        
    # Combine the signature matrices for each block
    signature_matrix = np.concatenate(signature_matrix_blocks, axis=1)
    
    return  signature_matrix


In [22]:
def get_buckets(bands,number_of_items):
  # This function hash band columns into buckets
  buckets = []
  for band in tqdm(bands, desc = 'Bucket hashing'):
    
    band_hashed = [hash_function(tuple(band[:, j]), 0) for j in range(number_of_items)]
    bucket_dict = {}
        
    for j, b_hash in enumerate(band_hashed):
      if b_hash not in bucket_dict:
        bucket_dict[b_hash] = []
      bucket_dict[b_hash].append(j)
    buckets.append(bucket_dict)
  return buckets


In [23]:
from itertools import combinations

def lsh(sig_mat,number_of_items,num_bands,num_permutations):
    # Hash bukets
    bands = np.split(sig_mat, num_bands)
    buckets = get_buckets(bands,number_of_items)

    # Generate candidate pairs
    candidate_pairs = set()
    for bucket in tqdm(buckets, desc = 'Candidate pairs generation'):
        for b_hash, items in list(bucket.items()):
            if len(items) < 2:
                continue
            for pair in combinations(items, 2):
                candidate_pairs.add(pair)

    candidate_pairs = list(candidate_pairs)
    return candidate_pairs # It returns the ID of each pair

In [24]:
block_size = 100
signature_mat = signature_matrix(matrix_scaled, num_permutations,num_items, block_size)

100%|██████████| 17/17 [03:00<00:00, 10.59s/it]


In [25]:
num_bands = 13
candidate_pairs = lsh(signature_mat,num_items,num_bands,num_permutations)

Bucket hashing: 100%|██████████| 13/13 [00:00<00:00, 124.04it/s]
Candidate pairs generation: 100%|██████████| 13/13 [00:00<00:00, 98.24it/s]


Functions to obtain cosine similarity between items

In [26]:
def cosine_sim(r1, r2):
  return np.dot(r1, r2) / (np.linalg.norm(r1) * np.linalg.norm(r2))

Testing on the first 2 items

In [27]:
cosine_sim(new_matrix_scaled[0][1], new_matrix_scaled[1][1])

0.09133106690893115

In [28]:
print("candidate pairs: ",len(candidate_pairs))

def compute_weights(c_pairs, ratings_mat):
    sim_weights = {}

    # get similarities between candidate pairs
    for item1, item2 in candidate_pairs:
        r1 = new_matrix_scaled[item1 - 1][1]
        r2 = new_matrix_scaled[item2 - 1][1]
        # many are 0 because norm of some items are 0
        sim = cosine_sim(r1, r2)
        try:
            if np.nan:
                sim_weights[item1].append(0)
            else:
                sim_weights[item1].append(sim)
        except KeyError:
            sim_weights[item1] = []
        
    return sim_weights

sim_weights = compute_weights(candidate_pairs, new_matrix_scaled)

  return np.dot(r1, r2) / (np.linalg.norm(r1) * np.linalg.norm(r2))


candidate pairs:  37421


In [29]:
def calculate_new_ratings(item_i, user, ratings_mat, sim_weights):

  '''
  Given the id of item_i, it calculates the new rating using the weighted sum
  '''

  item_i = ratings_mat[item_i][1][user]
  # new_rating = np.sum([w * x for x ])
  # sum_weigth[item_i][user]
  # Initialize our coefficients
  
  new_rating = 0
  
  for item, row in sim_weights.items():
    new_rating += row[user] * ratings_mat[item - 1][1][user]
  
  return new_rating

In [30]:
new_matrix = matrix_scaled.map(lambda row: [i if i != 0 
                                            else calculate_new_ratings(row, idx, new_matrix_scaled, sim_weights) 
                                            for idx, i in enumerate(row[1])] )

In [31]:
new_matrix.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 137.0 failed 1 times, most recent failure: Lost task 0.0 in stage 137.0 (TID 38) (DESKTOP-PI3ARK7.Home executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 686, in main
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 678, in process
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1151421235.py", line 1, in <lambda>
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1151421235.py", line 2, in <listcomp>
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1866953820.py", line 7, in calculate_new_ratings
TypeError: list indices must be integers or slices, not tuple

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 686, in main
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 678, in process
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\PySpark\spark-3.3.2-bin-hadoop3\python\lib\pyspark.zip\pyspark\util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1151421235.py", line 1, in <lambda>
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1151421235.py", line 2, in <listcomp>
  File "C:\Users\franc\AppData\Local\Temp\ipykernel_2096\1866953820.py", line 7, in calculate_new_ratings
TypeError: list indices must be integers or slices, not tuple

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
