In [1]:
import csv
from pyspark.sql.functions import min, max, col, when, lit, udf
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql import Row
import pandas as pd
pd.set_option('max_columns', None)

spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)


# load the data
movies_file = sc.textFile("dataset/movies.csv")
genres_file = sc.textFile("dataset/genres.csv")
actors_file = sc.textFile("dataset/actors.csv")
tagNames_file = sc.textFile("dataset/tag_names.csv")
tags_file = sc.textFile("dataset/tags.csv")

# we separate the fields for each table in csv format
data_movies = movies_file.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))
data_genres = genres_file.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))
data_actors = actors_file.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))
data_tagNames = tagNames_file.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))
data_tags = tags_file.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))

# we create the dataframe for each data generated
table_movies = spark.createDataFrame(data_movies, ['mid', 'title','year','rating','num_ratings'])
table_genres = spark.createDataFrame(data_genres, ['mid', 'genre'])
table_actors = spark.createDataFrame(data_actors, ['mid', 'name', 'cast_position'])
table_tagNames = spark.createDataFrame(data_tagNames, ['tid', 'tag'])
table_tags = spark.createDataFrame(data_tags, ['mid', 'tid'])

# create an alias for each table
movies = table_movies.alias('movies')
genres = table_genres.alias('genres')
actors = table_actors.alias('actors')
tagNames = table_tagNames.alias('tagNames')
tags = table_tags.alias('tags')

# we mantain the tables in cache

genres.persist()
tagNames.persist()
tags.persist()
movies.persist()
actors.persist()
movies.persist()

mid,title,year,rating,num_ratings
1,Toy story,1995,3.7,102338
2,Jumanji,1995,3.2,44587
3,Grumpy Old Men,1993,3.2,10489
4,Waiting to Exhale,1995,3.3,5666
5,Father of the Bri...,1995,3.0,13761
6,Heat,1995,3.9,42785
7,Sabrina,1954,3.8,12812
8,Tom and Huck,1995,2.7,2649
9,Sudden Death,1995,2.6,3626
10,GoldenEye,1995,3.4,28260


### 1. Print all movie titles starring ‘Daniel Craig’, sorted in an ascending alphabetical order.

In [33]:
movies_with_actors = movies.join(actors, movies.mid == actors.mid)
movies_title_actors = movies_with_actors.select('title','name')
movies_with_Craig = movies_title_actors.filter(movies_title_actors.name == 'Daniel Craig').sort(movies_title_actors.title.asc()).select('title')
movies_with_Craig

title
A Kid in King Art...
Archangel
Casino Royale
Casino Royale
Elizabeth
Enduring Love
Infamous
Lara Croft: Tomb ...
Layer Cake
Munich


In [34]:
movies_with_actors = movies.join(actors, movies.mid == actors.mid)
reduced_movies = movies_with_actors.rdd.map(lambda x: (x.name,x.title)).filter(lambda x: (x[0] == 'Daniel Craig'))
movies_with_Craig = reduced_movies.toDF().withColumnRenamed('_1','name').withColumnRenamed('_2','title').select('title')
movies_with_Craig.sort(movies_with_Craig.title.asc())

title
A Kid in King Art...
Archangel
Casino Royale
Casino Royale
Elizabeth
Enduring Love
Infamous
Lara Croft: Tomb ...
Layer Cake
Munich


### 2. Print names of the cast of the movie ‘The Dark Knight’ in an ascending alphabetical order.

In [32]:
movies_with_actors = movies.join(actors, movies.mid == actors.mid)
reduced_movies = movies_with_actors.rdd.map(lambda x: (x.name,x.title)).filter(lambda x: (x[1] == 'The Dark Knight'))
cast_TheDarkKnight = reduced_movies.toDF().withColumnRenamed('_1','name').withColumnRenamed('_2','title').select('name')
cast_TheDarkKnight.sort(cast_TheDarkKnight.name.asc()).toPandas()

Unnamed: 0,name
0,Aaron Eckhart
1,Adam Kalesperis
2,Aidan Feore
3,Andrew Bicknell
4,Andy Luther
...,...
86,Walter Lewis
87,Will Zahrn
88,William Armstrong
89,William Fichtner


