#**Project: IMDb Movie Explorer**
#**Author: Ashish Kayastha**

# Define global variables

In [None]:
DB_NAME = "imdb_database"
PROJECT_FOLDER = '/opt/data'
MOVIELENS_URL = "http://files.grouplens.org/datasets/movielens/ml-25m.zip"

# Install OpenJDK 8 and Spark

In [None]:
!apt-get update -qq

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://apache.osuosl.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

--2021-05-07 08:37:33--  https://apache.osuosl.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
Resolving apache.osuosl.org (apache.osuosl.org)... 140.211.166.134, 64.50.236.52, 64.50.233.100, ...
Connecting to apache.osuosl.org (apache.osuosl.org)|140.211.166.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 233333392 (223M) [application/x-gzip]
Saving to: ‘spark-2.4.7-bin-hadoop2.7.tgz’


2021-05-07 08:40:50 (1.13 MB/s) - ‘spark-2.4.7-bin-hadoop2.7.tgz’ saved [233333392/233333392]



# Install findspark

In [None]:
!pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


# Define environment variables

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-2.4.7-bin-hadoop2.7'

# Get movie datasets

In [None]:
!mkdir -p $PROJECT_FOLDER

# IMDb
!wget https://datasets.imdbws.com/title.basics.tsv.gz -O /opt/data/title.basics.tsv.gz
!gzip -d $PROJECT_FOLDER/title.basics.tsv.gz

!wget https://datasets.imdbws.com/title.ratings.tsv.gz -O /opt/data/title.ratings.tsv.gz
!gzip -d $PROJECT_FOLDER/title.ratings.tsv.gz

# MovieLens
!wget http://files.grouplens.org/datasets/movielens/ml-25m.zip -O /opt/data/ml-25m.zip
!unzip $PROJECT_FOLDER/ml-25m.zip -d /opt/data/

# TMDb
!wget https://github.com/askayastha/ITC686-Project/raw/master/data/tmdb_5000_movies.zip -O /opt/data/tmdb_5000_movies.zip
!unzip $PROJECT_FOLDER/tmdb_5000_movies.zip -d /opt/data/

--2021-05-07 08:45:27--  https://datasets.imdbws.com/title.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 52.85.132.80, 52.85.132.70, 52.85.132.19, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|52.85.132.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 140066257 (134M) [binary/octet-stream]
Saving to: ‘/opt/data/title.basics.tsv.gz’


2021-05-07 08:45:29 (84.1 MB/s) - ‘/opt/data/title.basics.tsv.gz’ saved [140066257/140066257]

--2021-05-07 08:45:35--  https://datasets.imdbws.com/title.ratings.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 52.85.132.19, 52.85.132.70, 52.85.132.66, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|52.85.132.19|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5686484 (5.4M) [binary/octet-stream]
Saving to: ‘/opt/data/title.ratings.tsv.gz’


2021-05-07 08:45:35 (60.6 MB/s) - ‘/opt/data/title.ratings.tsv.gz’ saved [5686484/5686484]

--2021-05-0

In [None]:
!ls $PROJECT_FOLDER

ml-25m	    title.basics.tsv   tmdb_5000_movies.csv
ml-25m.zip  title.ratings.tsv  tmdb_5000_movies.zip


In [None]:
IMDB_BASICS_FILE = 'title.basics.tsv'
IMDB_RATINGS_FILE = 'title.ratings.tsv'
MOVIELENS_LINKS_FILE = 'ml-25m/links.csv'
TMDB_5000_MOVIES_FILE = 'tmdb_5000_movies.csv'

# Pandas code (Test)

In [None]:
# import pandas as pd

# pd_basics_df = pd.read_csv(f'{PROJECT_FOLDER}/{IMDB_BASICS_FILE}', sep='\t')
# pd_ratings_df = pd.read_csv(f'{PROJECT_FOLDER}/{IMDB_RATINGS_FILE}', sep='\t')

In [None]:
# pd_basics_df.head()

