# Zeppelin notebook to get the top N highest-rated movies

## Description

This Zeppelin notebook determines the top N highest-rated movies (by average rating) for each specified genre. The search can be filtered by genre, by a regular expression matched against the title, and by a range of release years (from and to); N sets how many movies of each genre to display. Results are sorted by genre and then by average rating in descending order; movies with the same rating are ordered by year (newest first) and then by title. The filter arguments are entered in a dedicated paragraph.

## Installation
#### Requirements
The notebook requires [Python](https://www.python.org/downloads/) v3+, Docker and Bash.
To start Zeppelin in Docker, run:
```
docker run -p 8080:8080 -v /tmp:/tmp --name zeppelin apache/zeppelin:0.9.0
```
Zeppelin then starts and listens on port 8080.

In [1]:
%pyspark

# Allow creating a managed table in a non-empty location (needed for overwriting)
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")

### Download and unpack MovieLens files

The next paragraph downloads the ml-latest-small dataset from the [MovieLens](https://grouplens.org/datasets/movielens/) website and unpacks movies.csv and ratings.csv under the /tmp/Datasets folder.

In [3]:
%sh 

if [ -e /tmp/Datasets/ml-latest-small/ratings.csv ] && [ -e /tmp/Datasets/ml-latest-small/movies.csv ]; then
	echo "CSV files already exist."
else
	# Download the archive only if it is not already in /tmp
	if [ ! -e /tmp/ml-latest-small.zip ]; then
		wget -P /tmp https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
	fi
	# Extract only the two files the notebook needs (-o overwrites without prompting)
	unzip -o /tmp/ml-latest-small.zip ml-latest-small/ratings.csv ml-latest-small/movies.csv -d /tmp/Datasets
	rm /tmp/ml-latest-small.zip
fi
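
The selective extraction can also be sketched in Python, which may be handy when `unzip` is not installed. This is only an illustration, not part of the notebook: `extract_needed_files` and `NEEDED_MEMBERS` are hypothetical names, and the archive is assumed to have already been downloaded.

```python
import zipfile
from pathlib import Path

# The two archive members the notebook needs.
NEEDED_MEMBERS = (
    "ml-latest-small/ratings.csv",
    "ml-latest-small/movies.csv",
)


def extract_needed_files(zip_path, target_dir, members=NEEDED_MEMBERS):
    """Extract only the listed members of the archive into target_dir.

    Hypothetical helper: returns the paths of the extracted files.
    """
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        for member in members:
            # Existing files with the same name are overwritten.
            archive.extract(member, path=target)
    return [target / member for member in members]
```

With the paths used in this notebook, the call would be `extract_needed_files("/tmp/ml-latest-small.zip", "/tmp/Datasets")`.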

### Put CSV files into HDFS


In [5]:
%sh

hdfs dfs -mkdir -p /tmp/Datasets/ml-latest-small

hdfs dfs -mkdir -p /tmp/Output

hdfs dfs -put -f /tmp/Datasets/ml-latest-small/movies.csv /tmp/Datasets/ml-latest-small/

hdfs dfs -put -f /tmp/Datasets/ml-latest-small/ratings.csv /tmp/Datasets/ml-latest-small/

## Arguments and variables
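The next paragraph imports the necessary functions and sets the parameters (arguments) used for filtering. GENRES is a list of genres separated by `|`; setting an argument to None disables the corresponding filter.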

In [7]:
%pyspark

from pyspark.sql.functions import col, split, explode, regexp_extract, avg, round

MOVIES_LOCAL_PATH = "///tmp/Datasets/ml-latest-small/movies.csv" 
RATINGS_LOCAL_PATH = "///tmp/Datasets/ml-latest-small/ratings.csv" 
MOVIES_HDFS_PATH = "hdfs:///tmp/Datasets/ml-latest-small/movies.csv" 
RATINGS_HDFS_PATH = "hdfs:///tmp/Datasets/ml-latest-small/ratings.csv" 
OUTPUT_LOCAL_PATH = "///tmp/Output/"
OUTPUT_HDFS_PATH = "hdfs:///tmp/Output/"
READING_FORMAT = 'csv'
SAVING_FORMAT = 'csv'

N = 4
GENRES = "Thriller|Crime|War|Fantasy"
REGEXP = "God"
YEAR_TO = None
YEAR_FROM = 1970
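
Before running the remaining paragraphs, it can help to check that the arguments are consistent. The following is a minimal sketch, not part of the notebook; `validate_arguments` is a hypothetical helper. Note that Spark's `rlike` uses Java regular expressions, so compiling the pattern with Python's `re` is only an approximate check.

```python
import re


def validate_arguments(n, genres, regexp, year_from, year_to):
    """Raise ValueError if the filter arguments are inconsistent.

    Hypothetical helper; None means "filter disabled", as in the notebook.
    """
    if n is not None and (not isinstance(n, int) or n <= 0):
        raise ValueError("N must be a positive integer or None")
    if genres is not None and not isinstance(genres, str):
        raise ValueError("GENRES must be a '|'-separated string or None")
    if regexp:
        try:
            # Approximate check only: Spark evaluates rlike with Java regex syntax.
            re.compile(regexp)
        except re.error as exc:
            raise ValueError("REGEXP is not a valid pattern") from exc
    if year_from is not None and year_to is not None and year_from > year_to:
        raise ValueError("YEAR_FROM must not be greater than YEAR_TO")


# The values set in the paragraph above pass the check.
validate_arguments(4, "Thriller|Crime|War|Fantasy", "God", 1970, None)
```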


## Local mode

Create the movies and ratings DataFrames from the local dataset files. Run either this paragraph or the HDFS mode paragraph; both define the same DataFrames.

In [10]:
%pyspark

staging_movies_df = spark.read.format(READING_FORMAT)\
                              .options(header='true',
                                       delimiter=',',
                                       path=MOVIES_LOCAL_PATH)\
                              .load()
                               
staging_ratings_df = spark.read.format(READING_FORMAT)\
                               .options(header='true',
                                        delimiter=',',
                                        path=RATINGS_LOCAL_PATH)\
                               .load()\
                               .select(col('movieId').alias('Id'),
                                       col('rating').cast('float'))


## HDFS mode

Create the movies and ratings DataFrames from the files in HDFS.

In [11]:
%pyspark

staging_movies_df = spark.read.format(READING_FORMAT)\
                              .options(header='true',
                                       delimiter=',',
                                       path=MOVIES_HDFS_PATH)\
                              .load()
                               
staging_ratings_df = spark.read.format(READING_FORMAT)\
                               .options(header='true',
                                        delimiter=',',
                                        path=RATINGS_HDFS_PATH)\
                               .load()\
                               .select(col('movieId').alias('Id'),
                                       col('rating').cast('float'))


 
## Get filtered movies DataFrame

This paragraph parses the rows read from the CSV file into the parsed_movies_df DataFrame, which has the columns movieId, genre, title and year. Each movie produces one row per genre, and the year is extracted from the title. The paragraph then drops bad data and applies the filters.

In [13]:
%pyspark

# Titles look like "Name (YYYY)": group 1 is the name, group 2 is the year
TITLE_YEAR_PATTERN = "(.+)[ ]+[(](\\d{4})[)]"

parsed_movies_df = staging_movies_df.select('movieId',
                                            explode(split('genres', "[|]")).alias('genre'),
                                            regexp_extract('title', TITLE_YEAR_PATTERN, 1).alias('title'),
                                            regexp_extract('title', TITLE_YEAR_PATTERN, 2).alias('year')
                                            )

# Drop bad data: rows with nulls and rows whose title has no "(YYYY)" part
# (regexp_extract returns an empty string when the pattern does not match)
filtered_movies_df = parsed_movies_df.na.drop()\
                                     .filter(col('year') != '')

if YEAR_FROM is not None:
    filtered_movies_df = filtered_movies_df.filter(col('year').cast("int") >= YEAR_FROM)

if YEAR_TO is not None:
    filtered_movies_df = filtered_movies_df.filter(col('year').cast("int") <= YEAR_TO)

if REGEXP:
    filtered_movies_df = filtered_movies_df.filter(col('title').rlike(REGEXP))

if GENRES:
    filtered_movies_df = filtered_movies_df.filter(col('genre').isin(GENRES.split('|')))

#filtered_movies_df.show(20)
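
To see what the parsing step does to a single row, the same regular expression can be tried in plain Python without Spark. This is only an illustrative sketch: `parse_movie` is a hypothetical helper, and it returns an empty list where the notebook would drop the row.

```python
import re

# Same pattern as in the notebook: "<name> (<4-digit year>)".
TITLE_YEAR = re.compile(r"(.+)[ ]+[(](\d{4})[)]")


def parse_movie(title, genres):
    """Return one (genre, name, year) tuple per genre, or [] for bad titles."""
    match = TITLE_YEAR.search(title)
    if match is None:
        # No "(YYYY)" part in the title: treated as bad data.
        return []
    name, year = match.group(1), int(match.group(2))
    return [(genre, name, year) for genre in genres.split("|")]


print(parse_movie("Godfather, The (1972)", "Crime|Drama"))
# → [('Crime', 'Godfather, The', 1972), ('Drama', 'Godfather, The', 1972)]
print(parse_movie("Title without a year", "Drama"))
# → []
```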

## Get result DataFrame of movies with ratings

This paragraph computes the average rating of each filtered movie and joins it with the filtered movies DataFrame. It then keeps the top N movies of each genre and sorts the result by genre, rating (descending), year (descending) and title.

In [15]:
%pyspark

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

necessary_ratings_df = filtered_movies_df.join(staging_ratings_df, filtered_movies_df.movieId == staging_ratings_df.Id, 'leftouter')\
                                         .groupBy('Id')\
                                         .agg(round(avg('rating'), 3).alias('rating'))
                                         
movies_with_ratings_df = filtered_movies_df.join(necessary_ratings_df, necessary_ratings_df.Id == filtered_movies_df.movieId)\
                                           .select('genre',
                                                   'title',
                                                   'year',
                                                   'rating')
if N: 
    window_spec = Window.partitionBy('genre').orderBy( col('rating').desc(), 
                                                       col('year').desc(),
                                                      'title')
    movies_with_ratings_df = movies_with_ratings_df.withColumn('row_number', row_number().over(window_spec))\
                                                   .where(col('row_number') <= N)\
                                                   .drop('row_number')

final_movies_df = movies_with_ratings_df.orderBy('genre',
                                                 col('rating').desc(),
                                                 col('year').desc(),
                                                 'title')
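
The ranking logic (partition by genre, order within each genre, keep the first N rows) can be sketched in plain Python on a small list. This is an illustration under assumptions rather than the notebook's implementation: `top_n_per_genre` is a hypothetical helper, and the sample rows are made up.

```python
from collections import defaultdict


def top_n_per_genre(rows, n):
    """rows: iterable of (genre, title, year, rating) tuples.

    Returns at most n rows per genre, ordered by genre, then rating
    (descending), year (descending) and title.
    """
    by_genre = defaultdict(list)
    for row in rows:
        by_genre[row[0]].append(row)

    result = []
    for genre in sorted(by_genre):
        ranked = sorted(by_genre[genre], key=lambda r: (-r[3], -r[2], r[1]))
        result.extend(ranked[:n])
    return result


# Made-up sample data, for illustration only.
sample = [
    ("War", "B", 1980, 4.0),
    ("War", "A", 1980, 4.0),
    ("War", "C", 1990, 4.0),
    ("War", "D", 1975, 4.5),
    ("Crime", "E", 1972, 3.5),
]
print(top_n_per_genre(sample, 2))
# → [('Crime', 'E', 1972, 3.5), ('War', 'D', 1975, 4.5), ('War', 'C', 1990, 4.0)]
```

Here the three "War" movies rated 4.0 are tied, so the newest one ("C") wins the second slot.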

## Local mode output

In [18]:
%pyspark

final_movies_df.write.mode('overwrite')\
                     .format(SAVING_FORMAT)\
                     .options(delimiter=',')\
                     .save(OUTPUT_LOCAL_PATH)


## HDFS mode output

In [19]:
%pyspark

final_movies_df.write.mode('overwrite')\
                     .format(SAVING_FORMAT)\
                     .options(delimiter=',')\
                     .save(OUTPUT_HDFS_PATH)


## Showing results

### Local mode cat result

In [23]:
%sh
echo 'genre, title, year, rating'
cat /tmp/Output/*

### HDFS mode cat result

In [24]:
%sh
echo 'genre, title, year, rating'
hdfs dfs -cat /tmp/Output/*
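
Titles often contain commas (for example "Godfather, The"), so the raw `cat` output can be hard to read column by column. A minimal sketch of loading the local output with Python's `csv` module follows. `read_output` and `COLUMNS` are hypothetical names, the files are assumed to be Spark's `part-*` files in the local output folder, and titles are assumed to contain no embedded double quotes, which Spark may escape differently from Python's default CSV dialect.

```python
import csv
import glob
import os

# Column order used when the result was saved (the files have no header).
COLUMNS = ("genre", "title", "year", "rating")


def read_output(output_dir):
    """Read all part files in output_dir into a list of dicts."""
    rows = []
    for part in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(part, newline="", encoding="utf-8") as handle:
            for record in csv.reader(handle):
                rows.append(dict(zip(COLUMNS, record)))
    return rows
```

For the local mode output, the call would be `read_output("/tmp/Output")`.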