# Task 1

In [1]:
sc
import re

Load the file `movie_titles_canonical.txt`

In [2]:
# Load the canonical movie titles file as an RDD of text lines.
raw_mtc = sc.textFile("movie_titles_canonical.txt")

In [3]:
# Load the Netflix movie titles file as an RDD of text lines.
raw_nmt = sc.textFile("netflix_movie_titles.txt")

Each line is split on commas and the movie title is converted to lower case. The title is then normalised with regular expressions: some common words are dropped and all special characters, including spaces, are removed. The year is appended to the normalised title, as this creates a better unique identifier for the movie.

All the original data is kept as the value so that we can refer to it later on.

In [4]:
# Build (key, [title, year]) pairs from the canonical titles file.
# The key is the lower-cased title with a few common words and every non-word
# character removed, followed by the year, e.g. 'fullmetaljacket1987'.
# The patterns are raw strings so that '\W' is not treated as an (invalid)
# string escape sequence by the Python parser.
# NOTE(review): a title that itself contains a comma would be cut at the first
# comma by this split -- confirm the file format before relying on such rows.
data_mtc = (raw_mtc
              .map(lambda x: x.split(","))
              .map(lambda x: (re.sub(r'\W+', '',
                                     re.sub(r'the |of |is |a ', '', x[0].lower())) + x[1], [x[0], x[1]])))

In [5]:
# Build (key, [id, year, title]) pairs from the Netflix titles file.
# The key is produced exactly as for the canonical file: lower-cased title with
# a few common words and every non-word character removed, followed by the year.
data_nmt = (raw_nmt
              # The title is the last field, so the split is capped at two commas:
              # a title that itself contains commas stays intact in x[2] instead of
              # being truncated at its first comma.
              .map(lambda x: x.split(",", 2))
              # Raw-string patterns avoid the invalid '\W' string escape sequence.
              .map(lambda x: (re.sub(r'\W+', '',
                                     re.sub(r'the |of |is |a ', '', x[2].lower())) + x[1], [x[0], x[1], x[2]])))

The following is a representation of the transformed RDD.

In [6]:
# Peek at the first five (key, [title, year]) records.
data_mtc.take(5)

[('avatar2009', ['Avatar', '2009']),
 ('amélie2001', ['Amélie', '2001']),
 ('fullmetaljacket1987', ['Full Metal Jacket', '1987']),
 ('etextraterrestrial1982', ['E.T.: The Extra-Terrestrial', '1982']),
 ('independenceday1996', ['Independence Day', '1996'])]

In [7]:
# Peek at the first five (key, [id, year, title]) records.
data_nmt.take(5)

[('dinosaurplanet2003', ['1', '2003', 'Dinosaur Planet']),
 ('islemantt2004review2004', ['2', '2004', 'Isle of Man TT 2004 Review']),
 ('character1997', ['3', '1997', 'Character']),
 ('paulabdulsgetupdance1994', ['4', '1994', "Paula Abdul's Get Up & Dance"]),
 ('riseandfallecw2004', ['5', '2004', 'The Rise and Fall of ECW'])]

The two RDDs can be joined on this key, which should be unique and should come out the same for a movie in either file, so that more movies can be matched. In fact, this is the reason the two files were manipulated in the same way: the final key is unique and identical in both files, so it can be used for the join.

In [8]:
# Inner join on the normalised title+year key; each value becomes
# ([title, year], [id, year, title]) with the canonical record first.
data_joined_mtc_nmt = data_mtc.join(data_nmt)

In [9]:
# Number of joined (canonical, Netflix) record pairs.
data_joined_mtc_nmt.count()

3728

The final joined RDD is then mapped to get the final desired output which is `ID => TITLE`

In [10]:
# Reduce each joined record to (Netflix movie id as int, canonical title).
# Joined records look like (key, ([title, year], [id, year, title])).
data_final_mtc_nmt = data_joined_mtc_nmt.map(
    lambda joined: (
        int(joined[1][1][0]),  # id from the Netflix record
        joined[1][0][0],       # title from the canonical record
    )
)

Finally, the RDD is transformed into a dictionary and broadcast to all nodes of the Spark cluster.

In [11]:
# Collect the (id, title) pairs to the driver as a dict; a repeated id keeps only one title.
dic_final_mtc_nmt = data_final_mtc_nmt.collectAsMap()

In [12]:
# Wrap the id -> title dict in a Spark broadcast variable.
broadcast_mov = sc.broadcast(dic_final_mtc_nmt)

# Task 2

In [13]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [14]:
# Load the ratings file as an RDD of text lines.
raw_mas = sc.textFile("mv_all_simple.txt")

In [15]:
# Randomly split the rating lines with weights 0.8 / 0.2 using a fixed seed.
raw_mas_1, raw_mas_2 = raw_mas.randomSplit(weights=[0.8, 0.2], seed=1)

