In [45]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# Connect to the standalone Spark master and obtain (or reuse) the session
# shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Assignment4")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

In [46]:
def _read_csv_with_header(path):
    """Load a CSV file with a header row into a Spark DataFrame.

    Column names come from the header and column types are inferred by Spark
    (``inferSchema``), exactly as for all three input files below.

    :param path: path of the CSV file, as understood by the Spark cluster.
    :return: the loaded Spark DataFrame.
    """
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load(path)
    )


# The three input tables, all read with the same options.
df_movies = _read_csv_with_header('movies.csv')
df_ratings = _read_csv_with_header('ratings.csv')
df_tags = _read_csv_with_header('tags.csv')

In [47]:
# Q1: How many movies of genre Drama are there?
# A movie may carry several genres joined by "|". The genres are split into an array
# and tested for an exact "Drama" entry instead of a raw substring match. This way only
# that exact genre name counts, and another label that merely contains the text would not.
drama_movie = df_movies.filter(array_contains(split(col("genres"), r"\|"), "Drama"))
num_drama = drama_movie.count()
print("There are " + str(num_drama) + " Drama tagged movies")

There are 4361 Drama tagged movies


In [48]:
# Q2: How many unique movies are rated, and how many are not rated?
# A movie can be rated by many users, so ratings are first reduced to the distinct set of
# rated movieIds. Movies in movies.csv whose movieId never appears in that set are unrated.

# Q2.1 How many unique movies are rated?
# No sort here: ordering has no effect on distinct() or count().
unique_movie = df_ratings.select("movieId").distinct()
num_unique = unique_movie.count()
print("There are " + str(num_unique) + " unique movies are rated")

# Q2.2 How many movies are not rated?
total_movies = df_movies.select("movieId").distinct()
# The left-anti join keeps only movies with no matching movieId among the rated ones.
# Only this final result is sorted, so that the displayed list is in movieId order.
leftMovie = total_movies.join(unique_movie, ["movieId"], "leftanti").sort("movieId")
leftMovie.show()
Not_rate_movie = leftMovie.count()
print("There are " + str(Not_rate_movie) + " movies not rated")

                                                                                

There are 9724 unique movies are rated
+-------+
|movieId|
+-------+
|   1076|
|   2939|
|   3338|
|   3456|
|   4194|
|   5721|
|   6668|
|   6849|
|   7020|
|   7792|
|   8765|
|  25855|
|  26085|
|  30892|
|  32160|
|  32371|
|  34482|
|  85565|
+-------+

There are 18 movies not rated


In [49]:
# Q3: Who gave the most ratings, and how many ratings did that person make?
# Each row of ratings.csv is one rating, so counting rows per userId gives ratings per user.
user_rates = df_ratings.groupBy("userId").count()

# Largest per-user rating count. A global aggregation always yields one row,
# whose value is None when there are no ratings at all.
Max_rate = user_rates.groupBy().max("count").head()["max(count)"]

if Max_rate is None:
    print("No ratings found")
else:
    # Several users could tie for the maximum. All of them are collected (ordered by
    # userId for a deterministic report) instead of silently keeping an arbitrary first row.
    top_users = [
        row["userId"]
        for row in user_rates.where(col("count") == Max_rate).select("userId").orderBy("userId").collect()
    ]
    user = top_users[0]
    if len(top_users) == 1:
        print("UserId:" + str(user) + " gave the most ratings which are " + str(Max_rate) + " ratings in total")
    else:
        print("UserIds:" + ", ".join(str(u) for u in top_users)
              + " tied for the most ratings with " + str(Max_rate) + " ratings each")

                                                                                

UserId:414 gave the most ratings which are 2698 ratings in total


In [50]:
# Q4: Compute min, average, max rating per movie.
# A single groupBy/agg computes all three statistics in one aggregation over ratings,
# instead of three separate aggregations stitched back together with two joins.
# The resulting columns keep the names min(rating), max(rating) and avg(rating).
movie_rate = (
    df_ratings.groupBy("movieId")
    .agg(F.min("rating"), F.max("rating"), F.avg("rating"))
    .sort(col("movieId"))
)
movie_rate.show()

+-------+-----------+-----------+------------------+
|movieId|min(rating)|max(rating)|       avg(rating)|
+-------+-----------+-----------+------------------+
|      1|        0.5|        5.0|3.9209302325581397|
|      2|        0.5|        5.0|3.4318181818181817|
|      3|        0.5|        5.0|3.2596153846153846|
|      4|        1.0|        3.0| 2.357142857142857|
|      5|        0.5|        5.0|3.0714285714285716|
|      6|        1.0|        5.0| 3.946078431372549|
|      7|        1.0|        5.0| 3.185185185185185|
|      8|        1.0|        5.0|             2.875|
|      9|        1.5|        5.0|             3.125|
|     10|        0.5|        5.0| 3.496212121212121|
|     11|        1.0|        5.0|3.6714285714285713|
|     12|        1.0|        5.0|2.4210526315789473|
|     13|        2.0|        4.0|             3.125|
|     14|        3.0|        5.0|3.8333333333333335|
|     15|        1.0|        5.0|               3.0|
|     16|        1.0|        5.0| 3.9268292682

                                                                                

In [51]:
# Collect the per-movie rating statistics to the driver and write them out via pandas.
movie_rate_pdf = movie_rate.toPandas()
movie_rate_pdf.to_csv('MovieRates.csv')

                                                                                