In [110]:
movies_with_actors = movies.join(actors, movies.mid == actors.mid)
movies_title_actors = movies_with_actors.select('title','name')
cast_TheDarkKnight = movies_title_actors.filter(movies_title_actors.title == 'The Dark Knight').sort(movies_title_actors.name.asc())
only_cast = cast_TheDarkKnight.select('name')
only_cast

name
Aaron Eckhart
Adam Kalesperis
Aidan Feore
Andrew Bicknell
Andy Luther
Anthony Michael Hall
Ariyon Bakare
Beatrice Rosen
Bill Smille
Brandon Lambdin


### 3. Print the distinct genres in the database and their corresponding number of movies N where N is greater than 1000, sorted in the ascending order of N

In [2]:
reduced_genres = genres.rdd.map(lambda x: (x.genre,1)).reduceByKey(lambda a,b: a+b).filter(lambda x: (int(x[1]) > 1000))
reduced_genresdf = reduced_genres.toDF(['genre', 'countMovies']).sort('countMovies')
reduced_genresdf

Unnamed: 0,genre,countMovies
0,Adventure,1003
1,Crime,1086
2,Action,1445
3,Romance,1644
4,Thriller,1664
5,Comedy,3566
6,Drama,5076


### 4. For each year, print the movie title, year, and rating, sorted in the ascending order of year and the descending order of movie rating

In [5]:
movies_sorted= movies.sort(movies.year.asc(), movies.rating.desc())
movies_sortedFiltered = movies_sorted.select('title', 'year', 'rating')
movies_sortedFiltered

title,year,rating
The Great Train R...,1903,0.0
The Birth of a Na...,1915,3.3
Intolerance: Love...,1916,3.8
The Immigrant,1917,0.0
Otets Sergiy,1917,0.0
A Dog's Life,1918,0.0
Broken Blossoms o...,1919,3.7
"Die Spinnen, 1. T...",1919,0.0
Male and Female,1919,0.0
Das Cabinet des D...,1920,4.1


### 5. Critiques say that some words used in tags to convey emotions are very recurrent. To convey positive and negative emotions, the words ‘good’ and ‘bad’, respectively, are used predominantly in tags. Print all movie titles whose audience opinion is split (i.e., has at least one audience who expresses positive emotion and at least one who expresses negative emotion).

In [40]:
movies_with_tags = movies.join(tags, ['mid']).join(tagNames, ['tid'])
movies_with_tags_onlyGood = movies_with_tags.rdd.map(lambda x: (x.mid,x.tag)).filter(lambda x: ('good' in x[1])).map(lambda x: (x[0])) #249
movies_with_tags_onlyBad = movies_with_tags.rdd.map(lambda x: (x.mid,x.tag)).filter(lambda x: ('bad' in x[1])).map(lambda x: (x[0])) #93
movies_with_tags_onlyGoodBad = movies_with_tags_onlyGood.intersection(movies_with_tags_onlyBad) #14

In [39]:
movies_with_tags = movies.join(tags, ['mid']).join(tagNames, ['tid'])
movies_with_tags_onlyGood = movies_with_tags.filter(movies_with_tags.tag.contains( 'good')) #249
movies_with_tags_onlyBad = movies_with_tags.filter(movies_with_tags.tag.contains( 'bad')) #93
movies_with_tags_onlyGoodBad = (movies_with_tags_onlyGood.select('mid','title')).intersect(movies_with_tags_onlyBad.select('mid','title')) #14
result = movies_with_tags_onlyGoodBad.select('title')
result.toPandas()

Unnamed: 0,title
0,Twilight
1,The Forgotten
2,Starship Troopers
3,Bridget Jones's Diary
4,Howard the Duck
5,Hercules in New York
6,C.H.U.D.
7,The Wicker Man
8,Ocean's Eleven
9,Return of the Killer Tomatoes!


### 6. One would expect that the movie with the highest number of user ratings is either the highest rated movie or perhaps the lowest rated movie. Let’s find out if this is the case here:

#### 6.1 : Print all information (mid, title, year, num ratings, rating) for the movie(s) with the highest number of ratings

In [25]:
num_ratings = movies.filter(movies.num_ratings != '\\N').withColumn('num_ratings', col('num_ratings').cast('int'))
max_num_rating = num_ratings.select(max('num_ratings'))
movies_with_max_num_ratings = movies.filter(movies.num_ratings == max_num_rating.collect()[0][0])
movies_with_max_num_ratings