In [None]:
# Parse the larger split into Rating records.
# Field 1 is passed as the user, field 0 as the product and field 2 as the rating.
ratings_mas = (
    raw_mas_1
    .map(lambda line: line.split(','))
    .map(lambda fields: Rating(int(fields[1]), int(fields[0]), int(fields[2])))
)

In [None]:
# Train an ALS matrix-factorisation model on the parsed ratings.
rank = 10                        # number of latent factors
numIterations = 5                # ALS iterations
regularization_parameter = 0.01  # passed to ALS as lambda_
# A fixed seed makes the training run reproducible; without it every run
# can produce different factors and therefore different predictions.
model = ALS.train(ratings_mas, rank, numIterations,
                  lambda_=regularization_parameter, seed=1)

In [None]:
# (user, product) pairs to score.
# NOTE(review): these pairs come from ratings_mas, the same RDD the model was trained on.
testdata = ratings_mas.map(lambda rating: (rating.user, rating.product))

In [None]:
# Score every pair and re-key the result as ((user, product), predicted rating).
predictions = (
    model.predictAll(testdata)
    .map(lambda pred: ((pred.user, pred.product), pred.rating))
)

## Task 3

In [None]:
# Keep only the predictions whose user id is 30878.
user_preds = predictions.filter(lambda keyed: keyed[0][0] == 30878)

In [None]:
# Re-key as (product id, predicted rating) so the records can be joined with the titles.
user_recommendations = user_preds.map(lambda keyed: (keyed[0][1], keyed[1]))

In [None]:
# Attach titles: records become (movie id, (predicted rating, title)).
# Movies without a matched title are dropped by the inner join.
user_recommendations_movie_titles = user_recommendations.join(data_final_mtc_nmt)

In [None]:
# Ten highest predicted ratings as a list of (movie id, title, predicted rating).
user_top_recommendations_movie_titles = (
    user_recommendations_movie_titles
    .map(lambda rec: (rec[0], rec[1][1], rec[1][0]))
    .takeOrdered(10, key=lambda row: -row[2])  # negated rating => descending order
)

In [None]:
# Print one recommendation tuple per line under a heading.
print('TOP recommended movies:\n%s' %
      '\n'.join(str(row) for row in user_top_recommendations_movie_titles))

In [None]:
# The ratings in ratings_mas that belong to user 30878.
user_actual = ratings_mas.filter(lambda rating: rating.user == 30878)

In [None]:
# Re-key as (product id, rating) so the records can be joined with the titles.
user_ratings = user_actual.map(lambda rating: (rating.product, rating.rating))

In [None]:
# Attach titles: records become (movie id, (rating, title)).
# Movies without a matched title are dropped by the inner join.
user_ratings_movie_titles = user_ratings.join(data_final_mtc_nmt)

In [None]:
# Ten highest ratings given by the user, as a list of (movie id, title, rating).
# The list is built from user_ratings rather than from the RDD previously bound
# to user_ratings_movie_titles: this cell rebinds that name to a plain list, so
# reading it here would make a second run of the cell fail (a list has no .map).
user_ratings_movie_titles = (
    user_ratings
    .join(data_final_mtc_nmt)
    .map(lambda x: (x[0], x[1][1], x[1][0]))
    .takeOrdered(10, key=lambda x: -x[2])  # negated rating => descending order
)

In [None]:
# Print one rated-movie tuple per line under a heading.
print('TOP movies:\n%s' %
      '\n'.join(str(row) for row in user_ratings_movie_titles))

In [None]:
def toCSVLine(data):
    """Render one record as a single CSV line without a trailing newline.

    Every element of ``data`` is converted with ``str``. Fields that contain a
    comma, a double quote or a line break are quoted (and embedded quotes are
    doubled) so that the line can be parsed back unambiguously; fields without
    such characters are written exactly as a plain ``','.join`` would.

    Parameters:
        data: an iterable of values making up one row.

    Returns:
        The row as a ``str``.
    """
    # Imported locally so the function is self-contained when shipped to workers.
    import csv
    import io

    buffer = io.StringIO()
    # lineterminator='' keeps the result a bare line, as callers expect.
    csv.writer(buffer, lineterminator='').writerow([str(d) for d in data])
    return buffer.getvalue()

In [None]:
# Turn the top-recommendation list back into an RDD, format each tuple as a
# CSV line and write the lines to the 'user_rec_mov' output directory.
user_rec_mov_rdd = sc.parallelize(user_top_recommendations_movie_titles)
user_rec_mov_csv = user_rec_mov_rdd.map(toCSVLine)
user_rec_mov_csv.saveAsTextFile('user_rec_mov')

