In [1]:
from pyspark import SparkConf, SparkContext
import pyspark
conf = SparkConf()
#set validateOutputSpecs to false to ignore writing file to exists output directory
conf.set("spark.hadoop.validateOutputSpecs", "false")
sc = SparkContext.getOrCreate()
sc.stop()
sc = SparkContext(appName = 'FindTopTenMoviesByHashtags', conf = conf)

In [2]:
sc # to make sure that spark is work !

# Hashtag section

In [3]:
#define hashtags here 
input_hashtags = ['action', 'adventure']

In [4]:
#load tags data
tags_raw = sc.textFile("ml-10M100K/tags.dat")

In [5]:
#user_id, movie_id, tag, timestamp
tags_array = tags_raw.map(lambda line: line.split('::'))
tags_array.takeSample(False, 5)

[['26242', '5753', 'Gota kanal', '1186089703'],
 ['38662', '4873', 'dreams', '1175022485'],
 ['622', '1391', 'alien invasion', '1215913291'],
 ['37419', '527', 'Holocaust', '1146738767'],
 ['37419', '37384', 'workplace', '1140334155']]

In [6]:
#remove special characters and whitespace
def __extract_tags_data(tags_array):
            hashtag = ''.join(e for e in tags_array[2] if e.isalnum())
            return (hashtag, tags_array[1])

In [7]:
#tag, movie_id
hashtags = tags_array.map(__extract_tags_data)
hashtags.takeSample(False, 5)

[('demons', '1350'),
 ('plotpointtelevision', '6242'),
 ('remade', '7387'),
 ('selfdiscovery', '55247'),
 ('70mm', '592')]

In [8]:
#get only movie_id that contains defined tags
#hashtag_1 = list of movie_id
#hashtag_2 = list of movie_id
movies_with_hashtags = [hashtags.filter(lambda h : h[0].lower() == i.lower()).values() for i in input_hashtags]
for m in movies_with_hashtags: print(m.takeSample(False, 5))

['260', '2133', '4896', '1210', '33493']
['1198', '1580', '1210', '45722', '6539']


In [9]:
#collect movies list in each tags into list
movies_by_tags = [m.collect() for m in movies_with_hashtags]    
#get only movies that contain all defined hashtags by intersection all list
movies_id = set(movies_by_tags[0]).intersection(*movies_by_tags)
print(movies_id)

{'34', '27706', '110', '6817', '2105', '169', '8961', '59315', '5378', '1894', '736', '5618', '35015', '5064', '3999', '1208', '41566', '780', '1196', '2054', '2987', '1801', '4993', '26662', '2046', '59615', '7308', '8361', '588', '1693', '36401', '1210', '7153', '45722', '2005', '1479', '6539', '4896', '2116', '5816', '4270', '5952', '1215', '30810', '3052', '33493', '924', '8253', '480', '48394', '2143', '2953', '34405', '40815', '8873', '653', '7438', '26776', '1408', '2470', '260', '908', '3000', '10', '7302', '1967', '2872', '2404', '2133', '1291', '4367', '1270', '6350', '7147', '2162', '1049', '2294', '2628', '34150', '1587', '45447', '1259', '2406', '2947', '2366', '31658', '2161', '32', '1198', '41569', '1580', '1', '546', '2405', '7099', '3623'}


# Rating section

In [10]:
#load ratings data
ratings_raw = sc.textFile("ml-10M100K/ratings_small.dat")
ratings_raw.takeSample(False, 5)

['188::780::5::838990779',
 '320::2916::4::1162236714',
 '338::1::4::1133744434',
 '215::1377::4::1042431039',
 '215::737::3::949009428']

In [11]:
#movie_id, rating
movies_ratings = ratings_raw.map(lambda line: (line.split('::')[1],float((line.split('::')[2]))))
movies_ratings.takeSample(False, 5)

[('1374', 4.0), ('2762', 3.0), ('2728', 4.0), ('2282', 4.0), ('724', 5.0)]

In [12]:
#filter rating only specific movie_id
movies_ratings_filter = movies_ratings.filter(lambda mv: mv[0] in movies_id)
movies_ratings_filter.takeSample(False, 5)

[('110', 5.0), ('736', 4.0), ('2405', 4.0), ('780', 3.0), ('4993', 5.0)]

In [13]:
#get total sum of rating and total number of rating from users seperated by movie_id
sum_count = (0,0)
sum_movies_ratings = movies_ratings_filter.aggregateByKey(sum_count, lambda a,b: (a[0] + b,    a[1] + 1),
                                  lambda a,b: (a[0] + b[0], a[1] + b[1]))
sum_movies_ratings.takeSample(False, 5)

[('546', (22.0, 13)),
 ('45447', (12.5, 4)),
 ('1210', (553.0, 135)),
 ('4270', (32.5, 12)),
 ('2162', (10.0, 4))]

In [14]:
#get only average rating of each movie
#movie_id, avg_rating
movie_avg_rating = sum_movies_ratings.mapValues(lambda v: round(v[0]/v[1],3)).takeOrdered(10, key = lambda x: -x[1])
print(movie_avg_rating)

[('6350', 4.417), ('260', 4.358), ('31658', 4.333), ('34405', 4.321), ('5618', 4.318), ('1196', 4.266), ('1198', 4.264), ('908', 4.241), ('8961', 4.18), ('7099', 4.167)]


# Movie section

In [15]:
#load movies data
movies_raw = sc.textFile("ml-10M100K/movies.dat")
movies_raw.takeSample(False, 5)

['40969::First Descent (2005)::Documentary',
 '3732::Fury, The (1978)::Horror',
 '31590::Grisbi (Touchez pas au grisbi) (1954)::Crime|Drama|Thriller',
 '58059::I Live in Fear (Ikimono no kiroku) (1955)::Drama',
 '591::Tough and Deadly (1995)::Action|Drama|Thriller']

In [16]:
#movie_id, movie_name
movies = movies_raw.map(lambda line: (line.split('::')[0], line.split('::')[1]))
movies.takeSample(False, 5)

[('6852', 'In Cold Blood (1967)'),
 ('3829', 'Mad About Mambo (2000)'),
 ('36537', 'Thumbsucker (2005)'),
 ('1142', 'Get Over It (1996)'),
 ('2710', 'Blair Witch Project, The (1999)')]

In [17]:
#convert rdd to be dictionary data
movies_list = movies.collect()
movies_list = dict((key, value) for (key,value) in movies_list)

In [18]:
#get the movie name from movies_list and rating from result_movies
top_ten_movies = [(movies_list.get(r[0]),r[1]) for r in movie_avg_rating]
print(top_ten_movies)

[('Castle in the Sky (Tenkû no shiro Rapyuta) (1986)', 4.417), ('Star Wars: Episode IV - A New Hope (a.k.a. Star Wars) (1977)', 4.358), ("Howl's Moving Castle (Hauru no ugoku shiro) (2004)", 4.333), ('Serenity (2005)', 4.321), ('Spirited Away (Sen to Chihiro no kamikakushi) (2001)', 4.318), ('Star Wars: Episode V - The Empire Strikes Back (1980)', 4.266), ('Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)', 4.264), ('North by Northwest (1959)', 4.241), ('Incredibles, The (2004)', 4.18), ('Nausicaä of the Valley of the Winds (Kaze no tani no Naushika) (1984)', 4.167)]


# Output

In [19]:
#save output to hdfs
sc.parallelize(top_ten_movies).coalesce(1).saveAsTextFile('output/FindTopTenMoviesByHashtags/')