In [1]:
import pyspark as ps
import json
import hashlib

from pyspark.sql import SQLContext

from pyspark.mllib.recommendation import ALS

import warnings
warnings.filterwarnings("ignore")



In [2]:
try:
    sc = ps.SparkContext('local[*]')
except:
    warnings.warn("SparkContext already exists in this scope")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/19 10:16:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load Dataset

In [3]:
fields = ['product_id',
           'user_id',
           'review',
           'profile_name',
           'helpfulness',
           'score',
           'time']

In [4]:
def validate(line):
    for field in fields:
        if field not in line :
            return False
    return True

In [5]:
reviews_raw = sc.textFile('./movies 1.json')
reviews = reviews_raw.map(lambda line: json.loads(line)).filter(validate)
reviews.cache()

PythonRDD[2] at RDD at PythonRDD.scala:53

In [6]:
reviews.take(1)

24/08/19 10:16:21 WARN BlockManager: Task 0 already completed, not releasing lock for rdd_2_0
                                                                                

[{'user_id': 'A141HP4LYPWMSR',
  'product_id': 'B003AI2VGA',
  'review': 'Synopsis: On the daily trek from Juarez, Mexico to El Paso, Texas an ever increasing number of female workers are found raped and murdered in the surrounding desert. Investigative reporter Karina Danes (Minnie Driver) arrives from Los Angeles to pursue the story and angers both the local police and the factory owners who employee the undocumented aliens with her pointed questions and relentless quest for the truth.<br /><br />Her story goes nationwide when a young girl named Mariela (Ana Claudia Talancon) survives a vicious attack and walks out of the desert crediting the Blessed Virgin for her rescue. Her story is further enhanced when the "Wounds of Christ" (stigmata) appear in her palms. She also claims to have received a message of hope for the Virgin Mary and soon a fanatical movement forms around her to fight against the evil that holds such a stranglehold on the area.<br /><br />Critique: Possessing a life

## Split Dataset

In [7]:
def get_hash(s):
    return int(hashlib.sha1(s).hexdigest(), 16) % (10 ** 8)

In [8]:
ratings = reviews.map(lambda entry: tuple([get_hash(entry['user_id'].encode('utf-8')), get_hash(entry['product_id'].encode('utf-8')), int(entry['score'])]))

train_data = ratings.filter(lambda entry: ((entry[0]+entry[1]) % 10) >= 2)
test_data = ratings.filter(lambda entry: ((entry[0]+entry[1]) % 10) < 2)
train_data.cache()

PythonRDD[4] at RDD at PythonRDD.scala:53

In [9]:
print(f'Num of train samples: {train_data.count()}')
print(f'Num of test samples: {test_data.count()}')

Num of train samples: 39992
Num of test samples: 10008


## Train ALS model

In [10]:
rank = 20
numIterations = 20
model = ALS.train(train_data, rank, numIterations)



In [11]:
def convertToFloat(lines):
    returnedLine = []
    for x in lines:
        returnedLine.append(float(x))
    return returnedLine

## Evaluate model

In [12]:
unknown = test_data.map(lambda entry: (int(entry[0]), int(entry[1])))
predictions = model.predictAll(unknown).map(lambda r: ((int(r[0]), int(r[1])), r[2]))
true_and_predictions = test_data.map(lambda r: ((int(r[0]), int(r[1])), r[2])).join(predictions)

MSE = true_and_predictions.map(lambda r: (int(r[1][0]) - int(r[1][1]))**2).reduce(lambda x,y: x+y)/true_and_predictions.count()

24/08/19 10:16:35 WARN BlockManager: Task 1074 already completed, not releasing lock for rdd_2_0
                                                                                

In [13]:
true_and_predictions.take(5)

[((65965270, 62577830), (5, 4.3545339147364865)),
 ((7383110, 62577830), (5, 0.32252550165554317)),
 ((22477285, 58302865), (4, 1.427840029213564)),
 ((5479805, 58302865), (5, 0.25600302544482556)),
 ((39998009, 30926631), (3, 8.502361683209527))]

In [14]:
print(MSE)

8.678799666574049
