In [175]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, lit, trim, regexp_replace, udf, rank, dense_rank, row_number
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, BooleanType, StringType
from datetime import datetime
import numpy as np
import pandas as pd
import plotly.express as px

---
##### E1 - Get the dataset and get an overview of the given information

Info: To do this you need a kaggle account. Don´t worry kaggle.com is completely free and might also be an interesting resource if you haven´t heard of them. 

1. Go to the website: https://www.kaggle.com/beyjin/movies-1990-to-2017. Get the datasets
2. Get ready for the analysis in your prefered environment  
3. Get first overall insights 
4. Use your prefered environment and have fun in your analysis!

Goal:
- Please show the folder structure of your analysis 
- Show how you derive your first insights of the data sets
---

In [2]:
# Start a Spark session
spark = SparkSession.builder.appName('Z-Lab Exercise').getOrCreate()

In [28]:
actorsDf = spark.read.format("csv").option("inferSchema", True).option("header", True)\
    .load("../data/movies/Movie_Actors.csv")

In [98]:
moviesDf = spark.read.format("csv").option("inferSchema", True).option("header", True)\
    .load("../data/movies/Movie_Movies.csv")\
    .withColumn("Year", trim("Year").cast(IntegerType()))

In [181]:
# Load and clean genres dataframe
genresDf = spark.read.format("csv").option("inferSchema", True).option("header", True)\
    .load("../data/movies/Movie_Genres.csv")\
    .withColumn("Genre", regexp_replace(trim("Genre"), '"', ''))\
    .drop("_c0")

In [182]:
# A UDF to parse and normalize movie rating to an integer
def parse_rating_to_int(source, score_str):
    if score_str == "":
        return 0
    if source == "Internet Movie Database" or source == "Metacritic":
        return int(score_str.split("/")[0].replace(".", ""))
    elif source == "Rotten Tomatoes":
        return int(score_str.replace("%", ""))
    else:
        return 0

# Wrap udf
score_parsing_udf = udf(parse_rating_to_int, IntegerType())

# Load and clean genres dataframe
ratingsDf = spark.read.format("csv").option("inferSchema", True).option("header", True)\
    .load("../data/movies/Movie_AdditionalRating.csv")\
    .withColumn("RatingSource", regexp_replace(trim("RatingSource"), '"', ''))\
    .withColumn("Rating", score_parsing_udf(col("RatingSource"), trim(col("Rating"))))\
    .drop("_c0")

In [183]:
ratingsDf.dropDuplicates().show(10, truncate=False)

+------+-----------------------+---------+
|Rating|RatingSource           |imdbID   |
+------+-----------------------+---------+
|58    |Internet Movie Database|tt0050599|
|51    |Internet Movie Database|tt0025651|
|77    |Internet Movie Database|tt0048181|
|71    |Internet Movie Database|tt0042509|
|58    |Internet Movie Database|tt1794725|
|69    |Internet Movie Database|tt0040441|
|91    |Rotten Tomatoes        |tt0016220|
|53    |Internet Movie Database|tt0235731|
|59    |Internet Movie Database|tt5150284|
|57    |Internet Movie Database|tt0489010|
+------+-----------------------+---------+
only showing top 10 rows



---
**E2 - Insights about the director**
- As a user I’d like to know how many movies the Top10 directors in the data set produced. 
- Prepare a list of the top 10 Directors with the following information

__Goal:__
- What steps do you take to achieve the goal?
- What kind of methods you´re using? (Hint: Did you need to clean the data?)

---

In [184]:
# Top 10 Directors that produced the highest number of movies
topTenDirectorsByNumOfMovies = moviesDf\
    .groupBy("Director")\
    .agg(count(col("imdbID")).alias("NumberOfMoviesProduced"))\
    .sort(col("NumberOfMoviesProduced").desc())\
    .filter(col("Director").isNotNull())\
    .limit(10)

In [185]:
# Visualize the data frame
chart = px.bar(topTenDirectorsByNumOfMovies.toPandas(),
             y='Director',
             x=df['NumberOfMoviesProduced'],
             color='Director',
            orientation='h')
chart.show()

---
**E3 - Free Data Mining - Think of something which might be interesting to know for you**

Hint: 
	Since you´ve already started with a small analysis of the director. You might keep
going and dive deeper or take a completely different subject as analysis of the dataset

You can present your results in any kind of format (plain text/console, small presentation, BI reporting tool)

Task: Prepare your analysis and present your findings in your environment of choice!

Goal:
- Show us how you interact if you try to find insights
- How you present the results


In [193]:
# Last decade
thisMilleniumMoviesDf = moviesDf\
    .select("Country", "Director", "Year", "imdbID", "Title" ).dropDuplicates()

moviesDfWithGenres = thisMilleniumMoviesDf\
    .join(genresDf.dropDuplicates(), ["imdbID"], "left")\
    .join(ratingsDf.dropDuplicates(), ["imdbID"], "left")

last10YearsMoviesDfWithGenres.show()

+---------+-----------+---------------+----+--------------------+-----------+------+--------------------+
|   imdbID|    Country|       Director|Year|               Title|      Genre|Rating|        RatingSource|
+---------+-----------+---------------+----+--------------------+-----------+------+--------------------+
|tt3257140|  Australia| Andrew Leavold|2014|Some Jerks: Dark ...|      Music|  null|                null|
|tt3257140|  Australia| Andrew Leavold|2014|Some Jerks: Dark ...|      Short|  null|                null|
|tt4345910|      India|  Anirudh Menon|2014|  Dreams and Destiny|      Drama|  null|                null|
|tt4345910|      India|  Anirudh Menon|2014|  Dreams and Destiny|     Comedy|  null|                null|
|tt4345910|      India|  Anirudh Menon|2014|  Dreams and Destiny|      Short|  null|                null|
|tt3834030|        USA|           null|2014|Camelot's End: Th...|      Short|  null|                null|
|tt3834030|        USA|           null|2014|Ca

In [196]:
windowByGenre =  Window.partitionBy(col("Genre"))
windowByYear =  Window.partitionBy(col("Year"))

# How 10 most-produced categories of all time performed over last 20 years 
res = moviesDfWithGenres\
    .withColumn("totalOverGenre", count(col("imdbID")).over(windowByGenre))\
    .withColumn("rank", dense_rank().over(Window.orderBy(col("totalOverGenre").desc())))\
    .filter(col("rank") <= 10)\
    .filter(col("Year") >= 2000)

res = res\
    .groupBy(col("Year"), col("Genre"))\
    .agg(count("imdbID").alias("nrMoviesByYearAndGender"))\
    .sort(col("Year"), col("nrMoviesByYearAndGender").desc())

fig = px.line(res.toPandas().dropna(), x="Year", y="nrMoviesByYearAndGender", color='Genre')
fig.show()

In [144]:
result = Last10YearsMoviesDfWithGenres\
    .groupBy(col("genre"))\
    .agg(count("imdbID").alias("nrMoviesByConutryAndGender"))\
    .sort(col("nrMoviesByConutryAndGender").desc())

In [145]:
long_df = result.toPandas()

fig = px.bar(long_df.dropna(), x="genre", y="nrMoviesByConutryAndGender", color="genre", title="Long-Form Input")
fig.show()
long_df.head()

Unnamed: 0,genre,nrMoviesByConutryAndGender
0,Short,34069
1,Drama,19345
2,Comedy,12944
3,Documentary,12837
4,Horror,4284
