##Q7. Collaborative Filtering (Streaming Distributed Stochastic Gradient Descent for Matrix Factorization)



Imports

In [0]:
# Necessary Libraries
import scipy.sparse as sparse
import scipy.stats as stats
import numpy as np
import threading
import random

Function 1: Creating Rating Matrix Randomly 

In [0]:
# Creating the Rating Matrix that has values of rating between 0 to 9
def create_Rating_matrix(numUsers, numMovies):
  np.random.seed(42)
  A = sparse.random(numUsers, numMovies, density=0.3)*10    # Sparse rating matrix of 10 X 10
  A = np.floor(A.toarray())                    # to make values of ratings between 0 to 9
  return A                                    # returning the rating matrix

Function 2: Extracting list of tuples (i,j,Rij) from the Rating Matrix R

In [0]:
# Function that makes a list of tuples (i,j,Rij) from the rating matrix R 
def listOfTuples(R):
  list1 = []
  cnt = 0
  for i in range(len(R)):     # iterating through all rows in R
    for j in range(len(R[0])):   # iterating through all columns in R
      if(R[i][j]!=0):      # Rij not equal to zero for training purpose
        list1.append((i,j,R[i][j]))   # adding tuples (i,j,Rij) where i-> user id and j -> movie id
        cnt = cnt + 1
  print('No.  of non-zeros:',cnt) 
  return list1  # returning the list of tuples

Function 3: Generator Function

In [0]:
# Generator function yielding tuples (i,j,Rij) for streaming purposes
def myGeneratorfun():
    counter=0
    random.shuffle(list1) # randomly shuffling the list of tuples (i,j,Rij)
    for chunk in list1:
        counter=counter+1
        yield (chunk)

Function 4: To check whether the Block is locked or not

In [0]:
# Functions to check whether a block is locked or not
def check_block(block, rows, cols):
  if(any(block[0]==i for i in rows) or any(block[1]==i for i in cols) ):     # if block[0] = block row number is in block rows list OR if block[1] = block column number is in block columns list then return false 
    return False  # Block is locked
  else:
    return True  # Block is unlocked
    
def check_block2(block, rows, cols):
  if(any(block[0]==i for i in rows) and any(block[1]==i for i in cols) ):   # if block[0] = block row number is in block rows list and if block[1] = block column number is in block columns list then return false 
    return False  # Block is locked
  else:
    return True  # Block is unlocked

Function 5: Find block number (p,q) of the tuple (i,j,Rij)

In [0]:
#Function that computes the block number (p,q) of the tuple (i,j, Rij)
def find_block(list1, u, m, d):
  index1 = u/d    # NUmber of users in one block
  index2 = m/d   # number of movies in one block
  p = np.floor(list1[0]/index1)    # Block row p
  q = np.floor(list1[1]/index2)    # Block col q
  return p,q

Thread Class for Worker

