# MODUPE OYEDIRAN
## modupeolayinkaoyediran@gmail.com

MovieLens Data Analytics
--------------------------
For this notebook, we use a MovieLens sample dataset. It contains 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users, and is available at https://grouplens.org/datasets/movielens/latest/

We now load the files downloaded from the MovieLens website. The files were uploaded to an S3 bucket and are read below through their `dbfs:/` paths.

In [0]:
links = spark.read.format("csv").option("inferSchema", "true").option("header","true").load("dbfs:/FileStore/shared_uploads/dupe_oyediran@yahoo.com/links.csv")
movies = spark.read.format("csv").option("inferSchema", "true").option("header","true").load("dbfs:/FileStore/shared_uploads/dupe_oyediran@yahoo.com/movies.csv")
tags = spark.read.format("csv").option("inferSchema", "true").option("header","true").load("dbfs:/FileStore/shared_uploads/dupe_oyediran@yahoo.com/tags.csv")
ratings = spark.read.format("csv").option("inferSchema", "true").option("header","true").load("dbfs:/FileStore/shared_uploads/dupe_oyediran@yahoo.com/ratings.csv")

Once the data is loaded, we preview each DataFrame with Spark's show action.

In [0]:
links.show()

In [0]:
movies.show()

In [0]:
tags.show()

In [0]:
ratings.show()

As we can see in the table previews, the year of each movie is embedded in its title. To extract it, I created a user-defined function (UDF) that returns the year when available and null otherwise.

In [0]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

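def get_year(title):
  # Titles are expected to end with "(YYYY)", so the year is the four characters before the last one
  try:
    return int(title[-5:-1])
  except (TypeError, ValueError):
    # TypeError: the title is null; ValueError: the slice is not a number
    return None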

get_year_udf = udf(get_year)

In [0]:
movies = movies.withColumn("year", get_year_udf(movies.title))
movies.show()
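
The slicing in `get_year` only works when the title ends exactly with `(YYYY)`. A title with trailing whitespace, for example, would yield null. As a minimal sketch in plain Python (so it can be tried outside Spark), a regular expression can make that assumption explicit. The names `YEAR_AT_END` and `extract_year` and the sample titles are hypothetical, not part of the notebook.

```python
import re

# Hypothetical pattern: a four-digit year in parentheses at the end of the title,
# tolerating trailing whitespace.
YEAR_AT_END = re.compile(r"\((\d{4})\)\s*$")

def extract_year(title):
    """Return the release year as an int, or None when no trailing '(YYYY)' is found."""
    if title is None:
        return None
    match = YEAR_AT_END.search(title)
    return int(match.group(1)) if match else None

# Made-up titles covering the normal case, trailing whitespace, a missing year and a null.
samples = ["Toy Story (1995)", "Heat (1995) ", "Untitled Movie", None]
years = [extract_year(title) for title in samples]
print(years)
```

A function like this could be wrapped with `udf` in the same way as `get_year`.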

# Insights

As part of the analysis of this dataset, it would be useful to have the average rating for each movie. In the following cell of code, I aggregate over the ratings table to get the average rating for each movie ID.

In [0]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import bround, mean

# Average rating per movie, cast to float and then rounded to two decimals
ratings_agg = ratings.groupBy("movieId").agg(mean("rating").cast(FloatType()).alias("avg_rating"))
ratings_agg = ratings_agg.select("movieId", bround("avg_rating", 2).alias("avg_rating"))
ratings_agg.show()
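
To make the aggregation concrete, here is a minimal plain-Python sketch of the same group-and-average logic on a handful of made-up rows. The sample data and the helper `average_by_movie` are hypothetical, and Python's built-in `round` merely stands in for `bround`.

```python
from collections import defaultdict

# Hypothetical sample rows in the shape of ratings.csv: (userId, movieId, rating)
sample_ratings = [
    (1, 10, 4.0),
    (2, 10, 3.0),
    (3, 10, 5.0),
    (1, 20, 2.5),
    (2, 20, 3.5),
]

def average_by_movie(rows):
    """Group ratings by movieId and return {movieId: mean rating rounded to 2 decimals}."""
    totals = defaultdict(lambda: [0.0, 0])  # movieId -> [sum of ratings, number of ratings]
    for _user_id, movie_id, rating in rows:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1
    return {movie_id: round(total / count, 2) for movie_id, (total, count) in totals.items()}

avg_rating = average_by_movie(sample_ratings)
print(avg_rating)
```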

In [0]:
joined_movies = movies.join(ratings_agg,"movieId")
joined_movies.show()

Here we evaluate the average rating by year to see whether ratings tend to increase or decrease over time. No such trend is visible in the results, but the year column does contain some outliers.

This required joining the aggregated ratings table with the movies table, which includes the year as a column.

In [0]:
joined_movies.select("year",'avg_rating').groupBy("year").mean().orderBy("year").display()

year,avg(avg_rating)
,3.395833333333333
1902.0,3.5
1903.0,2.5
1908.0,4.0
1915.0,2.0
1916.0,3.5625
1917.0,4.5
1919.0,2.0
1920.0,3.6799999475479126
1921.0,4.099999904632568


There does not seem to be any appreciable difference in movie ratings over the years considered.

As mentioned before, the year column contains some outliers and null values. To inspect them, I count the number of movies by year. This shows that a few movies have years in the early 1900s.

In [0]:
joined_movies.select("year",'avg_rating').groupBy("year").count().orderBy("year").display()

year,count
,24
1902.0,1
1903.0,1
1908.0,1
1915.0,1
1916.0,4
1917.0,1
1919.0,1
1920.0,2
1921.0,1


We can see that the number of movies per release year increases steeply, apparently exponentially, over the years.

Next, we look at how many of the reviewed movies fall into each genre.

In [0]:
from pyspark.sql.functions import split,explode

exploded_movies = movies.withColumn("genres", explode(split("genres","[|]")))
exploded_movies.groupBy("genres").count().display()

genres,count
Crime,1199
Romance,1596
Thriller,1894
Adventure,1263
Drama,4361
War,382
Documentary,440
Fantasy,779
Mystery,573
Musical,334


We can see that Drama and Comedy are the most common genres, each with roughly twice as many movies as any other genre. This perhaps indicates that they are the most liked by the public.
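
The split-and-explode step behind these counts can be sketched in plain Python. The sample rows and the helper `explode_genres` are hypothetical. The point is that a movie listed under several genres is counted once per genre, so the counts add up to more than the number of movies.

```python
from collections import Counter

# Hypothetical sample rows in the shape of movies.csv: (movieId, title, genres)
sample_movies = [
    (1, "Movie A (1995)", "Comedy|Drama"),
    (2, "Movie B (2001)", "Drama"),
    (3, "Movie C (2010)", "Action|Comedy|Thriller"),
]

def explode_genres(rows):
    """Yield one (movieId, genre) pair for every genre listed for a movie."""
    for movie_id, _title, genres in rows:
        for genre in genres.split("|"):
            yield movie_id, genre

genre_counts = Counter(genre for _movie_id, genre in explode_genres(sample_movies))
print(genre_counts)
```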

It is also worth checking whether the raters are biased towards a particular genre. One way to test this is to take the average rating by genre and see whether it is roughly uniform across genres, that is, whether there is no systematic bias towards any of them.

In [0]:
rated_genres = exploded_movies.join(ratings,"movieId").select("genres","rating")
rated_genres.groupBy("genres").mean().display()

genres,avg(rating)
Crime,3.658293867274144
Romance,3.506510704038844
Thriller,3.4937055799183425
Adventure,3.5086089151939075
Drama,3.656184411371876
War,3.8082938876312
Documentary,3.797785069729286
Fantasy,3.4910005070136894
Mystery,3.632460255407871
Musical,3.5636781053649105


A visual inspection suggests that the people rating the movies have no bias towards a particular genre. The average ratings look quite uniform across genres, even though the sample is small.
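
As a rough numerical complement to the visual check, the sketch below measures how far apart the genre averages are. It uses only the ten rows of output shown above, so it does not cover every genre in the dataset, and the variable names are illustrative.

```python
from statistics import mean, pstdev

# Average rating per genre, copied from the ten output rows shown above
genre_avg = {
    "Crime": 3.658293867274144,
    "Romance": 3.506510704038844,
    "Thriller": 3.4937055799183425,
    "Adventure": 3.5086089151939075,
    "Drama": 3.656184411371876,
    "War": 3.8082938876312,
    "Documentary": 3.797785069729286,
    "Fantasy": 3.4910005070136894,
    "Mystery": 3.632460255407871,
    "Musical": 3.5636781053649105,
}

values = list(genre_avg.values())
lowest = min(genre_avg, key=genre_avg.get)    # genre with the lowest average
highest = max(genre_avg, key=genre_avg.get)   # genre with the highest average
spread = genre_avg[highest] - genre_avg[lowest]

print(f"lowest: {lowest}, highest: {highest}, spread: {spread:.2f}")
print(f"unweighted mean: {mean(values):.2f}, standard deviation: {pstdev(values):.2f}")
```

For these ten genres the gap between the highest and lowest average is about 0.32 rating points.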

Let's now see which movies have the best and the worst scores. For this, I take the top 10 rows of the joined table, which includes the movie title and the average rating.

In [0]:
from pyspark.sql.functions import desc
joined_movies.select("title","avg_rating").orderBy(desc("avg_rating")).head(10)

Now I take the 10 movies with the lowest average rating to see the worst movies in our dataset.

In [0]:
joined_movies.select("title","avg_rating").orderBy("avg_rating").head(10)
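
The two queries above sort the whole table and keep the first ten rows. The same top-and-bottom selection can be sketched in plain Python with `heapq`. The sample pairs and the helper `top_and_bottom` are hypothetical stand-ins for rows of `joined_movies`.

```python
import heapq

# Hypothetical (title, avg_rating) pairs standing in for rows of joined_movies
sample = [
    ("Movie A", 4.5),
    ("Movie B", 2.0),
    ("Movie C", 3.75),
    ("Movie D", 1.5),
    ("Movie E", 5.0),
]

def top_and_bottom(rows, n):
    """Return the n highest-rated and the n lowest-rated rows."""
    best = heapq.nlargest(n, rows, key=lambda row: row[1])
    worst = heapq.nsmallest(n, rows, key=lambda row: row[1])
    return best, worst

best, worst = top_and_bottom(sample, 2)
print(best)
print(worst)
```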

Finally, I save the DataFrame to an S3 bucket in Parquet format, so that it can be stored in an AWS Redshift data warehouse for later analytics.

In [0]:
joined_movies.write.format("parquet").mode("overwrite").option("path","s3a://filestoragedatabricks/Spark-essentials-data/bands2.parquet").save()