Unnamed: 0,mid,title,year,rating,num_ratings
0,4201,Pirates of the Caribbean: At World's End,2007,3.8,1768593
1,53125,Pirates of the Caribbean: At World's End,2007,3.8,1768593


#### 6.2: Print all information (mid, title, year, num ratings, rating) for the movie(s) with the highest rating (include tuples that tie), sorted by the ascending order of movie id.

In [26]:
num_ratings = movies.filter(movies.rating != '\\N').withColumn('rating', col('rating').cast('double'))
max_rating = num_ratings.select(max('rating'))
movies_with_max_ratings = movies.filter(movies.rating == max_rating.collect()[0][0])
movies_with_max_ratings

mid,title,year,rating,num_ratings
4311,1732 Høtten,1998,5,5


#### 6.3: Is (Are) the movie(s) with the most number of user ratings among these highest rated movies? Print the output of the query that will check our conjecture (i.e., your query will print the movie(s) that has (have) the highest number of ratings as well as the highest rating)

In [27]:
result = movies_with_max_num_ratings.intersect(movies_with_max_ratings)
result

mid,title,year,rating,num_ratings


#### 6.4: Print all information (mid, title, year, num ratings, rating) for the movie(s) with the lowest rating (include tuples that tie), sorted by the ascending order of movie id.

In [30]:
num_ratings = movies.filter(movies.rating != '\\N').withColumn('rating', col('rating').cast('double')).withColumn('num_ratings', col('num_ratings').cast('int'))
new_num_ratings = num_ratings.filter(num_ratings.num_ratings > 0)
min_rating = new_num_ratings.select(min('rating'))
movies_with_min_ratings = movies.filter(movies.rating == min_rating.collect()[0][0])
movies_with_min_ratings

Unnamed: 0,mid,title,year,rating,num_ratings
0,4230,Too Much Sleep,1997,1.5,3


#### 6.5: Is (Are) the movie(s) with the most number of user ratings among these lowest rated movies? Print the output of the query that will check our conjecture (i.e., your query will print the movie(s) that has (have) the highest number of ratings as well as the lowest rating)

In [31]:
result = movies_with_max_num_ratings.intersect(movies_with_min_ratings)

mid,title,year,rating,num_ratings


#### 6.6: In conclusion, is our hypothesis or conjecture true for the MovieLens database?

No, the results of query 6.3 and 6.5 show that both the intersection of max_num_ratings and max_rating/min_rating are empty! 

### 7. Print the movie title, year, and rating of the lowest and highest movies for each year in 2005 – 2011, inclusive, in the ascending order of year. In case of a tie, print the records in the ascending order of title.

In [17]:
movies_2005_2011 = movies.filter((movies.year >= 2005) & (movies.year <= 2011))
movies_list = []

for year in range(2005,2012):
    ratings = movies_2005_2011.filter((movies_2005_2011.year == year) & (movies_2005_2011.num_ratings != '\\N') & (movies_2005_2011.num_ratings > 0)).withColumn('rating', col('rating').cast('double'))
    min_rating = ratings.select(min('rating'))
    max_rating = ratings.select(max('rating'))
    result_min = ratings.filter(ratings.rating == min_rating.collect()[0][0]).sort(ratings.title)
    result_max = ratings.filter(ratings.rating == max_rating.collect()[0][0]).sort(ratings.title)
    final_result = result_min.union(result_max).select('title','year', 'rating')
    movies_list.extend(final_result.collect())

rdd = sc.parallelize(movies_list)
result = rdd.toDF()
result.toPandas()

Unnamed: 0,title,year,rating
0,Alone in the Dark,2005,2.1
1,Alone in the Dark,2005,2.1
2,Alone in the Dark,2005,2.1
3,Son of the Mask,2005,2.1
4,No Direction Home: Bob Dylan,2005,4.3
5,Basic Instinct 2,2006,2.5
6,Basic Instinct 2,2006,2.5
7,Bug,2006,2.5
8,Bug,2006,2.5
9,Doogal,2006,2.5


### 8. Let us find out who are the ‘no flop’ actors. A ‘no flop’ actor can be defined as one who has played only in movies which have a rating greater than or equal to 4. We split this problem into the following steps:

