# Cross validation of the ALS method using Spark
We refer to this tutorial: http://spark.apache.org/docs/latest/ml-tuning.html#example-model-selection-via-cross-validation

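The tutorial was not clear about how the dataset is split into folds, so we build the K-fold train/test splits ourselves (in the style of SKLearn's `KFold`) with the `KFoldIndexes` class defined below.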

# Imports

In [112]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import itertools
from pyspark.mllib.recommendation import ALS
import math

# Render matplotlib figures inline in the notebook
%matplotlib inline
# Load the autoreload extension and reload imported modules before each cell execution
%load_ext autoreload
%autoreload 2

# Show up to 100 columns when a dataframe is displayed
pd.options.display.max_columns = 100


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [33]:
import random


In [34]:
from pyspark.sql.functions import col

Cross-validation imports

In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [124]:
class KFoldIndexes:
    ''' Random K-fold index splitter for cross validation
    
    Usage: 
    indexes=KFoldIndexes(4,10)          # folds drawn from the module-level random generator
    indexes=KFoldIndexes(4,10,seed=42)  # reproducible folds
    indexes.indexes  # to access the indexes of the cross validation, it is a list of tuples (train,test)
    
    @ variables
        - indexes, a list of n_splits tuples (train,test), one per fold.
        train and test are two sorted lists of row positions taken from range(rows).
        The test lists are pairwise disjoint and together contain every position once;
        each train list contains the positions that are not in the test list of its fold.
    '''
    
    def __init__(self,n_splits,rows,seed=None):
        ''' Build the (train,test) index lists of every fold
        @ params
            - n_splits, the number of folds in the cross validation (at least 1)
            - rows, the number of rows in the database to split
            - seed, optional seed to get reproducible folds. When None (the default)
            the module-level random generator is used.
        @ raises
            - ValueError, if n_splits is smaller than 1
        '''
        if n_splits<1:
            raise ValueError('n_splits must be at least 1, got {}'.format(n_splits))
        
        # A private generator keeps seeded splits independent from the global random state
        rng=random if seed is None else random.Random(seed)
        
        # Every test fold gets base_size elements; if rows is not divisible by n_splits
        # the first `extra` folds get one element more
        base_size,extra=divmod(rows,n_splits)
        
        unassigned=list(range(rows)) # Positions that have not been part of a test fold yet
        test_folds=[]
        for fold in range(n_splits):
            fold_size=base_size+int(fold<extra)
            rng.shuffle(unassigned)
            test_folds.append(sorted(unassigned[:fold_size]))
            unassigned=unassigned[fold_size:]
        
        # The train list of a fold is every position outside its test list;
        # walking range(rows) in order yields it already sorted
        self.indexes=[]
        for test in test_folds:
            held_out=set(test)
            train=[position for position in range(rows) if position not in held_out]
            self.indexes.append((train,test))
        

# Dataframe creation

In [42]:
# Load the training data from a CSV file (relative path)
train = pd.read_csv('../data/data_train.csv')


In [43]:
# Each Id is split on '_'; the first character of each part is dropped and the rest
# is parsed as an integer (first part -> UserID, second part -> MovieID)
train['UserID'] = [int(raw_id.split('_')[0][1:]) for raw_id in train['Id']]
train['MovieID'] = [int(raw_id.split('_')[1][1:]) for raw_id in train['Id']]
# The rating is the original Prediction column under a clearer name
train['Rating'] = train['Prediction']
# Only UserID, MovieID and Rating are kept, in this order
train = train.drop(['Id', 'Prediction'], axis=1)

In [44]:
# Number of rows in the training dataframe
len(train)

1176952

In [68]:
# Number of folds used for the cross validation
k_folds=4
# Random (train,test) index lists of every fold, computed over all the rows of the training dataframe
k_fold_indexes=KFoldIndexes(k_folds,train.shape[0])

In [74]:
# Quick look at a few rows selected with a list of index labels
train.loc[[1,2,3]]

Unnamed: 0,UserID,MovieID,Rating
1,61,1,3
2,67,1,4
3,72,1,3


In [75]:
def get_tests_database(df,k_fold_indexes):
    ''' Extract the test partition of every fold from a dataframe
    @ params
        - df, the pandas dataframe to split
        - k_fold_indexes, an object whose `indexes` attribute is a list of
        (train,test) tuples of row positions (for example a KFoldIndexes)
    @ returns
        - a list with one dataframe per fold, containing the rows at the test positions of that fold
    '''
    # The fold indexes are row positions, so the selection must be positional (iloc):
    # label-based selection (loc) only matches on a default 0..n-1 index
    return [df.iloc[fold[1]] for fold in k_fold_indexes.indexes]

In [93]:
def get_sql_from_pd(df_list,sql_context=None):
    ''' Convert a list of pandas dataframes into Spark dataframes
    @ params
        - df_list, the list of pandas dataframes to convert
        - sql_context, optional object providing createDataFrame. When None (the default)
        the global `sqlContext` is used; it is expected to exist in the notebook
        namespace (it is not defined in the visible cells).
    @ returns
        - a list with the result of createDataFrame for every element of df_list, in the same order
    '''
    if sql_context is None:
        sql_context=sqlContext
    return [sql_context.createDataFrame(df) for df in df_list]

In [76]:
# Test partition (pandas dataframe) of every fold
tests=get_tests_database(train,k_fold_indexes)

In [94]:
# The same partitions converted to Spark dataframes
tests_sql=get_sql_from_pd(tests)

1
1
1
1


In [108]:
# Training set of this run: union of the test partitions of the first three folds
tr=tests_sql[0].rdd.union(tests_sql[1].rdd).union(tests_sql[2].rdd)

In [110]:
# Validation set of this run: the test partition of the remaining (fourth) fold
ts=tests_sql[3].rdd

In [113]:
# ALS hyper-parameters passed by name: rank 2, 10 iterations, regularization 0.1
model = ALS.train(tr, rank=2, iterations=10, lambda_=0.1)

In [115]:
# Keep only the first two fields of every validation row (UserID and MovieID in the
# dataframe schema); the rating is left out of the prediction input
validation_for_predict_RDD = ts.map(lambda x: (x[0], x[1]))

In [117]:
# Predict on the validation pairs and re-key every result as ((field 0, field 1), field 2)
# NOTE(review): presumably (user, product, rating) as returned by predictAll - confirm
predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))

In [119]:
# Key the validation ratings the same way and join them with the predictions:
# every element becomes ((UserID, MovieID), (true rating, predicted rating))
rates_and_preds = ts.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

In [120]:
# Root mean squared error between the true and the predicted ratings
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

In [121]:
# Display the RMSE computed on the validation fold
error

0.9971935506177453

In [95]:
# Display the per-fold Spark dataframes (shows their schemas)
tests_sql

[DataFrame[UserID: bigint, MovieID: bigint, Rating: bigint],
 DataFrame[UserID: bigint, MovieID: bigint, Rating: bigint],
 DataFrame[UserID: bigint, MovieID: bigint, Rating: bigint],
 DataFrame[UserID: bigint, MovieID: bigint, Rating: bigint]]

In [8]:
# Spark dataframe built from the whole pandas training dataframe
train_sql = sqlContext.createDataFrame(train)
# Underlying RDD of Row objects
train_rdd = train_sql.rdd
# Peek at the first row
train_rdd.take(1)

[Row(UserID=44, MovieID=1, Rating=4)]

In [43]:
# Map over the dataframe's RDD: DataFrame.map only exists in Spark 1.x, while
# .rdd.map works on both Spark 1.x and 2.x. The third field (Rating) is cast to float.
type(train_sql.rdd.map(lambda x: (x[0],x[1],float(x[2]))))

pyspark.rdd.PipelinedRDD