In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc=SparkContext()

ratings=sc.textFile('ratings.dat')
ratings.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

In [2]:
def parseRatings(row):
    splitted = list(row.split('::'))
    return (int(splitted[0]),int(splitted[1]),int(splitted[2]),splitted[3])

In [3]:
ratings = ratings.map(parseRatings)

In [4]:
ratings.take(5)

[(1, 1193, 5, '978300760'),
 (1, 661, 3, '978302109'),
 (1, 914, 3, '978301968'),
 (1, 3408, 4, '978300275'),
 (1, 2355, 5, '978824291')]

In [5]:
rating_1_count = ratings.filter(lambda x: x[2] == 1).count()
rating_1_count

56174

In [6]:
unique_movies = ratings.map(lambda x: x[1]).distinct().count()
unique_movies

3706

In [7]:
user_id = ratings.map(lambda x : (x[0] ,1)).reduceByKey(lambda x,y: x+y).max(lambda x : x[1])
user_id

(4169, 2314)

In [8]:
movies_rated_by_user = ratings.filter(lambda x: x[0] == user_id[0])

In [10]:
movies_rated_by_user.take(5)

[(4169, 3789, 5, '965333672'),
 (4169, 571, 4, '973310265'),
 (4169, 574, 3, '975805232'),
 (4169, 575, 3, '976589949'),
 (4169, 577, 3, '988324145')]

In [11]:
movies = sc.textFile('movies.dat')
movies.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [12]:
def parseMovies(row):
    splitted = list(row.split('::'))
    return (int(splitted[0]),splitted[1],splitted[2])

In [13]:
movies = movies.map(parseMovies)
movies.take(5)

[(1, 'Toy Story (1995)', "Animation|Children's|Comedy"),
 (2, 'Jumanji (1995)', "Adventure|Children's|Fantasy"),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 (4, 'Waiting to Exhale (1995)', 'Comedy|Drama'),
 (5, 'Father of the Bride Part II (1995)', 'Comedy')]

In [14]:
users = sc.textFile('users.dat')
users.take(5)

['1::F::1::10::48067',
 '2::M::56::16::70072',
 '3::M::25::15::55117',
 '4::M::45::7::02460',
 '5::M::25::20::55455']

In [15]:
def parseUsers(row):
    splitted = list(row.split('::'))
    return (int(splitted[0]),splitted[1],int(splitted[2]), splitted[3],splitted[4])

In [16]:
users = users.map(parseUsers)
users.take(5)

[(1, 'F', 1, '10', '48067'),
 (2, 'M', 56, '16', '70072'),
 (3, 'M', 25, '15', '55117'),
 (4, 'M', 45, '7', '02460'),
 (5, 'M', 25, '20', '55455')]

In [18]:
genders = movies.map(lambda x : x[2]).flatMap(lambda x : x.split('|'))

In [19]:
genders0 = movies.map(lambda x : x[2]).map(lambda x : x.split('|'))

In [20]:
genders.take(10)

['Animation',
 "Children's",
 'Comedy',
 'Adventure',
 "Children's",
 'Fantasy',
 'Comedy',
 'Romance',
 'Comedy',
 'Drama']

In [21]:
genders = genders.sortBy(lambda x:x,0).distinct()
genders.take(20)

['Western',
 'Thriller',
 'Sci-Fi',
 'Romance',
 'Musical',
 'Horror',
 'Fantasy',
 'Drama',
 'Documentary',
 "Children's",
 'Action',
 'War',
 'Mystery',
 'Film-Noir',
 'Crime',
 'Comedy',
 'Animation',
 'Adventure']

In [22]:
nb_genders = movies.flatMap(lambda x: x[2].split('|')).map(lambda g: (g, 1)).reduceByKey(lambda x, y: x + y)
nb_genders.take(5) 

[("Children's", 251),
 ('Fantasy', 68),
 ('Romance', 471),
 ('Drama', 1603),
 ('Action', 503)]

Question 8

In [23]:
valid_users = users.filter(lambda x : x[1] == 'M' and x[2] > 45 ).map(lambda x : (x[0],1))
valid_ratings = ratings.filter(lambda x : x[2] >= 4).map(lambda x : (x[0] , x[1]))

valid_rating_by_users = valid_users.join(valid_ratings).map(lambda x: (x[1][1],1))


In [24]:
movies_genres = valid_rating_by_users.join(movies.map(lambda x: (x[0], x[2]))).flatMap(lambda x: x[1][1].split('|'))
movies_genres.distinct().sortBy(lambda x:x[0],1).take(10)

['Action',
 'Animation',
 'Adventure',
 "Children's",
 'Comedy',
 'Crime',
 'Drama',
 'Documentary',
 'Film-Noir',
 'Fantasy']

In [25]:
genre_counts = movies_genres.map(lambda genre: (genre, 1)).reduceByKey(lambda x, y: x + y)
genre_counts.take(5)

[('Action', 11988),
 ('Sci-Fi', 7137),
 ('Western', 2024),
 ('Animation', 1361),
 ('Thriller', 9586)]

Question 9

In [26]:
import re 
def get_year(title) :
    match = re.search(r"\((\d{4})\)", title)
    return int(match[1]) if match else None

In [27]:
movies_selected = movies.map(lambda x: (get_year(x[1]), x[2])) \
                           .flatMap(lambda x: [((x[0], genre),1) for genre in x[1].split('|')])

In [28]:
movies_selected = movies_selected.reduceByKey(lambda x,y : x+y).map(lambda x: (x[0][0], (x[0][1], x[1])))

In [29]:
movies_selected.take(10)

[(1995, ('Animation', 8)),
 (1995, ('Comedy', 89)),
 (1995, ('Adventure', 25)),
 (1995, ('Crime', 18)),
 (1995, ('War', 12)),
 (1994, ('Action', 32)),
 (1994, ('Drama', 121)),
 (1994, ('Thriller', 31)),
 (1994, ('Romance', 37)),
 (1995, ('Mystery', 8))]

In [32]:
movies_selected = movies_selected.reduceByKey(lambda a, b: a if a[1] > b[1] else b)

In [33]:
movies_selected.sortBy(lambda x : x[0] , 0 ).take(10)

[(2000, ('Comedy', 69)),
 (1999, ('Drama', 130)),
 (1998, ('Drama', 166)),
 (1997, ('Drama', 139)),
 (1996, ('Drama', 150)),
 (1995, ('Drama', 158)),
 (1994, ('Drama', 121)),
 (1993, ('Drama', 81)),
 (1992, ('Drama', 38)),
 (1991, ('Drama', 26))]