##### 8.1 Create a view called high ratings which contains the distinct names of all actors who have played in movies with a rating greater than or equal to 4. Similarly, create a view called low ratings which contains the distinct names of all actors who have played in movies with a rating less than 4. Print the number of rows in each view.

In [20]:
movies_with_actors = movies.join(actors,['mid']).withColumn('rating', col('rating').cast('double'))
high_ratings = movies_with_actors.filter(movies_with_actors.rating >= 4.0).select('name').distinct()
low_ratings = movies_with_actors.filter(movies_with_actors.rating < 4.0).select('name').distinct()
high_ratings.count() #13710
low_ratings.count() #87032

87032

##### 8.2 Use the above views to print the number of ‘no flop’ actors in the database.

In [21]:
result = high_ratings.subtract(low_ratings)
result.count() #7015

7015

###### 8.3 For each ‘no flop’ actor, print the name of the actor and the number of movies N that he/she played in, sorted in descending order of N. Finally, print the top10 only.

In [24]:
noFlopactors_with_mid = actors.join(result,['name'])

reduced_actors = noFlopactors_with_mid.rdd.map(lambda x: (x.name,1)).reduceByKey(lambda a,b: a+b)
reduced_actors_df = reduced_actors.toDF(['name', 'countMovies'])
final_result = reduced_actors_df.withColumn('countMovies', col('countMovies').cast('int')).sort(reduced_actors_df.countMovies.desc())
final_result

name,countMovies
Nikolai Grinko,8
John Cazale,7
Paul Frankeur,7
Tsutomu Yamazaki,6
Kuniko Miyake,6
Gunnel Lindblom,6
Allan Garcia,6
Anatoli Solonitsin,5
Timothy T. Mitchum,5
Megan Gallagher,5


### 9. Let us find out who is the actor with the highest ‘longevity.’ Print the name of the actor/actress who has been playing in movies for the longest period of time (i.e., the time interval between their first movie and their last movie is the greatest).

In [32]:
movies_with_actors = movies.join(actors,['mid']).withColumn('year', col('year').cast('int'))
partial_result = movies_with_actors.select('name','year').distinct()
result_max = partial_result.groupBy('name').max('year').withColumnRenamed('max(year)', 'recentYear')
result_min = partial_result.groupBy('name').min('year').withColumnRenamed('min(year)', 'firstYear')
result_join = result_min.join(result_max,['name'])
result = result_join.select('name',result_join.recentYear-result_join.firstYear).withColumnRenamed('(recentYear - firstYear)', 'difference')
max_difference = result.select(max('difference'))
final_result = result.filter(result.difference == max_difference.collect()[0][0])
final_result

name,difference
Morgan Jones,102


### 10. Let us find the close friends of Annette Nicole. Print the names of all actors who have starred in (at least) all movies in which Annette Nicole has starred in. Note that it is OK if these actors have starred in more movies than Annette Nicole has played in.

###### 10.1 First, create a view called co_actors, which returns the distinct names of actors who played in at least one movie with Annette Nicole. Print the number of rows in this view.

In [2]:
movies_with_actors = movies.join(actors,['mid'])
movies_with_Annette = movies_with_actors.filter(movies_with_actors.name == 'Annette Nicole')
movies_with_Annette_mid = movies_with_Annette.select('mid')
movies_with_cast_and_Annette = actors.join(movies_with_Annette_mid,['mid'])
co_actors = movies_with_cast_and_Annette.select('name').distinct()
co_actors.count() #179

179

###### 10.2 Second, create a view called all_combinations which returns all possible combinations of co_actors and the movie ids in which Annette Nicole played. Print the number of rows in this view. Note how that this view contains fake (co_actor, mid) combinations!

In [3]:
partial_all_combinations = movies_with_Annette.withColumnRenamed('name', 'nameAnnette').crossJoin(co_actors)
all_combinations = partial_all_combinations.select('mid','name')
all_combinations.count() #537

537

###### 10.3 Third, create a view called non_existent from the view all_combinations by removing all legitimate (co_actor,mid) pairs (i.e., pairs that exist in the actors table). Print the number of rows in this view

