# Movie Ranker

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Create the SparkSession for this notebook, or reuse one that already exists
# (getOrCreate), with the 'local' master and the application name 'myAppName'.
spark = SparkSession.builder.master('local').appName('myAppName').getOrCreate()

# Set Spark's log level to ERROR so the cell outputs stay readable.
spark.sparkContext.setLogLevel("ERROR")

## Read the parquet file generated with pandas and the OMDb API

In [None]:
# Load the movie metadata from the parquet file into a DataFrame.
df = spark.read.parquet('data/movies.parquet.gzip')

# Report how many rows were read.
movie_count = df.count()
print(f'{movie_count} movies loaded')

## Filter the movies with a rating less than 6.0

In [None]:
# Keep only the movies whose IMDb rating is strictly below 6.0.
# NOTE(review): another cell compares 'imdbrating' against the string 'N/A',
# which suggests the column is stored as a string. If so, this numeric
# comparison relies on Spark's implicit cast -- confirm against the schema.
bad_movies = df.filter(col('imdbrating') < 6.0)
bad_movies_count = bad_movies.count()
# The column holds a rating (not a ranking), so the message says "rating".
print(f'We have {bad_movies_count} movies with a rating less than 6.0')

In [None]:
# Display the low-rated movies ordered by rating, lowest first, without
# truncating long cell values.
bad_movies.orderBy('imdbrating', ascending=True).show(truncate=False)

## Filter the movies without a rating

In [None]:
# Select the movies whose rating equals the literal string 'N/A'.
unrated_movies = df.filter(col('imdbrating') == 'N/A')

# Report how many there are, then list them without truncating cell values.
unrated_count = unrated_movies.count()
print(f'We have {unrated_count} movies without rating')
unrated_movies.show(truncate=False)

## Search for specific movie titles to validate the metadata

In [None]:
# Case-insensitive LIKE match: titles containing 'before' anywhere.
title_matches_before = col('title').ilike('%before%')
like = df.where(title_matches_before)
like.show(truncate=False)

## Check the bad movies starting with a given letter

In [None]:
# Narrow the low-rated movies down to those whose title begins with 'B'.
b_movies = bad_movies.where(bad_movies.title.startswith('B'))
b_movies.show(truncate=False)

## Check how much space we would save by deleting the movies with rating < 6.0

In [None]:
# Add up the 'size_in_bytes' column over the low-rated movies.
# `sum` here is pyspark.sql.functions.sum (imported at the top of the
# notebook), not the Python builtin.
free_up_space = bad_movies.select(sum('size_in_bytes')).collect()

# Spark's sum() yields NULL (None in Python) when there are no rows or every
# value is NULL; fall back to 0 so the division below cannot raise TypeError.
total_bytes = free_up_space[0][0] or 0

# Two divisions by 1024: presumably bytes -> MiB, going by the column name.
print(total_bytes/1024/1024)