In [0]:
# Thread Class
class myThread (threading.Thread):

    # Init Method of thread class  (Worker)
   def __init__(self, threadID, name):
      threading.Thread.__init__(self) 
      self.threadID = threadID     # setting the TheadId for accessing ID of different worker
      self.name = name              # setting the thread name

   # Implementation of WORKER FUNCTION
   def run(self):   
      #print("Starting ", self.name, " ", self.threadID)

      # Running Continuously
      while(True):
        
        # Indicator set for extracting (p,q) sent by master 
        # If the flag for that worker ID is 2, then (p,q) is received from master, else wait 
        if(Worker_flags[self.threadID] == 2):
          
          # Extracting (p,q) that is arrived in the Worker_blocks[p] list added by master method
          (p,q) = Worker_blocks[self.threadID].pop()

          # setting the indicator back to 0 for further indications of addition of (p,q) block number
          Worker_flags[int(p)] = 0  # indicator

          # Request Up from data matrix U
          # finding the total number of users and movies in one block 
          index = numUsers/numWorkers
          index2 = numMovies/numWorkers
          
          # Extracting Up block from the matrix U 
          start = int(p*index)      # defining the start index 
          end = int((p+1)*index)    # defining the end index 
          Up = U[:,start:end]       # Extracting Up from U 

          # Extracting Vq block from the matrix V
          start = int(q*index2)     # defining the start index 
          end = int((q+1)*index2)   # defining the end index 
          Vq = V[:,start:end]       # Extracting Vq from V 

          pq_list = []  # empty list

          # Finding (i,j,Rij) that belongs to (p,q) block. Tuples (i,j,Rij) that belongs to block row p are mentioned in Worker_tuples[p]
          for i,j,rating in Worker_tuples[int(p)]:   # iterating through all tuples present in block row p
            nos_in_block = numMovies/numWorkers      # numbers of movie indexes in one block
            checkQ = np.floor(j/nos_in_block)        # calculating the block col 'checkQ' of the tiple (i,j,Rij) from for loop

            # if the block col checkQ matches with q 
            if(checkQ==q):
              pq_list.append((i,j,rating))       # add the tuples in the empty list pq_list
              Worker_tuples[int(p)].remove((i,j,rating)) # remove that tuple from the Worker_tuples 

          # for each i,j,Rij in pq_list, update the blocks U and V
          for (i,j,rating) in pq_list:
            try :
                list1.remove((i,j,rating))      # remove the tuple (i,j,Rij)
                pq_list.remove((i,j,rating))    # remove the tuple (i,j,Rij)

                pos_user = int(i - (p)*index)    # mapping the index of i (ith user for U) for Up  
                pos_movie = int(j - (q)*index2)  # mapping the index of j (jth user in V) for Vq

                # Simultaneous Update of ui row and vj col
                temp_U = Up[:,pos_user]          # storing the vector into a temporary variable
                temp_V = Vq[:,pos_movie]          # storing the vector into a temporary variable

                for no in range(no_of_iteration):
                    temp_U = temp_U - 2*eta*( (np.dot(temp_U.T, temp_V)-rating)*temp_V + (lambda1/numMovies)*temp_U )   # Updating ui
                    temp_V = temp_V - 2*eta*( (np.dot(temp_U.T, temp_V)-rating)*temp_U + (lambda1/numUsers)*temp_V )    # updating vj
                
                Up[:,pos_user] = temp_U   # assigning temp values to final
                Vq[:, pos_movie] = temp_V  # assigning temp values to final
                print('predicted: ', np.dot(Up[:,pos_user].T, Vq[:,pos_movie]) , ' actual: ', rating) 

                # Adding the error amount (actual - predicted) in the list of error in worker p
                error[self.threadID].append(np.dot(Up[:,pos_user].T, Vq[:,pos_movie]) - rating)  
            except Exception:
                print('',end="")   # exception handling 
           
          # Sending Up back to matrix store - Assigning Up to appropriate block in U
          start = int(p*index)
          end = int((p+1)*index)
          U[:,start:end] = Up

           # Sending Vq back to matrix store - Assigning Vq to appropriate block in V
          start = int(q*index2)
          end = int((q+1)*index2)
          V[:,start:end] = Vq

          # send (p,q) back to sender - Removing the blocking on block row p and block col q
          if(not check_block2((p,q),block_rows,block_cols)):
            block_rows.remove(p)
            block_cols.remove(q)

Initialization of Variables

In [8]:
# Creating Random Matrix R and extracting the list of tuples (i,j,Rij) corresponding to non zero ratings
numUsers = 10
numMovies = 10
R = create_Rating_matrix(numUsers, numMovies)   #  Rating matrix (randomly initialized sparse matrix)
list1 = listOfTuples(R)      # Making tuples (i,j,Rij) for which Rij is not equal to zero

# Initializing Variables
numWorkers = 5           # number of workers
numFactors = 5           # number of factors
eta = 0.003              # eta value
lambda1 = 0.3            # Lambda value
no_of_iteration = 100    # Number of iterations for updating ui and vj
numUsers = R.shape[0]    # number of users
numMovies = R.shape[1]   # number of movies         
block_rows = []          # list of blocked block row numbers
block_cols = []          # list of blocked block col numbers

Threads = np.ndarray((numWorkers,),dtype=np.object)         # Threads of size number of workers
Worker_tuples = np.ndarray((numWorkers,),dtype=np.object)   # List of size number of workers for tuples (i,j,Rij) for each worker
Worker_blocks = np.ndarray((numWorkers,),dtype=np.object)   # List of size number of workers for block numbers (p,q) for each worker
error = np.ndarray((numWorkers,),dtype=np.object)           # List of size number of workers for error calculaton for each worker
Worker_flags = np.ndarray((numWorkers,),dtype=np.object)    # List of size number of workers for flags to indicate different events