In [4]:
actors_real = actors.select('mid', 'name')
non_existent = all_combinations.subtract(actors_real)
non_existent.count() #239

239

###### 10.4 Finally, from the view co_actors, eliminate the distinct actors that appear in the view non_extistent. Print the names of all co_actors except Annette Nicole.

In [5]:
final = co_actors.subtract(non_existent.select('name'))
final.filter(final.name != 'Annette Nicole')

name
Kristen Connolly
Christian Perry


### 11. Let us find out who is the most social actor. A social actor is the one with the highest number of distinct co-actors. We will break this into two sub-tasks:

###### 11.1 For the actor Tom Cruise, print his name and the number of distinct co-actors.

In [2]:
movies_with_Tom = actors.filter(actors.name == 'Tom Cruise').select('mid','name') 
movies_with_coActors = movies_with_Tom.join(actors.withColumnRenamed('name', 'nameCoActors'),['mid'])
final_coActors_Tom = movies_with_coActors.filter(movies_with_coActors.nameCoActors != 'Tom Cruise').select('name', 'nameCoActors').distinct().groupBy('name').count().withColumnRenamed('count', 'countCoActors')
final_coActors_Tom

name,countCoActors
Tom Cruise,1238


###### 11.2 For each actor, compute the number of distinct co-actors. For the highest such number, print the name of the actor and the number of distinct co-actors. In case of a tie, print the records sorted in alphabetical order by name.

In [8]:
################################## INEFFICIENT SOLUTION ########################################################
most_social_actor = []
maxCoActors = 0
actors_list = actors.select('name').distinct().collect()

for actor in actors_list:
    movies_with_specific_actor = actors.filter(actors.name == actor[0]).select('mid','name') 
    movies_with_coActors = movies_with_specific_actor.join(actors.withColumnRenamed('name', 'nameCoActors'),['mid'])
    final_coActors_specific_actor = movies_with_coActors.filter(movies_with_coActors.nameCoActors != actor[0]).select('name', 'nameCoActors').distinct().groupBy('name').count().withColumnRenamed('count', 'countCoActors')
    most_social_actor.append((actor[0], final_coActors_specific_actor.collect()[0][1]))
    
rdd = sc.parallelize(most_social_actor)
result = rdd.toDF()
max_coActors = result.select(max('_2'))

final_result = result.filter(result._2 == max_coActors.collect()[0][0]) 
final_result

_1,_2
Oliver Platt,831


In [95]:
reduced_actors = actors.rdd.map(lambda x: (x.mid,1)).reduceByKey(lambda a,b: a+b)
result_countActors = reduced_actors.toDF().withColumnRenamed('_1','mid').withColumnRenamed('_2','countActors')
movies_with_countActors = result_countActors.join(actors,['mid']).select('mid','countActors','name')

movies_with_countActors_reduced = movies_with_countActors.rdd.map(lambda x: (x.name,x.countActors-1)).reduceByKey(lambda a,b: a+b)
result_countActors = movies_with_countActors_reduced.toDF().withColumnRenamed('_1','name').withColumnRenamed('_2','countCoActors')

# search for duplicates, we tried use mapReduce but it was to slow. Surprisingly groupBy and cout() was faster than mapReduce.
couples_coActors = actors.join(actors.withColumnRenamed('name','coActor'),['mid']).groupBy('name','coActor').count().withColumnRenamed('count', 'countCoActors')
result_partial_couples = couples_coActors.filter(couples_coActors.name != couples_coActors.coActor).select('name', 'coActor',couples_coActors.countCoActors-1).withColumnRenamed('(countCoActors - 1)', 'duplicates').withColumn('duplicates', col('duplicates').cast('int'))
result_couples = result_partial_couples.groupBy('name').sum('duplicates').withColumnRenamed('sum(duplicates)', 'duplicates')

r= result_countActors.join(result_couples,['name'])
final_r = r.select('name', r.countCoActors - r.duplicates).withColumnRenamed('(countCoActors - duplicates)','countCoActors')
max_coActors = final_r.select(max('countCoActors'))
#final_r.filter(final_r.name == 'Tom Cruise') --> 1238 it's correct!
final_result = final_r.filter(final_r.countCoActors == max_coActors.collect()[0][0]) 
final_result

name,countCoActors
Samuel L. Jackson,1824


