In [3]:
# Create (or reuse) the SparkContext. getOrCreate returns the already-running
# context on re-run instead of raising "Cannot run multiple SparkContexts at once".
# If a context already exists, `conf` is NOT re-applied to it.
from pyspark import SparkConf, SparkContext
conf = (
    SparkConf()
    .setAppName("MovieLensALS")
    .set("spark.executor.memory", "2g")
)
spark_context = SparkContext.getOrCreate(conf=conf)

In [3]:
# Smoke test only: count whitespace-separated tokens in the repository README
# to confirm the SparkContext works.
lines = spark_context.textFile("../README.md")
words = lines.flatMap(lambda text_line: text_line.split())
count = words.count()
print("Word Count: {}".format(count))

Word Count: 157


In [None]:
# Try recommender system
import pandas as pd
import numpy as np
import scipy.sparse as sp

# Alternative parser for the raw CSV; not used below (the data is loaded with helpers.load_data instead).
def preprocess_data(data):
    """Parse raw ``(pos, rating)`` rows into a sparse rating matrix.

    Each row of ``data`` is ``(pos, rating)`` where ``pos`` has the form
    ``"r<row>_c<col>"`` with 1-based indices (e.g. ``"r44_c1"``) and
    ``rating`` is anything ``float()`` accepts.

    Args:
        data: 2-D array-like with one ``(pos, rating)`` pair per row.

    Returns:
        ratings: ``scipy.sparse.lil_matrix`` of shape ``(max_row, max_col)``
            with ``ratings[row - 1, col - 1] = rating``. Shape is ``(0, 0)``
            for empty input.
        data_matrix: float ndarray of shape ``(n, 3)`` holding
            ``(row, col, rating)`` for each input row.
    """
    def deal_line(line):
        """Parse one ``(pos, rating)`` pair into ``(row, col, rating)``."""
        pos, rating = line[0], line[1]
        row, col = pos.split("_")
        return int(row.replace("r", "")), int(col.replace("c", "")), float(rating)

    data = np.asarray(data)
    if data.shape[0] == 0:
        # apply_along_axis cannot iterate over an empty axis.
        return sp.lil_matrix((0, 0)), np.empty((0, 3))

    # Parse each line; the (int, int, float) tuples are packed into a float array.
    data_matrix = np.apply_along_axis(deal_line, axis=1, arr=data)

    # The indices come back as floats and scipy.sparse rejects float
    # subscripts, so cast them before using them as shape and positions.
    row_idx = data_matrix[:, 0].astype(int)
    col_idx = data_matrix[:, 1].astype(int)
    ratings = sp.lil_matrix((int(row_idx.max()), int(col_idx.max())))
    for row, col, rating in zip(row_idx, col_idx, data_matrix[:, 2]):
        ratings[row - 1, col - 1] = rating
    return ratings, data_matrix

#data = pd.read_csv("../data/data_train.csv")
#__, data_new = preprocess_data(data.as_matrix().reshape((-1,2)))
#train_pd = pd.DataFrame({'user':data_new[:,0],'item':data_new[:,1],'rating':data_new[:,2]})

In [5]:
from helpers import load_data
import scipy.sparse as sp
# Load the training ratings as a sparse (items x users) matrix.
print("load")
train_path = "../data/data_train.csv"
train_raw = load_data(train_path)
print("train raw shape:", train_raw.shape)
#df_panda = pd.DataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],["user", "item", "rating"])
#df_panda = pd.DataFrame([(0, 0, 1, 1, 2, 2),(0,1,1,2,1,2),(4.0,2.0,3.0,4.0,1.0,5.0)],["user", "item", "rating"])

load
number of items: 10000, number of users: 1000
train raw shape: (10000, 1000)


In [6]:
from pyspark.sql import SQLContext
import pandas as pd
# Convert the sparse rating matrix into a Spark DataFrame of
# (item, user, rating) triples.
print("find")
rows, cols, ratings = sp.find(train_raw)

print("panda dataframe")
# train_raw is (items x users); shift to 1-based ids.
train_pd = pd.DataFrame({
    'item': rows + 1,
    'user': cols + 1,
    'rating': ratings,
})

print("sql dataframe")
context = SQLContext(spark_context)
train = context.createDataFrame(train_pd)
print("done")

find
panda dataframe
sql dataframe
done


In [None]:
train.show(n=20)  # preview the first 20 training rows (PySpark's default)

In [None]:
# Apply alternating least squares
from pyspark.ml.recommendation import ALS

# TODO: hold out a validation split (cross-validation) before trusting
# these hyper-parameters.
# Previously tried: rank=30, maxIter=30, regParam=0.001.
als_params = dict(
    rank=8,
    maxIter=50,
    regParam=0.065,
    userCol="user",
    itemCol="item",
    ratingCol="rating",
)
als = ALS(**als_params)
model = als.fit(train)
print('solved for rank', model.rank)
# To inspect the learned factors:
#   model.userFactors.orderBy("id").collect()
#   model.itemFactors.orderBy("id").collect()