In [None]:
# Turn the top-rated list back into an RDD, format each tuple as a CSV line
# and write the lines to the 'user_rat_mov' output directory.
user_rat_mov_rdd = sc.parallelize(user_ratings_movie_titles)
user_rat_mov_csv = user_rat_mov_rdd.map(toCSVLine)
user_rat_mov_csv.saveAsTextFile('user_rat_mov')

## Task 4

In [16]:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics

In [17]:
# Parse the smaller split into Rating records, shifting every rating by -2.5.
# Field 1 is passed as the user, field 0 as the product and field 2 as the rating.
ratings_mas_2 = (
    raw_mas_2
    .map(lambda line: line.split(','))
    .map(lambda fields: Rating(int(fields[1]), int(fields[0]), float(fields[2]) - 2.5))
)

In [18]:
# Train a second ALS model on the shifted ratings in ratings_mas_2.
rank = 10                        # number of latent factors
numIterations = 10               # ALS iterations
regularization_parameter = 0.01  # passed to ALS as lambda_
# A fixed seed makes the training run (and the RMSE computed from it)
# reproducible; without it every run can produce different factors.
model_2 = ALS.train(ratings_mas_2, rank, numIterations,
                    lambda_=regularization_parameter, seed=1)

In [None]:
# (user, product) pairs to score.
# NOTE(review): these pairs come from ratings_mas_2, the same RDD model_2 was
# trained on, so the resulting RMSE is a training error -- confirm this is intended.
testdata_2 = ratings_mas_2.map(lambda p: (p.user, p.product))

In [None]:
# Score every pair and re-key the result as ((user, product), predicted rating).
predictions_2  = model_2.predictAll(testdata_2).map(lambda r: ((r.user, r.product), r.rating))

In [None]:
# Observed (shifted) ratings keyed the same way as the predictions.
ratings_tuple = ratings_mas_2.map(lambda r: ((r.user, r.product), r.rating))

In [None]:
# Pair each prediction with its observed rating and drop the (user, product) key,
# leaving (predicted, observed) tuples.
score_labels = predictions_2.join(ratings_tuple).values()

In [None]:
# Regression metrics over the (predicted, observed) pairs.
metrics = RegressionMetrics(score_labels)

In [None]:
# Report the root mean squared error of the predictions.
print("RMSE = %s" % metrics.rootMeanSquaredError)

## Task 5

In [19]:
# Load the qualifying file as an RDD of text lines.
raw_qs = sc.textFile("qualifying_simple.txt")

In [20]:
# Split each line on commas and swap the first two fields, giving
# (field 1, field 0) pairs of strings.
data_qs = (
    raw_qs
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[1], fields[0]))
)

In [21]:
# Peek at the first five qualifying pairs.
data_qs.take(5)

[('1046323', '1'),
 ('1080030', '1'),
 ('1830096', '1'),
 ('368059', '1'),
 ('802003', '1')]

In [22]:
# Predict a rating for every qualifying (user, product) pair.
# model_2 was trained on ratings shifted by -2.5 (see ratings_mas_2), so its raw
# output is on that shifted scale. The offset is added back here so that the
# stored value is on the original rating scale, which is what the later
# `stars` comparisons (e.g. stars > 4) expect.
full_predictions = (
    model_2.predictAll(data_qs)
    .map(lambda r: ((r.user, r.product), r.rating + 2.5))
)

In [None]:
# Format each qualifying pair as a CSV line and write the lines to 'data_qs'.
# NOTE(review): this saves the input pairs, not full_predictions -- confirm
# that this is the intended output of the task.
data_qs_csv = data_qs.map(toCSVLine)
data_qs_csv.saveAsTextFile('data_qs')

## Task 6

In [23]:
from neo4j.v1 import GraphDatabase, basic_auth
import pandas as pd

In [24]:
# Sample the predictions without replacement, fraction 0.05, seed 1.
sample_predictions = full_predictions.sample(False, 0.05, 1)

In [25]:
# Re-key from ((user, product), rating) to (product, (user, rating)) so the
# sample can be joined with the id -> title RDD.
# NOTE: this cell rebinds sample_predictions from its own previous value, so it
# must be run exactly once after the sampling cell.
sample_predictions = sample_predictions.map(
    lambda pred: (pred[0][1], (pred[0][0], pred[1]))
)

In [26]:
# Attach titles: records become (movie id, ((user, rating), title)).
# Movies without a matched title are dropped by the inner join.
joined_predictions = sample_predictions.join(data_final_mtc_nmt)

In [27]:
# Re-key each record by (movie id, user id) with the value
# (movie id, title, user id, predicted rating).
# A tuple key is used instead of concatenating the two ids as strings:
# concatenation is ambiguous (movie 1 + user 23 and movie 12 + user 3 both give
# "123"), and colliding keys would silently overwrite each other once the RDD
# is collected into a dict.
# NOTE: this cell rebinds joined_predictions from its own previous value, so it
# must be run exactly once after the join cell.
joined_predictions = joined_predictions.map(
    lambda x: ((x[0], x[1][0][0]), (x[0], x[1][1], x[1][0][0], x[1][0][1]))
)