### 12. We will now write some queries for a Content-Based Movie Recommendation System such as NetFlix. However, in this project we shall deploy a simple algorithm that may or may not produce optimal recommendations. Content-based recommendations focus on the properties of items, in our case movies. The similarity of two movies is determined by measuring the similarity of their properties. For a movie item, we shall consider the following five properties: actors, tags, genres, year, and rating. 

##### Given two movies X and Y, the similarity of Y to X, sim(X,Y), can be computed as: (fraction of common actors + fraction of common tags + fraction of common genres + age gap + rating gap) /5 where fraction is the number of common elements between X and Y divided by the number of elements of X, age gap is the normalized difference between the production years of X and Y, and rating gap is the normalized difference between the ratings of X and Y. Intuitively, the smaller the gaps are, the better (since movies of the same decade and rating are more likely to be similar). Moreover, note that we divide by five because each property is given an equal weight of 1. 

##### Given a user who is known to like the movie ‘Mr. & Mrs. Smith’, write a query that prints the movie title, rating, and similarity percentage (i.e., similarity * 100) for the top 10 movies that are most similar to the ‘Mr. & Mrs. Smith’ movie, ordered by the similarity percentage.

In [4]:
################################## INEFFICIENT SOLUTION ########################################################
def get_mid(movie):
    return movies.filter(movies.title == movie).select('mid').collect()[0][0]

maxYear = int(movies.select(max(movies.year)).collect()[0][0])
maxRating = float(movies.filter(movies.rating != '\\N').select(max(movies.rating)).collect()[0][0])

def similarity(mid_movie1, mid_movie2):
    
    get_num_actor = len(actors.filter(actors.mid == mid_movie1).select('name').collect())
    get_num_tags = len(tags.filter(tags.mid == mid_movie1).select('tid').distinct().collect())
    get_num_genres = len(genres.filter(genres.mid == mid_movie1).select('genre').distinct().collect())
    
    actorsMovie1 = actors.filter(actors.mid == mid_movie1).select('name')
    actorsMovie2 = actors.filter(actors.mid == mid_movie2).select('name')
    intersect_actor =  len(actorsMovie1.intersect(actorsMovie2).collect())/get_num_actor
    
    tagsMovie1 = tags.filter(tags.mid == mid_movie1).select('tid')
    tagsMovie2 = tags.filter(tags.mid == mid_movie2).select('tid')
    intersect_tag = len(tagsMovie1.intersect(tagsMovie2).collect())/get_num_tags
    
    genreMovie1 = genres.filter(genres.mid == mid_movie1).select('genre')
    genreMovie2 = genres.filter(genres.mid == mid_movie2).select('genre')
    intersect_genre =  len(genreMovie1.intersect(genreMovie2).collect())/get_num_genres
    
    year_movie1 = int(movies.filter(movies.mid == mid_movie1).select('year').collect()[0][0])
    year_movie2 = int(movies.filter(movies.mid == mid_movie2).select('year').collect()[0][0])
    difference =  abs(year_movie1 - year_movie2)
    age_gap =  1 - (difference/maxYear)
    
    
    rating_movie1 = float(movies.filter(movies.mid == mid_movie1).select('rating').collect()[0][0])
    rating_movie2 = float(movies.filter(movies.mid == mid_movie2).select('rating').collect()[0][0])
    difference_gap = abs(rating_movie1 - rating_movie2) 
    rating_gap = 1 - (difference_gap/maxRating)
    similarity = round((intersect_actor+intersect_tag+intersect_genre + age_gap + rating_gap)/5,2)*100

    return similarity

mid_movieMrSmith = get_mid('Mr. & Mrs. Smith')
movies_without_MrSmith = movies.filter(movies.mid != mid_movieMrSmith).filter(movies.rating != '\\N').limit(100).select('mid')
similarity_for_movies = []

for mid in movies_without_MrSmith.collect():
    similarity_for_movies.append([mid_movieMrSmith,mid[0], similarity(mid_movieMrSmith,mid[0])])

rdd = sc.parallelize(similarity_for_movies)
result = rdd.toDF().withColumnRenamed('_1','mid_movieMrSmith').withColumnRenamed('_2','mid').withColumnRenamed('_3','similarity')
r = result.join(movies,['mid']).select('title', 'rating','similarity')
final_result= r.sort(r.similarity.desc()).limit(10)
final_result