In [None]:
# pd_basics_df[(pd_basics_df['primaryTitle'] == 'Inception') & (pd_basics_df['titleType'] == 'movie')]

In [None]:
# pd_ratings_df.head()

In [None]:
# pd_ratings_df[(pd_ratings_df['tconst'] == 'tt1375666')]

# Spark processing code

In [None]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

pyspark.__version__

'2.4.7'

In [None]:
username = ''
password = ''
spark = SparkSession \
    .builder \
    .appName("Exploring_IMDB_Data") \
    .config("spark.mongodb.output.uri", f"mongodb+srv://{username}:{password}@cluster0.jxstd.mongodb.net/imdb_database?retryWrites=true&w=majority") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.3') \
    .getOrCreate()

In [None]:
spark

# Read data into Spark dataframes

In [None]:
sp_basics_df = spark.read.csv(os.path.join(PROJECT_FOLDER, IMDB_BASICS_FILE), header=True, sep='\t', inferSchema=True)
sp_ratings_df = spark.read.csv(os.path.join(PROJECT_FOLDER, IMDB_RATINGS_FILE), header=True, sep='\t', inferSchema=True)
sp_links_df = spark.read.csv(os.path.join(PROJECT_FOLDER, MOVIELENS_LINKS_FILE), header=True, inferSchema=True)

In [None]:
sp_basics_df.printSchema()
sp_ratings_df.printSchema()
sp_links_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [None]:
# Take a peek at the Spark dataframe.
sp_basics_df.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

In [None]:
sp_ratings_df.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1698|
|tt0000002|          6.1|     211|
|tt0000003|          6.5|    1457|
|tt0000004|          6.1|     124|
|tt0000005|          6.1|    2256|
|tt0000006|          5.2|     125|
|tt0000007|          5.4|     687|
|tt0000008|          5.4|    1874|
|tt0000009|          6.0|     155|
|tt0000010|          6.9|    6316|
|tt0000011|          5.2|     281|
|tt0000012|          7.4|   10849|
|tt0000013|          5.7|    1629|
|tt0000014|          7.1|    4832|
|tt0000015|          6.1|     878|
|tt0000016|          5.9|    1254|
|tt0000017|          4.6|     255|
|tt0000018|          5.3|     503|
|tt0000019|          5.3|      19|
|tt0000020|          5.0|     282|
+---------+-------------+--------+
only showing top 20 rows



In [None]:
sp_links_df.show()

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
|      6|113277|   949|
|      7|114319| 11860|
|      8|112302| 45325|
|      9|114576|  9091|
|     10|113189|   710|
|     11|112346|  9087|
|     12|112896| 12110|
|     13|112453| 21032|
|     14|113987| 10858|
|     15|112760|  1408|
|     16|112641|   524|
|     17|114388|  4584|
|     18|113101|     5|
|     19|112281|  9273|
|     20|113845| 11517|
+-------+------+------+
only showing top 20 rows



In [None]:
# Special treatment for TMDb dataset
import pandas as pd
from pyspark.sql.types import *

tmdb_schema = StructType([
    StructField("id", StringType(), False), \
    StructField("budget", LongType(), False), \
    StructField("revenue", LongType(), False)
])

pd_tmdb_df = pd.read_csv(os.path.join(PROJECT_FOLDER, TMDB_5000_MOVIES_FILE))
sp_tmdb_df = spark.createDataFrame(pd_tmdb_df[['id', 'budget', 'revenue']], tmdb_schema)
sp_tmdb_df.printSchema()
sp_tmdb_df.show()

root
 |-- id: string (nullable = false)
 |-- budget: long (nullable = false)
 |-- revenue: long (nullable = false)