In [28]:
# Collect the keyed predictions to the driver as a dict (one entry per distinct key).
dict_joined_predictions = joined_predictions.collectAsMap()

In [29]:
# Number of distinct keys collected.
len(dict_joined_predictions)

94316

In [None]:
import os

# Open the Neo4j driver. The connection settings are read from the environment
# (NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD) so real credentials do not have to be
# stored in the notebook; the defaults are the original local development values.
driver = GraphDatabase.driver(
    os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
    auth=basic_auth(os.environ.get("NEO4J_USER", "neo4j"),
                    os.environ.get("NEO4J_PASSWORD", "1234")))

In [None]:
# Write every sampled prediction to Neo4j: for each Movie node whose title
# matches, make sure the User node exists and MERGE a RATED relationship
# carrying the predicted rating as `stars`.
movieFound = False  # not read in this cell; kept in case other cells use it
count = 0

session = driver.session()
try:
    # Each value is (movie id, title, user id, predicted rating).
    for raw_data in dict_joined_predictions.values():
        session.run('MATCH (m:Movie { title: {movie_title}}) MERGE (u:User { id: {user_id}}) MERGE (u)-[r:RATED{stars:{rating}}]->(m)', {'user_id': str(raw_data[2]), 'movie_title' : raw_data[1], 'rating' : raw_data[3]})

        count = count + 1
        # Progress marker every 500 statements.
        if count % 500 == 0:
            print(count)
finally:
    # Release the session even if a statement fails part-way through the load.
    session.close()
         

500
1000
1500
2000
2500
3000
3500
4000
4500
5000
5500
6000
6500
7000
7500
8000
8500
9000
9500
10000
10500
11000
11500
12000
12500
13000
13500
14000
14500
15000
15500
16000
16500
17000
17500
18000
18500
19000
19500
20000
20500
21000
21500
22000
22500
23000
23500
24000
24500
25000
25500
26000
26500
27000
27500
28000
28500
29000
29500
30000
30500
31000
31500
32000
32500
33000
33500
34000
34500
35000
35500
36000
36500
37000
37500
38000
38500
39000
39500
40000
40500
41000
41500
42000
42500
43000
43500
44000
44500
45000
45500
46000
46500
47000
47500
48000
48500
49000
49500
50000
50500
51000
51500
52000
52500
53000
53500
54000
54500
55000
55500
56000
56500
57000
57500
58000
58500
59000
59500
60000
60500
61000
61500
62000
62500
63000
63500
64000
64500
65000
65500
66000
66500
67000
67500
68000
68500
69000
69500
70000
70500
71000
71500
72000
72500
73000
73500
74000
74500
75000
75500
76000
76500
77000
77500
78000
78500
79000
79500
80000
80500
81000
81500
82000
82500
83000
83500
84000
84500
85000


## Task 7

// Users and the Alfred Hitchcock-directed movies they rated with more than 4 stars,
// restricted to users who have more than one RATED path to movies by that director.
MATCH (u:User)-[r:RATED]->(m:Movie)<-[:DIRECTED]-(d:Director)
WHERE size((u)-[:RATED]->()<-[:DIRECTED]-(d)) > 1 AND d.name = "Alfred Hitchcock" AND r.stars > 4 
RETURN u, m

// Actors who appear in more than 3 movies together with Emma Watson,
// with the number of shared movies; at most 25 rows are returned.
MATCH (a:Actor)-[:ACTS_IN]->(m:Movie)<-[:ACTS_IN]-(ca:Actor)
WITH count(m) as count_movies, a, ca
WHERE count_movies > 3 AND a.name = "Emma Watson"
RETURN a.name, ca.name, count_movies
LIMIT 25

// Starting from Tom Hanks' movies and his co-actors, find other movies those
// co-actors appear in (without Tom Hanks) that some user rated above 3 stars,
// where that user has not rated the Tom Hanks movie.
MATCH (actor:Actor)-[:ACTS_IN]->(movie:Movie)<-[:ACTS_IN]-(coActor:Actor)
MATCH (otherActor:Actor)-[:ACTS_IN]->(otherMovie:Movie)<-[:ACTS_IN]-(coActor)
MATCH (user:User)-[rated:RATED]->(otherMovie)
WHERE actor.name = "Tom Hanks" AND rated.stars > 3 AND NOT (actor)-[:ACTS_IN]->(otherMovie) AND NOT (user)-[:RATED]->(movie)
RETURN DISTINCT movie.title, otherMovie.title, coActor.name