title,rating,similarity
Waiting to Exhale,3.3,61.0
Mighty Aphrodite,3.3,60.0
Gazon maudit,3.4,60.0
The American Pres...,3.2,59.0
Grumpy Old Men,3.2,59.0
Beautiful Girls,3.6,59.0
Bottle Rocket,3.7,59.0
Sabrina,3.8,57.99999999999999
Sense and Sensibi...,3.8,57.99999999999999
Vampire in Brooklyn,2.4,57.99999999999999


In [5]:
from pyspark.sql.functions import abs, round

def get_mid(movie):
    return movies.filter(movies.title == movie).select('mid').collect()[0][0]

maxYear = int(movies.select(max(movies.year)).collect()[0][0])
maxRating = float(movies.filter(movies.rating != '\\N').select(max(movies.rating)).collect()[0][0])
mid_Mr = get_mid('Mr. & Mrs. Smith')
num_actors_Mr = actors.filter(actors.mid == mid_Mr).count()
num_genres_Mr = genres.filter(genres.mid == mid_Mr).count()
num_tags_Mr = tags.filter(tags.mid == mid_Mr).count()
year_Mr = int(movies.filter(movies.mid == mid_Mr).select('year').collect()[0][0])
rating_Mr = float(movies.filter(movies.mid == mid_Mr).select('rating').collect()[0][0])

#we create table common_actors
actors_Mr = actors.filter(actors.mid == mid_Mr).select('mid','name').withColumnRenamed('mid','midMr')
# here we don't have count = 0 for no relation between actors
partial_common_actors = actors.filter(actors.mid != mid_Mr).join(actors_Mr,['name']).groupBy('mid').count()
# we add count = 0 for the remaining movies
partial_common_actors_withCount= movies.select('mid').withColumnRenamed('mid','midNew')
c = partial_common_actors_withCount.join(partial_common_actors, partial_common_actors.mid == partial_common_actors_withCount.midNew, how='left').select('midNew','count')
common_actors = c.fillna({'count':'0'}).select('midNew', col('count')/num_actors_Mr).withColumnRenamed('midNew','mid').withColumnRenamed('(count / 12)','common_actors_value')

#we create table common_genres
genres_Mr = genres.filter(genres.mid == mid_Mr).select('mid','genre').withColumnRenamed('mid','midMr')
partial_common_genres = genres.filter(genres.mid != mid_Mr).join(genres_Mr,['genre']).groupBy('mid').count()
partial_common_genres_withCount= movies.select('mid').withColumnRenamed('mid','midNew')
g = partial_common_genres_withCount.join(partial_common_genres, partial_common_genres.mid == partial_common_genres_withCount.midNew, how='left').select('midNew','count')
common_genres = g.fillna({'count':'0'}).select('midNew', col('count')/num_genres_Mr).withColumnRenamed('midNew','mid').withColumnRenamed('(count / 2)','common_genres_value')

#we create table common_tags
tags_Mr = tags.filter(tags.mid == mid_Mr).select('mid','tid').withColumnRenamed('mid','midMr')
partial_common_tags = tags.filter(tags.mid != mid_Mr).join(tags_Mr,['tid']).groupBy('mid').count()
partial_common_tags_withCount= movies.select('mid').withColumnRenamed('mid','midNew')
t = partial_common_tags_withCount.join(partial_common_tags, partial_common_tags.mid == partial_common_tags_withCount.midNew, how='left').select('midNew','count')
common_tags = t.fillna({'count':'0'}).select('midNew', col('count')/num_tags_Mr).withColumnRenamed('midNew','mid').withColumnRenamed('(count / 3)','common_tags_value')

#we create table year_gap
y = movies.select('mid','year').withColumn('year', col('year').cast('int')).withColumn('yearMr',lit(year_Mr))
year_gap = y.select('mid',1 - ((abs(col('year')-col('yearMr')))/maxYear)).withColumnRenamed('(1 - (abs((year - yearMr)) / 2011))', 'year_gap')