+------+---------+----------+
|    id|   budget|   revenue|
+------+---------+----------+
| 19995|237000000|2787965087|
|   285|300000000| 961000000|
|206647|245000000| 880674609|
| 49026|250000000|1084939099|
| 49529|260000000| 284139100|
|   559|258000000| 890871626|
| 38757|260000000| 591794936|
| 99861|280000000|1405403694|
|   767|250000000| 933959197|
|209112|250000000| 873260194|
|  1452|270000000| 391081192|
| 10764|200000000| 586090727|
|    58|200000000|1065659812|
| 57201|255000000|  89289910|
| 49521|225000000| 662845518|
|  2454|225000000| 419651413|
| 24428|220000000|1519557910|
|  1865|380000000|1045713802|
| 41154|225000000| 624026776|
|122917|250000000| 956019788|
+------+---------+----------+
only showing top 20 rows



# Filter in non-adult movies that don't have start year '\\N'

In [None]:
sp_basics_df = sp_basics_df.filter((sp_basics_df['isAdult'] == '0') & (sp_basics_df['startYear'] != r'\N'))
sp_basics_ratings_df = sp_basics_df.join(sp_ratings_df, ['tconst'], 'inner')
sp_basics_ratings_df.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+-------------------+-------------+--------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|             genres|averageRating|numVotes|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+-------------------+-------------+--------+
|tt0000658|    short|The Puppet's Nigh...|Le cauchemar de F...|      0|     1908|     \N|             2|    Animation,Short|          6.4|     193|
|tt0001732|    short|The Lighthouse Ke...|The Lighthouse Ke...|      0|     1911|     \N|            \N|        Drama,Short|          7.1|       8|
|tt0002253|    short|          Home Folks|          Home Folks|      0|     1912|     \N|            17|        Drama,Short|          4.0|       7|
|tt0002473|    short|    The Sands of Dee|    The Sands of Dee|      0|     1912|     \N|            17|      Ro

# Check for duplicate rows

In [None]:
print(sp_basics_df.count())
print(sp_ratings_df.count())
print(sp_basics_ratings_df.count())
print()
print(sp_basics_df.distinct().count())
print(sp_ratings_df.distinct().count())
print(sp_basics_ratings_df.distinct().count())

6783048
1148911
1129516

6783048
1148911
1129516


# Dataframe that holds only movies

In [None]:
sp_movie_ratings_df = sp_basics_ratings_df.filter((sp_basics_ratings_df['titleType'] == 'movie'))

# Dataframe that holds finance data

In [None]:
sp_tmdb_finance_df = sp_tmdb_df.join(sp_links_df, sp_tmdb_df['id'] == sp_links_df['tmdbId'], 'inner')
sp_tmdb_finance_df = sp_tmdb_finance_df.drop('id', 'movieId', 'tmdbId')
sp_tmdb_finance_df.show()
sp_tmdb_finance_df.count()

+---------+----------+-------+
|   budget|   revenue| imdbId|
+---------+----------+-------+
|237000000|2787965087| 499549|
|300000000| 961000000| 449088|
|245000000| 880674609|2379713|
|250000000|1084939099|1345836|
|260000000| 284139100| 401729|
|258000000| 890871626| 413300|
|260000000| 591794936| 398286|
|280000000|1405403694|2395427|
|250000000| 933959197| 417741|
|250000000| 873260194|2975590|
|270000000| 391081192| 348150|
|200000000| 586090727| 830515|
|200000000|1065659812| 383574|
|255000000|  89289910|1210819|
|225000000| 662845518| 770828|
|225000000| 419651413| 499448|
|220000000|1519557910| 848228|
|380000000|1045713802|1298650|
|225000000| 624026776|1409024|
|250000000| 956019788|2310332|
+---------+----------+-------+
only showing top 20 rows



4602

# Fix the ids and join the movie and finance dataframe

In [None]:
sp_movie_finance_df = sp_movie_ratings_df.withColumn('imdbId', F.regexp_replace('tconst', r't+0*', '').cast('Integer'))
sp_movie_finance_df = sp_movie_finance_df.join(sp_tmdb_finance_df, ['imdbId'], 'inner')
sp_movie_finance_df.show()
sp_movie_finance_df.count()

