In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *
import pathlib
import numpy as np
import warnings
import math
warnings.filterwarnings('ignore')

In [2]:
#sc = SparkSession.builder.appName('als-recommender').getOrCreate()

In [4]:
sc = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[16]") \
    .config("spark.local.dir","/home/bh_parijat/spark_temp/")\
    .getOrCreate()
# get spark context
#sc = spark.sparkContext

In [5]:
ratings_rdd  = sc.sparkContext.textFile('ml-1m/ratings.dat') \
                .map(lambda line: line.split("::")) \
                .map(lambda x: (int(x[0]),int(x[1]),int(x[2])))\
                .cache()

In [6]:
ratings_rdd.take(2)



[(1, 1193, 5), (1, 661, 3)]

In [7]:
#ratings_rdd = ratings_rdd.sample(False,0.6)

In [8]:
schema = StructType([StructField('userId',LongType(),True),StructField('movieId',LongType(),True),StructField('rating',LongType(),True)])

In [9]:
ratings = sc.createDataFrame(ratings_rdd,schema=schema)

In [10]:
ratings.take(3)

[Row(userId=1, movieId=1193, rating=5),
 Row(userId=1, movieId=661, rating=3),
 Row(userId=1, movieId=914, rating=3)]

## Spilt data for training, validation and testing

In [11]:
train_data, validation_data, test_data = ratings.randomSplit([0.6,0.2,0.2],seed=90)

In [12]:

def model_train(train_data, validation_data, num_iters, ranks, regularizations):
    
    min_error = np.float('inf')
    
    best_rank = None
    
    best_model = None
    
    best_regularization = None
    
    for rank in ranks:
        
        for param in regularizations:
            
            model = ALS(rank=rank,maxIter=num_iters,regParam=param,userCol='userId',itemCol='movieId',nonnegative = True).fit(train_data)
            
            train =  train_data.select(['userId','movieId'])
            
            valid = validation_data.select(['userId','movieId'])
            
            
            
            predictions = model.transform(valid).fillna(0)
            
            
            cond = [predictions.userId == validation_data.userId, predictions.movieId == validation_data.movieId]
            temp = validation_data.join(predictions,cond).select(validation_data.rating, predictions.prediction)
            
            #print(temp.take(2))
            error = math.sqrt(temp.rdd.map(lambda x: (x[-1]-x[-2])**2).mean())
            
            if error<min_error:
                min_error = error
                best_rank = rank
                best_regularization = param
                best_model = model
            
            #avg_error = error.select("select avg()")
            #print(error.take(2),error.mean(),error.sum(),type(error),error.count())
            #print(predictions.take(2))
            
    return best_model,best_rank,best_regularization

In [13]:
model,rank,regular_param = model_train(train_data, validation_data, 10,range(8,22,2),[0.1,1,2,4])

## Best Hyperparameters

In [15]:
print("rank is {} and regularization hyper-parameter is".format(rank,regular_param))

rank is 18 and regularization hyper-parameter is


# Test trained model

NameError: name 'h' is not defined