In [39]:
from pyspark.sql.types import *
from pyspark.sql.functions import * 
from pyspark.sql.types import Row

In [2]:
ratings = sc.textFile("ml-100k/u.data").map(lambda line: line.split("\t"))
movies = sc.textFile("ml-100k/u.item").map(lambda line: line.split("|"))
users = sc.textFile("ml-100k/u.user").map(lambda line: line.split("|"))
#genres = sc.textFile("ml-100k/u.genre").map(lambda line: line.split("|"))

In [29]:
ratings_line = sc.textFile("ml-100k/u.data")
movies_line = sc.textFile("ml-100k/u.item")
users_line = sc.textFile("ml-100k/u.user")

In [54]:
ratings_Schema= StructType([StructField("user id", StringType(), True),
                            StructField("item id", StringType(), True),
                            StructField("rating", StringType(), True),
                            StructField("timestamp", StringType(), True)])
movies_Schema= StructType([StructField("movie id", StringType(), True),
                           StructField("movie title", StringType(), True),
                           StructField("release date", StringType(), True),
                           StructField("video release date", StringType(), True),
                           StructField("IMBd URL", StringType(), True),
                           StructField("unknown", StringType(), True),
                           StructField("action", StringType(), True),
                           StructField("adventure", StringType(), True),
                           StructField("animation", StringType(), True),
                           StructField("Childen's", StringType(), True),
                           StructField("comedy", StringType(), True),
                           StructField("crime", StringType(), True),
                           StructField("documentary", StringType(), True),
                           StructField("drama", StringType(), True),
                           StructField("fantasy", StringType(), True),
                           StructField("film-noir", StringType(), True),
                           StructField("horror", StringType(), True),
                           StructField("musical", StringType(), True),
                           StructField("mystery", StringType(), True),
                           StructField("romance", StringType(), True),
                           StructField("sci-fi", StringType(), True),
                           StructField("thriller", StringType(), True),
                           StructField("war", StringType(), True),
                           StructField("western", StringType(), True)])
users_Schema= StructType([StructField("user id", StringType(), True),
                          StructField("age", StringType(), True),
                          StructField("gender", StringType(), True),
                          StructField("occupation", StringType(), True),
                          StructField("zip code", StringType(), True)])

In [58]:
ratings_df = spark.createDataFrame(ratings, ratings_Schema).alias("ratings_df")
movies_df = spark.createDataFrame(movies, movies_Schema)
users_df = spark.createDataFrame(users, users_Schema)

In [59]:
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parse_movie_genres(line):
    row = line.split("|")
    return Row(movie_id=int(row[0]), movie_title=row[1], uknown=int(row[5]), action=int(row[6]), adventure=int(row[7]), animation=int(row[8]), children=int(row[9]), comedy=int(row[10]), crime=int(row[11]), documentary=int(row[12]), fantasy=int(row[13]), film_noir=int(row[14]), horror=int(row[15]), musical=int(row[16]), mystery=int(row[17]), romance=int(row[18]), sci_fi=int(row[19]), thriller=int(row[20]), war=int(row[21]), western=int(row[22]))

# 1-2) Print top 10 movies by ratings

In [60]:
movieNames = loadMovieNames()
df = ratings_df.withColumn("counter", lit(1))
df2 = df.withColumn("user id",df["user id"].cast(IntegerType()))
df3 = df2.groupBy("user id").sum("counter")
df4 = df3.sort("sum(counter)", ascending=False).take(10)
for result in df4:
    print(movieNames[result[0]], result[1])

Mission: Impossible (1996) 737
Stand by Me (1986) 685
Mighty Aphrodite (1995) 636
Star Trek V: The Final Frontier (1989) 540
Leaving Las Vegas (1995) 518
Old Yeller (1957) 493
My Own Private Idaho (1991) 490
Ulee's Gold (1997) 484
Jaws (1975) 480
Mrs. Doubtfire (1993) 448


# 3) Print list of the number of ratings by genre

In [66]:
df = movies_line.map(parse_movie_genres)
df2 = spark.createDataFrame(df).alias("movies_genres")
df3 = df2.join(ratings_df, df2.movie_id == ratings_df["item id"])
df3.groupBy().sum().take(10)

[Row(sum(action)=25589, sum(adventure)=13753, sum(animation)=3605, sum(children)=7182, sum(comedy)=29832, sum(crime)=8055, sum(documentary)=758, sum(fantasy)=39895, sum(film_noir)=1352, sum(horror)=1733, sum(movie_id)=42553013, sum(musical)=5317, sum(mystery)=4954, sum(romance)=5245, sum(sci_fi)=19461, sum(thriller)=12730, sum(uknown)=10, sum(war)=21872, sum(western)=9398)]

# 4) Print the oldest movie with a "5" rating

In [140]:
df = movies_df[["movie id", "movie title", "release date"]]
df2 = df.withColumn("release date",to_date(df["release date"], "dd-MMM-yyyy"))
rating = ratings_df[["item id", "rating"]].filter(col("rating") == 5)
df3 = df2.join(rating, rating["item id"] == df3["movie id"])
df4 = df3.orderBy("release date", ascending=True).where(df3["release date"].isNotNull()).take(1)
df4

[Row(movie id='675', movie title='Nosferatu (Nosferatu, eine Symphonie des Grauens) (1922)', release date=datetime.date(1922, 1, 1), item id='675', rating='5')]

# 5) Print a list of the genres of the top 10 most rated movies

In [144]:
df = ratings_df.withColumn("counter", lit(1))
df2 = df.withColumn("user id",df["user id"].cast(IntegerType()))
df3 = df2.groupBy("user id").sum("counter")
df4 = df3.sort("sum(counter)", ascending=False).take(10)

In [146]:
df4

[Row(user id=405, sum(counter)=737),
 Row(user id=655, sum(counter)=685),
 Row(user id=13, sum(counter)=636),
 Row(user id=450, sum(counter)=540),
 Row(user id=276, sum(counter)=518),
 Row(user id=416, sum(counter)=493),
 Row(user id=537, sum(counter)=490),
 Row(user id=303, sum(counter)=484),
 Row(user id=234, sum(counter)=480),
 Row(user id=393, sum(counter)=448)]