#### Factorization Machines in Spark
Author: Gautam S. Muralidhar
#### Reference: http://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf
#### Spark Environment: Spark 1.6.1
#### To run this notebook: 
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark

#### First define a few helper functions that will help parse the data, originally in (user,item,rating) format, and convert it to a feature vector

In [2]:
# Function that parses a line in the data csv file and returns an array of (user, item, rating)
# values
def parseLine(line):
    import numpy as np
    (user,item,rating,ts)=line.split('\t')
    return np.array([int(user),int(item),float(rating)])

# Function that returns a key-value pair, with key being the userid and value being 1
def getUser(arr):
    return (arr[0],1)

# Function that returns a key-value pair, with key being the itemid and value being 1
def getItem(arr):
    return (arr[1],1)

# Function that returns the rating for the given tuple
def getRating(arr):
    return arr[2]

# Function that creates a feature vector from a (user,item,rating) tuple
def createFMData(arr):
    import numpy as np
    all_users = usersBroadcastVar.value
    all_items = itemsBroadcastVar.value
    numusers = len(all_users)
    numitems = len(all_items)
    useridx = {}
    itemidx = {}
    
    for i in range(0,numusers):
        useridx[int(all_users[i])] = i
    for i in range(0,numitems):
        itemidx[int(all_items[i])] = i
        
    x = [0 for i in range(0,numusers+numitems+1)] # +1 is for the rating at the end
    user_id = int(arr[0])
    item_id = int(arr[1])
    rating  = arr[2]
    if useridx.has_key(user_id):
        x[useridx[user_id]] = 1.0
    if itemidx.has_key(item_id):
        x[numusers+itemidx[item_id]] = 1.0
    x[-1] = rating
    return x

In [3]:
# Create an RDD based on the Training Data File
trainingFileRdd = sc.textFile(
                    "/Users/gmuralidhar/Projects/MLDatasets/movielens-100k/ua.base",
                    minPartitions = 16, 
                    use_unicode = True
                )
for line in trainingFileRdd.take(5):
  print line

trainRdd = trainingFileRdd.map(parseLine)
trainUsers = trainRdd.map(getUser).distinct().sortByKey().keys().collect()
trainItems = trainRdd.map(getItem).distinct().sortByKey().keys().collect()
minTrainRating = trainRdd.map(getRating).min()
maxTrainRating = trainRdd.map(getRating).max()

print minTrainRating, maxTrainRating
usersBroadcastVar = sc.broadcast(trainUsers)
itemsBroadcastVar = sc.broadcast(trainItems)

# Create the Training Data RDD
trainingData = trainRdd.map(createFMData)

1	1	5	874965758
1	2	3	876893171
1	3	4	878542960
1	4	3	876893119
1	5	3	889751712
1.0 5.0


In [4]:
print type(trainingData)
oneExample = trainingData.take(1)[0]
print oneExample