In [None]:
from helpers import load_data
# Only the (item, user) positions of the sample submission are used; its
# stored values are discarded.
ratings = load_data('../data/sampleSubmission.csv')
rows, cols, __ = sp.find(ratings)
rows, cols = rows + 1, cols + 1  # 1-based ids, matching the training frame
test_pd = pd.DataFrame({'item': rows, 'user': cols})
test = context.createDataFrame(test_pd)
test.show()

In [None]:
# Tiny hand-made (user, item) frame for sanity-checking model.transform.
test = context.createDataFrame([(1, 2), (1, 1), (2, 1)], ["user", "item"])
test.show()

# Submission positions as plain Python ints (rows/cols are numpy arrays).
x = [(int(item_id), int(user_id)) for item_id, user_id in zip(rows, cols)]
print(x[:3])
print(type(x[0][0]))
test2 = context.createDataFrame(x, ["item", "user"])
test2.show()

In [None]:
#test['user':1].select()
#print(test2.filter(test2.user==1).collect())
# Print the rows for a few (user=1, item) pairs of the submission frame.
for item_id in (37, 7833, 3986):
    print(test2.where((test2.user == 1) & (test2.item == item_id)).collect())
test2 = test2.sort('item', 'user')
test2.show()
test2.count()

In [None]:
# Score the toy frame first, then the submission frame. Afterwards,
# predictions_df / predictions refer to the submission frame (test2).
for frame_to_score in (test, test2):
    predictions_df = model.transform(frame_to_score)
    predictions = sorted(predictions_df.collect(), key=lambda row: row[0])
    print(predictions[0])
    print(predictions[1])
    predictions_df.show()

In [None]:
# Build the submission frame: Id = "r<item>_c<user>", Prediction = the ALS
# prediction clipped to [0, 5] and rounded.
predictions_df = predictions_df.sort('item', 'user')
predictions_pd = predictions_df.toPandas()
predictions_pd["Id"] = (
    "r" + predictions_pd["item"].map(str)
    + "_c" + predictions_pd["user"].map(str)
)
# NOTE(review): ALS predicts NaN for ids unseen in training; clip/round keep
# NaN, which would end up in the CSV — confirm none are present.
predictions_pd["Prediction"] = predictions_pd["prediction"].clip(0, 5).round()
# Use `columns=`; the positional axis argument to drop() was removed in pandas 2.0.
predictions_pd_new = predictions_pd.drop(columns=["item", "user", "prediction"])
print(predictions_pd_new)

In [None]:
# Write the submission CSV: columns Id, Prediction and no index column.
output_path = '../results/submission_pyspark.csv'
predictions_pd_new.to_csv(
    output_path,
    columns=["Id", "Prediction"],
    index=False,
)

In [None]:
# explainParams() returns one multi-line string; print it so the newlines
# render instead of showing the escaped repr.
print(als.explainParams())

In [None]:
# A bare expression that is not the last line of a cell is discarded, so print it.
print(type(predictions[0]))
print(model.userFactors.orderBy("id").collect())

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

#train = context.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
#                                 ["user", "item", "rating"])
#test = context.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

# Estimator for cross-validation. Column names are stated explicitly to match
# the `train` frame. No standalone fit happens here: fitting would cost a
# full ALS run and overwrite the tuned `model` from the earlier cell.
# NOTE(review): validation folds may contain users/items absent from the
# training fold. ALS then predicts NaN and RMSE becomes NaN; newer Spark
# versions offer ALS(coldStartStrategy="drop") — confirm availability.
als = ALS(userCol="user", itemCol="item", ratingCol="rating")
param_map = (
    ParamGridBuilder()
    .addGrid(als.rank, [25])
    .addGrid(als.maxIter, [20])
    .addGrid(als.regParam, [0.0001])
    .build()
)
len(param_map)

In [None]:
# Cross-validate the ALS estimator over param_map using RMSE on `rating`.
print('testing with {} parameters:'.format(len(param_map)))
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")
cross_validator = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_map,
    evaluator=evaluator,
)
models = cross_validator.fit(train)
# NOTE(review): in top-to-bottom order `test` is the 3-row toy frame built
# earlier, not the submission frame (`test2`) — confirm which is intended.
best_predictions_df = models.bestModel.transform(test)

In [None]:
# Report the rank of the best cross-validated model. The dangling `models.`
# (a SyntaxError that broke the whole cell) has been removed.
print('best model: \n rank:', models.bestModel.rank)
# NOTE(review): maxIter/regParam may not be readable from the fitted model in
# every PySpark version; confirm before re-enabling these lines.
#print('maxIter:',models.bestModel.maxIter)
#print('regParam: \n',models.bestModel.regParam)