# Spark Preparation
First we check whether the notebook is running in Google Colab. If it is, we install all the necessary packages.

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.3.0 built for Hadoop 3, Java 8, and findspark, which locates the Spark installation on the system. The installation can be carried out from inside the Colab notebook itself.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [1]:
try:
  # The google.colab module is only importable inside a Colab runtime
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [2]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
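    # dlcdn.apache.org only hosts current releases; older ones such as 3.3.0 are on archive.apache.org
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz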
    !tar xf spark-3.3.0-bin-hadoop3.tgz
    !mv spark-3.3.0-bin-hadoop3 spark
    !pip install -q findspark

In [3]:
if IN_COLAB:
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark"

# Start a Local Cluster

In [4]:
import findspark
findspark.init()

In [5]:
spark_url = 'local'

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark ML')\
        .getOrCreate()

22/11/15 03:05:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/15 03:05:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Spark Assignment

Based on the movie review dataset in 'netflix-rotten-tomatoes-metacritic-imdb.csv', answer the questions below.

**Note:** Do not clean or remove missing data.

In [7]:
path = 'netflix-rotten-tomatoes-metacritic-imdb.csv'

In [8]:
df = spark.read.option("delimiter", ",").option("header", True).csv(path)

In [9]:
df.show(5)

+-------------------+--------------------+--------------------+----------------+---------------+----------------+--------------------+------------+---------------+--------------------+--------------------+-----------+----------+---------------------+----------------+---------------+--------------------+----------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+
|              Title|               Genre|                Tags|       Languages|Series or Movie|Hidden Gem Score|Country Availability|     Runtime|       Director|              Writer|              Actors|View Rating|IMDb Score|Rotten Tomatoes Score|Metacritic Score|Awards Received|Awards Nominated For| Boxoffice|Release Date|Netflix Release Date|    Production House|        Netflix Link|           IMDb Link|             Summary|IMDb Votes|               Image|              

22/11/15 03:05:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [10]:
cols = [c.replace(' ', '_') for c in df.columns]
df = df.toDF(*cols)
df.columns

['Title',
 'Genre',
 'Tags',
 'Languages',
 'Series_or_Movie',
 'Hidden_Gem_Score',
 'Country_Availability',
 'Runtime',
 'Director',
 'Writer',
 'Actors',
 'View_Rating',
 'IMDb_Score',
 'Rotten_Tomatoes_Score',
 'Metacritic_Score',
 'Awards_Received',
 'Awards_Nominated_For',
 'Boxoffice',
 'Release_Date',
 'Netflix_Release_Date',
 'Production_House',
 'Netflix_Link',
 'IMDb_Link',
 'Summary',
 'IMDb_Votes',
 'Image',
 'Poster',
 'TMDb_Trailer',
 'Trailer_Site']

In [11]:
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Languages: string (nullable = true)
 |-- Series_or_Movie: string (nullable = true)
 |-- Hidden_Gem_Score: string (nullable = true)
 |-- Country_Availability: string (nullable = true)
 |-- Runtime: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Writer: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- View_Rating: string (nullable = true)
 |-- IMDb_Score: string (nullable = true)
 |-- Rotten_Tomatoes_Score: string (nullable = true)
 |-- Metacritic_Score: string (nullable = true)
 |-- Awards_Received: string (nullable = true)
 |-- Awards_Nominated_For: string (nullable = true)
 |-- Boxoffice: string (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Netflix_Release_Date: string (nullable = true)
 |-- Production_House: string (nullable = true)
 |-- Netflix_Link: string (nullable = true)
 |-- IMDb_Link: string (null

## What are the maximum and average of the overall hidden gem score?

In [12]:
df = df.withColumn('Hidden_Gem_Score', df.Hidden_Gem_Score.cast('float'))

In [13]:
# Import under an alias so that Spark's min/max do not shadow Python's built-ins
from pyspark.sql import functions as F

In [14]:
df.select(F.max('Hidden_Gem_Score'), F.avg('Hidden_Gem_Score')).show()

+---------------------+---------------------+
|max(Hidden_Gem_Score)|avg(Hidden_Gem_Score)|
+---------------------+---------------------+
|                  9.8|    5.937551392466598|
+---------------------+---------------------+
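
Spark's `max` and `avg` skip nulls, and under Spark's default (non-ANSI) settings a string that cannot be parsed as a number becomes null when it is cast. The figures above are therefore computed over the rows that have a usable score, without any row being removed. The plain-Python sketch below mirrors that behaviour on a few made-up values; `to_float` and the sample strings are hypothetical and are not taken from the dataset.

```python
def to_float(text):
    """Mimic a lenient cast: return None when the value is missing or not numeric."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None

# Hypothetical raw column values, including a missing and a malformed entry
raw_scores = ["9.8", "3.5", None, "n/a", "4.7"]

scores = [to_float(t) for t in raw_scores]
present = [s for s in scores if s is not None]   # nulls are skipped, not deleted from the data

# sorted(...)[-1] avoids the built-in max, which
# `from pyspark.sql.functions import max` would shadow in a notebook
maximum = sorted(present)[-1]
average = sum(present) / len(present)

print(maximum)             # 9.8
print(round(average, 2))   # 6.0
```

The average divides by the number of non-null scores (3 here), not by the total number of rows (5).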



## How many movies are available in Korea?

In [15]:
df.groupby(df['Languages'].contains('Korea')).count().show()

+--------------------------+-----+
|contains(Languages, Korea)|count|
+--------------------------+-----+
|                      null| 1935|
|                      true|  735|
|                     false|12810|
+--------------------------+-----+
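
The grouping above tests the `Languages` column, so it counts titles that list Korean as a language, includes series as well as movies, and puts rows with a missing value in a separate null group. If the question is about availability, a filter on `Country_Availability` restricted to movies is probably closer to what is asked. The sketch below shows the difference in plain Python on made-up rows. It assumes that `Country_Availability` is a comma-separated list, that Korea is spelled `South Korea` there, and that `Series_or_Movie` uses the value `Movie`; none of this is confirmed by the output shown here.

```python
# Hypothetical rows; the real values in the CSV may be formatted differently
rows = [
    {"Title": "A", "Series_or_Movie": "Movie",  "Languages": "Korean",  "Country_Availability": "Japan,Thailand"},
    {"Title": "B", "Series_or_Movie": "Movie",  "Languages": "English", "Country_Availability": "South Korea,Japan"},
    {"Title": "C", "Series_or_Movie": "Series", "Languages": "Korean",  "Country_Availability": "South Korea"},
    {"Title": "D", "Series_or_Movie": "Movie",  "Languages": None,      "Country_Availability": None},
]

def available_in(row, country):
    """True if `country` is one of the comma-separated entries in Country_Availability."""
    value = row["Country_Availability"]
    if value is None:                      # missing data stays in the table, it just does not match
        return False
    return country in [c.strip() for c in value.split(",")]

# What the question asks: movies whose availability list includes South Korea
movies_in_korea = [r["Title"] for r in rows
                   if r["Series_or_Movie"] == "Movie" and available_in(r, "South Korea")]

# What a substring test on Languages measures instead
by_language = [r["Title"] for r in rows
               if r["Languages"] is not None and "Korea" in r["Languages"]]

print(movies_in_korea)   # ['B']
print(by_language)       # ['A', 'C']
```

On the Spark DataFrame, the corresponding filter would combine `df.Series_or_Movie == 'Movie'` with `df.Country_Availability.contains('South Korea')` and end with `.count()`, under the same assumptions.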



## Which director has the highest average hidden gem score?

In [16]:
df.filter(df.Hidden_Gem_Score == '9.8').select('Director', 'Hidden_Gem_Score').show()

+-----------+----------------+
|   Director|Hidden_Gem_Score|
+-----------+----------------+
|Dorin Marcu|             9.8|
+-----------+----------------+
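
The filter above returns the director of the single title with the maximum score, which is not necessarily the director with the highest average. Answering the question as worded means grouping by director first and averaging within each group. A plain-Python sketch with hypothetical directors and scores:

```python
from collections import defaultdict

# Hypothetical (director, hidden gem score) pairs, including missing values
rows = [
    ("Director X", 9.8),
    ("Director X", 2.0),
    ("Director Y", 8.5),
    ("Director Y", 8.7),
    (None, 9.0),            # missing director
    ("Director Z", None),   # missing score
]

# Accumulate [sum, count] per director, skipping rows that cannot contribute
totals = defaultdict(lambda: [0.0, 0])
for director, score in rows:
    if director is None or score is None:
        continue
    totals[director][0] += score
    totals[director][1] += 1

averages = {d: s / n for d, (s, n) in totals.items()}

# Highest average: sort descending by the mean and take the first entry
ranked = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)
best_director = ranked[0][0]

# Director of the single top-scoring title, which is what a filter on the maximum returns
scored = [r for r in rows if r[1] is not None]
top_title_director = sorted(scored, key=lambda r: r[1])[-1][0]

print(best_director)        # Director Y
print(top_title_director)   # Director X
```

In Spark the same steps would be `groupBy('Director')`, an `avg('Hidden_Gem_Score')` aggregate, and a descending `orderBy`. Unlike this sketch, Spark keeps a separate group for rows whose `Director` is null, so that group may need to be read with care.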



## How many genres are there in the dataset?

In [40]:
from pyspark.sql.functions import split, explode, col
df.withColumn("Genre", explode(split(col("Genre"), ", "))).groupBy("Genre").count().show(28)

+-----------+-----+
|      Genre|count|
+-----------+-----+
|      Crime| 1932|
|    Romance| 2445|
|   Thriller| 2739|
|  Adventure| 1809|
|      Drama| 6359|
|        War|  330|
|Documentary| 1030|
| Reality-TV|  191|
|     Family| 1433|
|    Fantasy| 1594|
|  Game-Show|   52|
|      Adult|   15|
|    History|  527|
|    Mystery| 1190|
|    Musical|  228|
|  Animation| 1665|
|      Music|  426|
|  Film-Noir|    2|
|     Horror| 1070|
|      Short|  422|
|    Western|  109|
|  Biography|  636|
|     Comedy| 5077|
|     Action| 2810|
|      Sport|  367|
|  Talk-Show|   26|
|     Sci-Fi| 1204|
|       News|   20|
+-----------+-----+
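
The table above has one row per genre, but `show(28)` prints at most 28 rows, so the table alone does not establish the exact number of genres. Counting the distinct values directly is safer. A plain-Python sketch of the split-then-deduplicate logic, using made-up `Genre` cells:

```python
# Hypothetical Genre cells; each holds a ", "-separated list, and some are missing
genre_cells = ["Crime, Drama", "Drama", None, "Comedy, Drama, Romance"]

genres = set()
for cell in genre_cells:
    if cell is None:              # a missing cell contributes no genre
        continue
    genres.update(cell.split(", "))

print(len(genres))      # 4
print(sorted(genres))   # ['Comedy', 'Crime', 'Drama', 'Romance']
```

On the exploded DataFrame, the equivalent would be `.select('Genre').distinct().count()`, which returns the number itself instead of a table that has to be counted by eye.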