#we create table rating_gap, we change null value with 0.0
r = movies.select('mid','rating').withColumn('rating', col('rating').cast('double')).fillna({'rating':'0'}).withColumn('ratingMr',lit(rating_Mr))
rating_gap = r.select('mid',1 - ((abs(col('rating')-col('ratingMr')))/maxRating)).withColumnRenamed('(1 - (abs((rating - ratingMr)) / 5.0))', 'rating_gap')

final_table = common_actors.join(common_genres,['mid']).join(common_tags,['mid']).join(year_gap,['mid']).join(rating_gap,['mid'])
f = final_table.withColumn('similarity',lit(((final_table.common_actors_value+final_table.common_genres_value+final_table.common_tags_value+final_table.year_gap+final_table.rating_gap)/5)*100))

partial_result = f.join(movies,['mid']).select('title', 'rating','similarity').withColumn('similarity', round('similarity',2))
result = partial_result.sort(partial_result.similarity.desc()).limit(10)
result.toPandas()

Unnamed: 0,title,rating,similarity
0,Mr. & Mrs. Smith,3.4,80.0
1,Hitch,3.4,66.67
2,Waitress,3.4,66.65
3,"Definitely, Maybe",3.5,66.24
4,Bend It Like Beckham,3.2,65.84
5,Walking and Talking,3.6,65.78
6,French Kiss,3.2,65.77
7,The Naked Gun: From the Files of Police Squad!,3.2,65.7
8,The Women,3.0,65.04
9,L'ultimo bacio,3.8,65.03


In [None]:
################################## MAP SOLUTION WITH ERROR ########################################################


minYear = int(movies.select(min(movies.year)).collect()[0][0])
maxYear = int(movies.select(max(movies.year)).collect()[0][0])
maxRating = float(movies.filter(movies.rating != '\\N').select(max(movies.rating)).collect()[0][0])

def get_mid(movie):
    return movies.filter(movies.title == movie).select('mid').collect()[0][0]


def similarity(mid_movie1, mid_movie2):
    
    get_num_actor = len(actors.filter(actors.mid == mid_movie1).select('name').collect())
    get_num_tags = len(tags.filter(tags.mid == mid_movie1).select('tid').distinct().collect())
    get_num_genres = len(genres.filter(genres.mid == mid_movie1).select('genre').distinct().collect())
    
    actorsMovie1 = actors.filter(actors.mid == mid_movie1).select('name')
    actorsMovie2 = actors.filter(actors.mid == mid_movie2).select('name')
    intersect_actor =  len(actorsMovie1.intersect(actorsMovie2).collect())/get_num_actor
    
    tagsMovie1 = tags.filter(tags.mid == mid_movie1).select('tid')
    tagsMovie2 = tags.filter(tags.mid == mid_movie2).select('tid')
    intersect_tag = len(tagsMovie1.intersect(tagsMovie2).collect())/get_num_tags
    
    genreMovie1 = genres.filter(genres.mid == mid_movie1).select('genre')
    genreMovie2 = genres.filter(genres.mid == mid_movie2).select('genre')
    intersect_genre =  len(genreMovie1.intersect(genreMovie2).collect())/get_num_genres
    
    year_movie1 = int(movies.filter(movies.mid == mid_movie1).select('year').collect()[0][0])
    year_movie2 = int(movies.filter(movies.mid == mid_movie2).select('year').collect()[0][0])
    difference = abs(year_movie1 - year_movie2)
    age_gap =  1 - (difference/maxYear)
    
    
    rating_movie1 = float(movies.filter(movies.mid == mid_movie1).select('rating').collect()[0][0])
    rating_movie2 = float(movies.filter(movies.mid == mid_movie2).select('rating').collect()[0][0])
    difference_gap = abs(rating_movie1 - rating_movie2) 
    rating_gap = 1 - (difference_gap/maxRating)
    similarity = round((intersect_actor+intersect_tag+intersect_genre + age_gap + rating_gap)/5,2)*100

    return similarity#result.select('similarity')


mid_movieMrSmith = get_mid('Mr. & Mrs. Smith')
movies_without_MrSmith = movies.filter(movies.mid != mid_movieMrSmith).filter(movies.rating != '\\N').select('mid')
similarity_for_movies = []

map_film = movies_without_MrSmith.rdd.map(lambda x: ((mid_movieMrSmith,x.mid), similarity(mid_movieMrSmith,x.mid)))
map_film.take(5)