+-------+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+---------+---------+
| imdbId|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|averageRating|numVotes|   budget|  revenue|
+-------+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+---------+---------+
|  92644|tt0092644|    movie|Beverly Hills Cop II|Beverly Hills Cop II|      0|     1987|     \N|           100| Action,Comedy,Crime|          6.5|  109909| 20000000|299965036|
| 102798|tt0102798|    movie|Robin Hood: Princ...|Robin Hood: Princ...|      0|     1991|     \N|           143|Action,Adventure,...|          6.9|  180472| 48000000|390493908|
| 116259|tt0116259|    movie|    Extreme Measures|    Extreme Measures|      0|     1996|     \N|           118| Cr

4577

# Dataframe that holds only tv shows

In [None]:
sp_tvshow_ratings_df = sp_basics_ratings_df.filter((sp_basics_ratings_df['titleType'] == 'tvSeries'))

In [None]:
sp_tvshow_ratings_df.show()

In [None]:
sp_movie_ratings_df.filter(sp_movie_ratings_df['primaryTitle'] == 'Inception').show()

# Test code

In [None]:
sp_movie_ratings_df.groupBy('startYear').count().orderBy(sp_movie_ratings_df['startYear'].desc()).show()

In [None]:
sp_tvshow_ratings_df.groupBy('startYear').count().orderBy(sp_tvshow_ratings_df['startYear'].desc()).show()

# Add a new column 'totalRatings' (popularity).

In [None]:
sp_movie_ratings_df = sp_movie_ratings_df.withColumn('totalRatings', (F.col('averageRating') * F.col('numVotes')).cast('Integer'))

In [None]:
sp_movie_ratings_df.filter(sp_movie_ratings_df['startYear'] == '2001').sort(F.desc('totalRatings')).show()

In [None]:
sp_tvshow_ratings_df = sp_tvshow_ratings_df.withColumn('totalRatings', (F.col('averageRating') * F.col('numVotes')).cast('Integer'))

In [None]:
sp_tvshow_ratings_df.filter(sp_tvshow_ratings_df['startYear'] == '2015').sort(F.desc('totalRatings')).show()

# Define helper functions

In [None]:
def write_to_mongodb(df, coll_name):
    df.write.format("mongo") \
        .mode("append") \
        .option("database", DB_NAME) \
        .option("collection", coll_name) \
        .save()

def get_all_years(df):
    years_list = df.select('startYear').distinct().rdd.flatMap(lambda x: x).collect()
    years_list.sort(reverse=True)
    # years_list.remove('\\N')

    return years_list

def write_df(df, coll_name, sort_column):
    for year in get_all_years(df):
        year_df = df.filter(df['startYear'] == year).sort(F.desc(sort_column)).limit(20)
        write_to_mongodb(year_df, coll_name)

# Write top movies by year to MongoDB

In [None]:
write_df(sp_movie_ratings_df, "top_movies", "totalRatings")

# Write top tv shows by year to MongoDB

In [None]:
write_df(sp_tvshow_ratings_df, "top_tvshows", "totalRatings")

# Check movie_finance dataframe

In [None]:
sp_movie_finance_df.filter(sp_movie_finance_df['startYear'] == '2015').sort(F.desc('budget')).show()

# Write top budgets by year to MongoDB

In [None]:
write_df(sp_movie_finance_df, "top_budgets", "budget")

# Write top revenues by year to MongoDB

In [None]:
write_df(sp_movie_finance_df, "top_revenues", "revenue")

# Write movies count per year to MongoDB

In [None]:
sp_movies_count_df = sp_movie_ratings_df.groupBy('startYear', 'titleType').count().orderBy(sp_movie_ratings_df['startYear'].desc())
write_to_mongodb(sp_movies_count_df, "titles_count")

# Write tv shows count per year to MongoDB

In [None]:
sp_tvshows_count_df = sp_tvshow_ratings_df.groupBy('startYear', 'titleType').count().orderBy(sp_tvshow_ratings_df['startYear'].desc())
write_to_mongodb(sp_tvshows_count_df, "titles_count")