# Randomly intialize U and V (initially) - MATRIX STORE 
U = np.random.rand(numFactors, numUsers)
V = np.random.rand(numFactors, numMovies)

No.  of non-zeros: 28


Master Method

In [9]:
# MASTER FUNCTION

# Create all 10 workers (threads)
for i in range(numWorkers):
  Threads[i] = myThread(i, "Worker"+str(i))
for i in range(numWorkers):  
  Threads[i].start()

# Initializing the lists for every worker
for i in range(numWorkers):  
  Worker_tuples[i] = []
  Worker_blocks[i] = []
  error[i] = []
 
# Repeat till all the tuples in the tuple list created from the matrix R are processed
while len(list1)!=0:   
  myGenerator = myGeneratorfun()   # calling Generator function 
  for m in myGenerator:          # m = (i,j, Rij)
    p, q = find_block(m, numUsers, numMovies, numWorkers)  # finding the block number to which (i,j,Rij) - m tuple belongs
    p = int(p) # converting to int
    q = int(q) # converting to int
    
    # Forward (i,j,Rij) to worker p = add (i,j,Rij) into list of worker p and set the indicator to 1 of worker p
    Worker_tuples[int(p)].append(m)

    # Randomly select p1 and q1 from unlocked blocks and sending to worker
    while True:
        p1 = random.randrange(numWorkers)   # randomly select p1
        q1 = random.randrange(numWorkers)   # randomly select a1
        block = (p1,q1)  # make tuple
        answer = check_block(block, block_rows, block_cols)   # Check if the block is locked or not
    
        if(answer == True):  # if that block is not blocked, then select it and block rows and cols of it
            block_rows.append(p1)    # locking the block row q
            block_cols.append(q1)    # locking the block col q
            Worker_blocks[p1].append((p1,q1))  # add the tuple (p,q) - block number inside worker p block list
            Worker_flags[int(p1)] = 2  # indicator that (p,q) have arrived to worker p
            break
        else:  # if that block is blocked, find another one randomly
            continue

predicted:  4.9513923206401405  actual:  5.0
predicted:  1.9087970384844852  actual:  2.0
predicted:  5.968303349088524  actual:  6.0
predicted:  5.962801443095854 predicted:  1.0952013649015058  actual:  1.0
 actual:  6.0predicted: 
 7.966176399007296  actual:  8.0
predicted:  6.973435963574142  actual:  7.0
predicted: predicted:   3.9812605151903027  actual:  4.0
6.971978505070078  actual:  7.0
predicted:  4.924449164565523  actual:  5.0
predicted:  7.969972731883546  actual:  8.0
predicted:  3.9649098461349412  actual:  4.0
predicted: predicted:  2.975392392960269  actual:  3.0
 8.970505349107999  actual:  9.0
predicted:  5.968647575460187  actual:  6.0
predicted:  7.968430530995277  actual:  8.0
predicted:  6.973397463299582  actual:  7.0
predicted:  6.972530297036958  actual:  7.0
predicted: predicted:  8.973322376119398  actual:  9.0
 2.0158635401578557  actual:  2.0
predicted: predicted:  1.9956734663427322  actual:  2.0
 2.9944774304849187predicted:  5.977085732366358  actual: 

Calculating MAE and MSE Error

In [10]:
# Calculating MSE and MAE error between actual and predicted ratings
cnt = 0
sum1 = 0
sum2 = 0
for i in range(numWorkers):   
    sum1 = sum1 + np.sum(np.abs(error[i]))    # adding all the errors from each worker i
    sum2 = sum2 +np.sum(np.multiply(error[i],error[i]))  # adding all the sqaured errros from each worker i 
    cnt = cnt + len(error[i])       # counting the values

# Printing MAE and MSE error
print('after training on ',cnt,' tuples' )
print('MAE:',sum1/cnt)
print('MSE:',sum2/cnt)


after training on  28  tuples
MAE: 0.03262844013298055
MSE: 0.0015189104413599463