In [52]:
# Both tags.csv and ratings.csv have a "timestamp" column. Rename them so the joined
# result keeps both and they can be told apart.
# No sort here: row order is not preserved through the joins below anyway.
df_tags = df_tags.withColumnRenamed("timestamp", "tags_timestamp")
df_ratings = df_ratings.withColumnRenamed("timestamp", "ratings_timestamp")

# Q5: data-set of users that have rated a movie but not tagged it.
# The left-anti join on (userId, movieId) keeps rating rows with no matching tag row;
# only columns from ratings.csv are returned.
rateNotTag = df_ratings.join(df_tags, ["userId", "movieId"], "leftanti").sort("userId")
rateNotTag.show()

# Q6: data-set of users that have rated AND tagged a movie.
# The inner join on (userId, movieId) keeps pairs present in both files, with columns
# from both. If a user gave the same movie several tags, the rating row is repeated
# once per matching tag row.
rateAndTag = df_ratings.join(df_tags, ["userId", "movieId"], "inner").sort("userId")
rateAndTag.show()

+------+-------+------+-----------------+
|userId|movieId|rating|ratings_timestamp|
+------+-------+------+-----------------+
|     1|      1|   4.0|        964982703|
|     1|      3|   4.0|        964981247|
|     1|      6|   4.0|        964982224|
|     1|     47|   5.0|        964983815|
|     1|     50|   5.0|        964982931|
|     1|     70|   3.0|        964982400|
|     1|    101|   5.0|        964980868|
|     1|    110|   4.0|        964982176|
|     1|    151|   5.0|        964984041|
|     1|    157|   5.0|        964984100|
|     1|    163|   5.0|        964983650|
|     1|    216|   5.0|        964981208|
|     1|    223|   3.0|        964980985|
|     1|    231|   5.0|        964981179|
|     1|    235|   4.0|        964980908|
|     1|    260|   5.0|        964981680|
|     1|    296|   3.0|        964982967|
|     1|    316|   3.0|        964982310|
|     1|    333|   5.0|        964981179|
|     1|    349|   4.0|        964982563|
+------+-------+------+-----------

In [53]:
# Write both Q5/Q6 result sets to CSV through pandas, one file per data-set.
for frame, path in ((rateNotTag, 'rateNotTag.csv'), (rateAndTag, 'rateAndTag.csv')):
    frame.toPandas().to_csv(path)

In [54]:
# Q7: Output data-set showing the number of movies per genre, per year.
# Q7.1: number of movies per year.
# The year is taken from the text inside the last parentheses of the title. That text must
# start with a digit and may contain further non-parenthesis characters (so e.g. a range
# like "2006-2007" would also match). Only trailing whitespace may follow before the end
# of the title. regexp_extract returns "" when there is no match, so titles without such
# a suffix are counted together under an empty movieYear.
df = df_movies.withColumn("movieYear", regexp_extract(col("title"), r"\((\d[^()]+)\)\s*$", 1)).drop('genres')

# Sort once so that the displayed table and the exported CSV share the same row order.
moviePerYear = df.groupBy("movieYear").count().sort(col("movieYear"))
moviePerYear.show()
moviePerYear.toPandas().to_csv('MovieYear.csv')

+---------+-----+
|movieYear|count|
+---------+-----+
|         |   13|
|     1902|    1|
|     1903|    1|
|     1908|    1|
|     1915|    1|
|     1916|    4|
|     1917|    1|
|     1919|    1|
|     1920|    2|
|     1921|    1|
|     1922|    1|
|     1923|    4|
|     1924|    5|
|     1925|    4|
|     1926|    5|
|     1927|    7|
|     1928|    4|
|     1929|    4|
|     1930|    5|
|     1931|   14|
+---------+-----+
only showing top 20 rows



In [55]:
# Q7.2: number of movies per genre.
# A movie may list several genres separated by "|". Split them into an array, explode it
# into one row per (movie, genre), then count the rows for each genre.
# The pattern is a raw string: "\|" in a normal string literal is an invalid escape
# sequence (DeprecationWarning, SyntaxWarning on Python 3.12+), although it yields the same regex.
movies = df_movies.withColumn("MovieGenres", split(col("genres"), r"\|"))
movie_genres = movies.select(movies.movieId, explode(movies.MovieGenres).alias('MovieGenres'))
moviePerGenre = movie_genres.groupBy("MovieGenres").count()
moviePerGenre.sort(col("MovieGenres")).show()
# NOTE(review): Q7 asks for counts "per genre, per year". This cell and Q7.1 produce the two
# counts separately; if a combined (genre, year) table is intended, it still needs to be built.

+------------------+-----+
|       MovieGenres|count|
+------------------+-----+
|(no genres listed)|   34|
|            Action| 1828|
|         Adventure| 1263|
|         Animation|  611|
|          Children|  664|
|            Comedy| 3756|
|             Crime| 1199|
|       Documentary|  440|
|             Drama| 4361|
|           Fantasy|  779|
|         Film-Noir|   87|
|            Horror|  978|
|              IMAX|  158|
|           Musical|  334|
|           Mystery|  573|
|           Romance| 1596|
|            Sci-Fi|  980|
|          Thriller| 1894|
|               War|  382|
|           Western|  167|
+------------------+-----+