<class 'pyspark.rdd.PipelinedRDD'>
[1.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [5]:
# Create an RDD based on the Testing Data File
testingFileRdd = sc.textFile(
                    "/Users/gmuralidhar/Projects/MLDatasets/movielens-100k/ua.test",
                    minPartitions = 16, 
                    use_unicode = True
                )
for line in testingFileRdd.take(5):
  print line

testRdd = testingFileRdd.map(parseLine)
# Create the Training Data RDD
testingData = testRdd.map(createFMData)

1	20	4	887431883
1	33	4	878542699
1	61	4	878542420
1	117	3	874965739
1	155	2	878542201


In [6]:
print type(testingData)
oneExample = testingData.take(1)[0]
print oneExample

<class 'pyspark.rdd.PipelinedRDD'>
[1.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [7]:
print trainingData.getNumPartitions()
print testingData.getNumPartitions()

16
16


#### We will now implement the FM learning function.

#### First define the function SGDUpdate that will be executed on the Spark executors. This is the function that will be used in a mapPartitions transformation on the training data RDD and called from the driver . 

In [8]:
def SGDUpdate(
    iterator,
    minRatingBroadcastVar, 
    maxRatingBroadcastVar, 
    paramsBroadcastVar,
    weightsBroadcastVar
):
    import numpy as np
    
    def update_params_batch(
            x,
            y,
            w_t,
            n,
            k,
            miny,
            maxy,
            lambda0,
            lambda1,
            lambda2,
            learning_rate,
            b
        ):
            w0 = w_t[0]
            w  = np.reshape(np.array(w_t[1:n+1]), (1,n))
            v  = np.matrix(np.reshape(w_t[n+1:],(n,k)))

            y_hat = predict(x,w0,w,v)
            y_hat = np.minimum(maxy, y_hat)
            y_hat = np.maximum(miny, y_hat)
            term1 = 2*(y_hat - y)

            gradw0 = np.sum(term1, axis = 0) + lambda0*w0
            
            if(gradw0.shape[0] <> 1 or gradw0.shape[1] <> 1):
                raise ValueError('Gradient w0 has more than 1 element')
            gradw = np.sum(np.multiply(term1,x), axis = 0) + lambda1*w
            if(gradw.shape[1] <> x.shape[1]):
                raise ValueError('Gradient w has incorrect number of elements')   
            gradv = np.zeros(v.shape)
            d = x.shape[0]
            instances = range(0,d)
            for dd in instances:
                xd = x[dd,:]
                xdv = xd*v
                gradv += term1[dd][0,0] * (
                            np.multiply(xd.T,np.repeat(xdv,xd.shape[1],axis=0)) -
                            np.multiply(v,np.square(xd.T))
                        ) 
            gradv += lambda2*v

            w0 -= (learning_rate/b*1.0)*gradw0
            w  -= (learning_rate/b*1.0)*gradw
            v  -= (learning_rate/b*1.0)*gradv
            return [w0[0,0]] + w[0,:].tolist() + np.array(np.reshape(v,(1,n*k))).tolist()[0]
    
    def update_params(
            x,
            y,
            w_t,
            n,
            k,
            miny,
            maxy,
            lambda0,
            lambda1,
            lambda2,
            learning_rate
        ):
            w0 = w_t[0]
            w  = np.reshape(np.array(w_t[1:n+1]),(1,n))
            v  = np.matrix(np.reshape(w_t[n+1:],(n,k)))

            y_hat = predict(x,w0,w,v)
            y_hat = np.minimum(maxy, y_hat)
            y_hat = np.maximum(miny, y_hat)
            term1 = 2*(y_hat - y)
            gradw0 = term1 + lambda0*w0
            if(gradw0.shape[0] <> 1 or gradw0.shape[1] <> 1):
                raise ValueError('Gradient w0 has more than 1 element')
            gradw = np.multiply(term1,x) + lambda1*w
            if(gradw.shape[1] <> x.shape[1]):
                raise ValueError('Gradient w has incorrect number of elements')   
            gradv = np.zeros(v.shape)
            xdv = x*v
            gradv += term1[0,0] * (
                        np.multiply(x.T,np.repeat(xdv,x.shape[1],axis=0)) -
                        np.multiply(v,np.square(x.T))
                    ) 
            gradv += lambda2*v
            w0 -= learning_rate*gradw0
            w  -= learning_rate*gradw
            v  -= learning_rate*gradv
            return [w0[0,0]] + w[0,:].tolist() + np.array(np.reshape(v,(1,n*k))).tolist()[0]
    
    def predict(x, w0, w, v):
        return (w0 + 
            x*w.T + 
            0.5*(np.sum(np.square(x*v) - (np.square(x)*np.square(v)),axis=1))
        )
    
    miny = minRatingBroadcastVar.value
    maxy = maxRatingBroadcastVar.value
    parameters = paramsBroadcastVar.value
    
    lambda0 = parameters[0]
    lambda1 = parameters[1]
    lambda2 = parameters[2]
    learning_rate = parameters[3]
    n = int(parameters[4])
    k = int(parameters[5])
    random_seed = parameters[6]
    
    mat = np.matrix(list(iterator),dtype=float)
    x = mat[:,0:n]
    y = np.array(mat[:,n],dtype=float)
    d = x.shape[0]
    
    minibatch_size = 300
    indx = range(0,d)
    if (random_seed != 'None'):
        random_seed = int(random_seed)
        np.random.seed(random_seed)

    w_t = np.array(weightsBroadcastVar.value)
    if (d >= 600):
        shuffledindx = np.random.permutation(indx)
        start = 0;
        end = minibatch_size
        while (start < d):
            xbatch = x[shuffledindx[start:end],:]
            ybatch = y[shuffledindx[start:end]]
            w_t = update_params_batch(
                        xbatch,
                        ybatch,
                        w_t,
                        n,
                        k,
                        miny,
                        maxy,
                        lambda0,
                        lambda1,
                        lambda2,
                        learning_rate,
                        xbatch.shape[0]
                )
            start += minibatch_size
            end += minibatch_size
            if (end > d):
                end = d
    else:
        shuffledindx = np.random.permutation(indx)
        for i in shuffledindx:
            xd = x[i,:]
            yd = y[i]
            w_t = update_params(
                        xd,
                        yd,
                        w_t,
                        n,
                        k,
                        miny,
                        maxy,
                        lambda0,
                        lambda1,
                        lambda2,
                        learning_rate
                )
    return w_t
    # If reduceByKey is called on mapPartitions, then return the tuple below.
    #wt_indx = range(0,n+(n*k)+1)
    #return zip(wt_indx,w_t)

#### Now we will create the driver function called fm_fit

In [9]:
def fm_fit(
    trainingDataRdd,
    minRating,
    maxRating,
    params
):
    import numpy as np
    from functools import partial
    from operator import add

    lambda0 = 1
    lambda1 = 1
    lambda2 = 1
    n_iter = 100
    learning_rate = 1
    k = 10
    random_seed = 'None'
    
    if params is not None and len(params) != 0:
        if params.has_key('lambda0'):
            lambda0 = float(params['lambda0'])
        if params.has_key('lambda1'):
            lambda1 = float(params['lambda1'])
        if params.has_key('lambda2'):
            lambda2 = float(params['lambda2'])
        if params.has_key('n_iter'):
            n_iter = int(params['n_iter'])
        if params.has_key('learning_rate'):
            learning_rate = float(params['learning_rate'])
        if params.has_key('k'):
            k = int(params['k'])
        if params.has_key('random_seed'):
            random_seed = int(params['random_seed'])
    
    n = len(trainingDataRdd.take(1)[0])-1
    
    parameters = [lambda0,lambda1,lambda2,learning_rate,n,k,random_seed]

    w0 = 0
    w = np.zeros([1,n])
    v = np.random.normal(scale=0.1,size=(n,k))
    w_t = [w0] + w[0,:].tolist() + np.array(np.reshape(v,(1,n*k))).tolist()[0]

    paramsBroadcastVar    = sc.broadcast(parameters)
    minRatingBroadcastVar = sc.broadcast(minRating)
    maxRatingBroadcastVar = sc.broadcast(maxRating)

    for i in range(0,n_iter):
        print 'Epoch number: ' + str(i)
        weightsBroadcastVar = sc.broadcast(w_t)
        weights = trainingDataRdd.mapPartitions(partial(
                                                SGDUpdate,
                                                minRatingBroadcastVar=minRatingBroadcastVar,
                                                maxRatingBroadcastVar=maxRatingBroadcastVar,
                                                paramsBroadcastVar=paramsBroadcastVar,
                                                weightsBroadcastVar=weightsBroadcastVar
                                              )
                                          ).glom().collect() 
        w_t = np.matrix(weights)
        w_t = np.mean(w_t,axis=0)
        w_t = w_t.tolist()[0]
        # Alternately, can have mapPartitions return a tuple of (indx,wt) and use reduceByKey
        # If using reduceByKey, your final wts are averaged as below
        #w_t = np.array([v for k, v in weights])
        #w_t = w_t/trainingDataRdd.getNumPartitions()
        #w_t  = w_t.tolist()
    return w_t

#### Call the driver function to train the FM model

In [10]:
w_t = fm_fit(
    trainingData,
    minTrainRating,
    maxTrainRating,
    {'lambda0':0.01,'lambda1':0.01,'lambda2':0.02,
         'k':10, 'n_iter':100, 'learning_rate':0.5}
)

Epoch number: 0
Epoch number: 1
Epoch number: 2
Epoch number: 3
Epoch number: 4
Epoch number: 5
Epoch number: 6
Epoch number: 7
Epoch number: 8
Epoch number: 9
Epoch number: 10
Epoch number: 11
Epoch number: 12
Epoch number: 13
Epoch number: 14
Epoch number: 15
Epoch number: 16
Epoch number: 17
Epoch number: 18
Epoch number: 19
Epoch number: 20
Epoch number: 21
Epoch number: 22
Epoch number: 23
Epoch number: 24
Epoch number: 25
Epoch number: 26
Epoch number: 27
Epoch number: 28
Epoch number: 29
Epoch number: 30
Epoch number: 31
Epoch number: 32
Epoch number: 33
Epoch number: 34
Epoch number: 35
Epoch number: 36
Epoch number: 37
Epoch number: 38
Epoch number: 39
Epoch number: 40
Epoch number: 41
Epoch number: 42
Epoch number: 43
Epoch number: 44
Epoch number: 45
Epoch number: 46
Epoch number: 47
Epoch number: 48
Epoch number: 49
Epoch number: 50
Epoch number: 51
Epoch number: 52
Epoch number: 53
Epoch number: 54
Epoch number: 55
Epoch number: 56
Epoch number: 57
Epoch number: 58
Epoch n

#### We will now implement the prediction function
#### First, create the predict_example function that will be executed on the Spark executors. This is the function that will be used in a map transformation on the testing data RDD and called from the driver . 

In [33]:
def predict_example(
    arr,
    minRatingBroadcastVar, 
    maxRatingBroadcastVar, 
    weightsBroadcastVar,
    nBroadcastVar,
    kBroadcastVar
    
):
    import numpy as np
    miny = minRatingBroadcastVar.value
    maxy = maxRatingBroadcastVar.value
    w_t  = weightsBroadcastVar.value
    n    = nBroadcastVar.value
    k    = kBroadcastVar.value
    
    x = np.matrix(arr[0:n], dtype=float)
    w0 = w_t[0]
    w  = np.reshape(np.array(w_t[1:n+1]),(1,n))
    v  = np.matrix(np.reshape(w_t[n+1:],(n,k)))
    y_hat = (w0 + 
            x*w.T + 
            0.5*(np.sum(np.square(x*v) - (np.square(x)*np.square(v)),axis=1))
           )

    y_hat = np.minimum(maxy, y_hat)
    y_hat = np.maximum(miny, y_hat)
    return [arr[n],y_hat[0,0]]

#### Now we will create the driver function called fm_predict

In [34]:
def fm_predict(
    testingDataRdd,
    w_t,
    k,
    minRating,
    maxRating
):
    import numpy as np
    from functools import partial
    
    n = len(testingDataRdd.take(1)[0])-1
    minRatingBroadcastVar = sc.broadcast(minRating)
    maxRatingBroadcastVar = sc.broadcast(maxRating)
    weightsBroadcastVar   = sc.broadcast(w_t)
    nBroadcastVar         = sc.broadcast(n)
    kBroadcastVar         = sc.broadcast(k)
    
    predictions = testingDataRdd.map(partial(
                                    predict_example,
                                    minRatingBroadcastVar=minRatingBroadcastVar,
                                    maxRatingBroadcastVar=maxRatingBroadcastVar,
                                    weightsBroadcastVar=weightsBroadcastVar,
                                    nBroadcastVar=nBroadcastVar,
                                    kBroadcastVar=kBroadcastVar
                                    )
                                ).collect()
    return predictions

#### Call the prediction driver function and compute MSE

In [35]:
predictions = fm_predict(
    testingData,
    w_t,
    10,
    minTrainRating,
    maxTrainRating
)

In [37]:
import numpy as np
predictions = np.matrix(predictions)
y_test = predictions[:,0]
y_hat = predictions[:,1]

from sklearn.metrics import mean_squared_error
mse = mean_squared_error(y_test,y_hat)
print ("Mean squared error = " + str(mse))

Mean squared error = 0.964102916613
