# Exploration using Pandas

In [None]:
import pandas as pd
import os
import sys
sys.path.insert(1, os.path.abspath('../'))
import util

In [None]:
base_path = os.path.join(os.path.abspath(''), os.pardir, os.pardir)
base_path

In [None]:
save_filepath = 'file://' + os.path.abspath(os.path.join(base_path, 'data', 'delta'))
print(save_filepath)

In [None]:
temp_filepath = 'file://' + os.path.abspath(os.path.join(base_path, 'data', 'tmp'))
temp_filepath

In [None]:
# Root URL of the public IMDb dataset dumps.
base_url = 'https://datasets.imdbws.com'

# IMDb dump archives explored in this notebook.
files_list = [
    "name.basics.tsv.gz",
    "title.akas.tsv.gz",
    "title.basics.tsv.gz",
    "title.crew.tsv.gz",
    "title.episode.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz",
]

# Two levels above the working directory, then data/tmp, as an absolute path.
temp_filepath = os.path.abspath(
    os.path.join(os.pardir, os.pardir, 'data', 'tmp')
)

In [None]:
# Load a 100-row sample of every IMDb archive into a dict keyed by file name.
# `on_bad_lines='skip'` replaces `error_bad_lines=False`, which was deprecated
# in pandas 1.3 and removed in pandas 2.0; malformed rows are still dropped.
data = {
    file: pd.read_csv(
        os.path.join(temp_filepath, file),
        sep='\t',
        nrows=100,
        compression='gzip',
        on_bad_lines='skip',
    )
    for file in files_list
}

In [None]:
data["title.akas.tsv.gz"].head(10)

In [None]:
data["title.basics.tsv.gz"].head(10)

In [None]:
data["title.crew.tsv.gz"].head(10)

In [None]:
data["title.episode.tsv.gz"].head(10)

In [None]:
data["title.principals.tsv.gz"].head(20)

In [None]:
data["title.ratings.tsv.gz"].head(10)

# Exploration using PySpark

In [1]:
%set_env JAVA_HOME=/Users/akshayiyer/Library/Java/JavaVirtualMachines/jdk8u222-b10/Contents/Home

env: JAVA_HOME=/Users/akshayiyer/Library/Java/JavaVirtualMachines/jdk8u222-b10/Contents/Home


In [2]:
import configparser
import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import sys

sys.path.insert(1, os.path.abspath('../'))
import util

In [3]:
def create_spark_session(master, endpoint=None):
    """Build (or reuse) the SparkSession used by this notebook.

    Args:
        master: Spark master URL handed to ``builder.master``.
        endpoint: Optional value for the ``fs.s3a.endpoint`` option. When it
            is ``None`` the option is left unset.

    Returns:
        The session returned by ``getOrCreate()``.
    """
    builder = (
        SparkSession.builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,io.delta:delta-core_2.11:0.3.0")
        .config("spark.sql.autoBroadcastJoinThreshold", -1)
        .appName("udacity-dend-capstone-etl-proj")
        .master(master)
    )

    # Only register the S3A endpoint when one was supplied; the default None
    # is not a usable endpoint value and must not be passed to config().
    if endpoint is not None:
        builder = builder.config("fs.s3a.endpoint", endpoint)

    return builder.getOrCreate()

In [4]:
#spark://127.0.0.1:7077
spark = create_spark_session("spark://127.0.0.1:7077","s3.us-west-2.amazonaws.com")
spark

## Download files to local directory

In [None]:
# Root URL of the public IMDb dataset dumps.
base_url = 'https://datasets.imdbws.com'

# Archives to fetch from base_url.
files_list = [
    "name.basics.tsv.gz",
    "title.akas.tsv.gz",
    "title.basics.tsv.gz",
    "title.crew.tsv.gz",
    "title.episode.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz",
]

# Two levels above the working directory, then data/tmp, as an absolute path.
download_directory = os.path.abspath(
    os.path.join(os.pardir, os.pardir, 'data', 'tmp')
)

In [None]:
util.download_files_to_local(base_url, files_list, download_directory)

## Process name.basics file

In [5]:
# Explicit schema kept for reference only; it is not applied because the read
# below infers the schema from the data.
# names_schema = StructType([
#                     StructField('nconst', StringType(), True),
#                     StructField('primaryName', StringType(), True),
#                     StructField('birthYear', IntegerType(), True),
#                     StructField('deathYear', IntegerType(), True),
#                     StructField('primaryProfession', StringType(), True),
#                     StructField('knownForTitles', StringType(), True),
#                     StructField('broken', StringType(), True)
#                         ])

temp_filepath = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/tmp'
file = 'name.basics.tsv.gz'

# Read the tab-separated dump with a header row; '\N' marks missing values.
names_df_raw = (
    spark.read
    .format("csv")
    .options(
        sep="\t",
        inferSchema="true",
        header="true",
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        nullValue='\\N',
        # An empty quote character stops quotes being used as a qualifier,
        # which helps reduce malformed records.
        quote='',
    )
    .load(temp_filepath + '/' + file)
)

### Data Exploration

In [6]:
names_df_raw.count()

9863863

In [7]:
names_df_raw.printSchema()
names_df = names_df_raw.withColumnRenamed("nconst","artistId")
names_df.show(5)

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- deathYear: integer (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)

+---------+---------------+---------+---------+--------------------+--------------------+
| artistId|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0053137,tt00504...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0037382,tt00383...|
|nm0000003|Brigitte Bardot|     1934|     null|actress,soundtrac...|tt0049189,tt00573...|
|nm0000004|   John Belushi|     1949|     1982|actor,soundtrack,...|tt0072562,tt00779...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0083922,tt00694...|
+---------+---

In [None]:
# For a very small number of rows, birthYear or deathYear is less than 1000 (17 rows each in the run recorded below).
# Spot-checking a few entries suggests these are mostly errors in the dataset.
# Rather than removing these rows, setting the affected fields to null seems appropriate.

In [8]:
names_df.filter(names_df.birthYear < 1000).count()

17

In [9]:
names_df.filter(names_df.deathYear < 1000).count()

17

In [10]:
def fix_year(col):
    """Return a column expression that nulls out implausible years.

    Values of column ``col`` below 1000, or later than the current calendar
    year, become null; every other value is passed through unchanged.
    """
    current_year = datetime.datetime.now().year
    year = F.col(col)

    return (
        F.when(year < 1000, None)
        .when(year > current_year, None)
        .otherwise(year)
    )


# Replace each year column with its cleaned version. The cleaned values are
# added under a temporary name, the raw column is dropped, and the temporary
# column is renamed back, so the cleaned columns end up last in the frame.
for year_col in ("birthYear", "deathYear"):
    fixed_col = year_col + "_fixed"
    names_df = (
        names_df
        .withColumn(fixed_col, fix_year(year_col))
        .drop(year_col)
        .withColumnRenamed(fixed_col, year_col)
    )

names_df.show(10, False)

+---------+---------------+------------------------------+---------------------------------------+---------+---------+
|artistId |primaryName    |primaryProfession             |knownForTitles                         |birthYear|deathYear|
+---------+---------------+------------------------------+---------------------------------------+---------+---------+
|nm0000001|Fred Astaire   |soundtrack,actor,miscellaneous|tt0053137,tt0050419,tt0043044,tt0072308|1899     |1987     |
|nm0000002|Lauren Bacall  |actress,soundtrack            |tt0037382,tt0038355,tt0071877,tt0117057|1924     |2014     |
|nm0000003|Brigitte Bardot|actress,soundtrack,producer   |tt0049189,tt0057345,tt0059956,tt0054452|1934     |null     |
|nm0000004|John Belushi   |actor,soundtrack,writer       |tt0072562,tt0077975,tt0078723,tt0080455|1949     |1982     |
|nm0000005|Ingmar Bergman |writer,director,actor         |tt0083922,tt0069467,tt0050986,tt0050976|1918     |2007     |
|nm0000006|Ingrid Bergman |actress,soundtrack,pr

In [11]:
# Scalar artist attributes, one row per artist.
artist_columns = ["artistId", "primaryName", "birthYear", "deathYear"]
artists_df = names_df.select(*artist_columns)

# One row per (artist, profession): split the comma-separated list and explode it.
artists_prmry_prfsn_df = names_df.select(
    "artistId",
    F.explode(F.split(F.col("primaryProfession"), ",")).alias("primaryProfession"),
)

# One row per (artist, known-for title), built the same way.
artists_knwn_fr_ttls_df = names_df.select(
    "artistId",
    F.explode(F.split(F.col("knownForTitles"), ",")).alias("knownForTitles"),
)

In [12]:
artists_df.show(5)
artists_prmry_prfsn_df.show(5)
artists_knwn_fr_ttls_df.show(5)

+---------+---------------+---------+---------+
| artistId|    primaryName|birthYear|deathYear|
+---------+---------------+---------+---------+
|nm0000001|   Fred Astaire|     1899|     1987|
|nm0000002|  Lauren Bacall|     1924|     2014|
|nm0000003|Brigitte Bardot|     1934|     null|
|nm0000004|   John Belushi|     1949|     1982|
|nm0000005| Ingmar Bergman|     1918|     2007|
+---------+---------------+---------+---------+
only showing top 5 rows

+---------+-----------------+
| artistId|primaryProfession|
+---------+-----------------+
|nm0000001|       soundtrack|
|nm0000001|            actor|
|nm0000001|    miscellaneous|
|nm0000002|          actress|
|nm0000002|       soundtrack|
+---------+-----------------+
only showing top 5 rows

+---------+--------------+
| artistId|knownForTitles|
+---------+--------------+
|nm0000001|     tt0053137|
|nm0000001|     tt0050419|
|nm0000001|     tt0043044|
|nm0000001|     tt0072308|
|nm0000002|     tt0037382|
+---------+--------------+
only 

In [13]:
artists_df.describe().show(5)

+-------+---------+--------------------+------------------+------------------+
|summary| artistId|         primaryName|         birthYear|         deathYear|
+-------+---------+--------------------+------------------+------------------+
|  count|  9863863|             9863863|            495715|            173945|
|   mean|     null|            Infinity|1950.7750844739417|1988.7498807094196|
| stddev|     null|                 NaN| 33.20074219447096| 29.87583815392508|
|    min|nm0000001|!'aru Ikhuisi Pie...|              1048|              1022|
|    max|nm9993719|þórunn Ósk Morinó...|              2019|              2020|
+-------+---------+--------------------+------------------+------------------+



### Write dataframes to Parquet/Delta tables

In [14]:
save_file_path = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/parquet'
save_file_path_delta = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/delta'

# Write to parquet files.
# Targets are built with os.path.join: plain string concatenation dropped the
# separator and produced paths such as ".../data/parquetartists.parquet".
artists_df.write.mode('overwrite').parquet(
    os.path.join(save_file_path, "artists.parquet"))
artists_prmry_prfsn_df.write.mode('overwrite').parquet(
    os.path.join(save_file_path, "artists_prmry_profession.parquet"))
artists_knwn_fr_ttls_df.write.mode('overwrite').parquet(
    os.path.join(save_file_path, "artists_knwnfor_titles.parquet"))

# Write artist dataframes to delta tables; only the artists table is
# partitioned (by birthYear).
artists_df.write.format('delta').partitionBy("birthYear").mode('overwrite').save(
    os.path.join(save_file_path_delta, "artists"))

artists_prmry_prfsn_df.write.format("delta").mode('overwrite').save(
    os.path.join(save_file_path_delta, "artists_prmry_profession"))

artists_knwn_fr_ttls_df.write.format("delta").mode('overwrite').save(
    os.path.join(save_file_path_delta, "artists_knwnfor_titles"))

# Process title.basics file

In [None]:
temp_filepath = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/tmp'
file = 'title.basics.tsv.gz'

# Read the tab-separated dump with a header row; '\N' marks missing values.
title_basics_df_raw = (
    spark.read
    .format("csv")
    .options(
        sep="\t",
        inferSchema="true",
        header="true",
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        nullValue='\\N',
        # An empty quote character stops quotes being used as a qualifier,
        # which helps reduce malformed records.
        quote='',
    )
    .load(temp_filepath + '/' + file)
)

### Data Exploration

In [None]:
title_basics_df_raw.count()

In [None]:
title_basics_df_raw.printSchema()
title_basics_df = title_basics_df_raw.withColumnRenamed("tconst","titleId")
title_basics_df.show(5)

In [None]:
title_basics_df.describe("startYear","endYear","runtimeMinutes").show()

In [None]:
# Scalar title attributes, one row per title.
title_columns = [
    "titleId", "titleType", "primaryTitle", "originalTitle",
    "isAdult", "startYear", "endYear", "runtimeMinutes",
]
titles_df = title_basics_df.select(*title_columns)

# One row per (title, genre): split the comma-separated list and explode it.
titles_genres_df = title_basics_df.select(
    "titleId",
    F.explode(F.split(F.col("genres"), ",")).alias("genres"),
)

In [None]:
titles_df.show(5)
titles_genres_df.show(5)

### Write dataframes to Parquet/Delta tables

In [None]:
save_file_path = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/parquet'
save_file_path_delta = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/delta/'

# Write to Parquet.
# Targets are built with os.path.join: plain string concatenation dropped the
# separator and produced paths such as ".../data/parquettitles.parquet".
titles_df.write.mode('overwrite').partitionBy("startYear").parquet(
    os.path.join(save_file_path, "titles.parquet"))
titles_genres_df.write.mode('overwrite').parquet(
    os.path.join(save_file_path, "titles_genres.parquet"))

# Write to Delta; only the titles table is partitioned (by startYear).
titles_df.write.format("delta").mode('overwrite').partitionBy(
    "startYear").save(os.path.join(save_file_path_delta, "titles"))

titles_genres_df.write.format("delta").mode('overwrite').save(
    os.path.join(save_file_path_delta, "titles_genres"))

# Process title.ratings file

In [None]:
temp_filepath = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/tmp'
file = 'title.ratings.tsv.gz'

# Read the tab-separated dump with a header row; '\N' marks missing values.
title_ratings_df_raw = (
    spark.read
    .format("csv")
    .options(
        sep="\t",
        inferSchema="true",
        header="true",
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        nullValue='\\N',
        # An empty quote character stops quotes being used as a qualifier,
        # which helps reduce malformed records.
        quote='',
    )
    .load(temp_filepath + '/' + file)
)

In [None]:
title_ratings_df_raw.count()

In [None]:
title_ratings_df_raw.printSchema()
title_ratings_df = title_ratings_df_raw.withColumnRenamed("tconst","titleId")
title_ratings_df.show(5)

In [None]:
save_file_path = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/'

# Store the ratings table as Parquet, replacing any previous output.
ratings_target = os.path.join(save_file_path, "title_ratings.parquet")
title_ratings_df.write.parquet(ratings_target, mode='overwrite')

# Joining titles, title_ratings and title_genres datasets

In [None]:
# Locations of the Parquet outputs that are joined in the cells below.
# NOTE(review): titles.parquet and titles_genres.parquet are read from data/,
# whereas the cell that writes them builds its target from data/parquet —
# confirm these locations match before running.
title_ratings_filepath = "file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/title_ratings.parquet"
titles_filepath = "file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/titles.parquet"
title_genres_filepath = "file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/titles_genres.parquet"

# Reload each dataset from disk.
title_ratings_df = spark.read.parquet(title_ratings_filepath)
titles_df = spark.read.parquet(titles_filepath)
title_genres_df = spark.read.parquet(title_genres_filepath)

In [None]:
# Attach ratings, then genres, to every title by joining on titleId.
rated_titles_df = titles_df.join(title_ratings_df, "titleId")
rated_titles_with_genres_df = rated_titles_df.join(title_genres_df, "titleId")

# Per (startYear, genre): mean rating, total votes and number of titles.
title_genres_ratings_df = (
    rated_titles_with_genres_df
    .groupBy("startYear", "genres")
    .agg(
        F.avg("averageRating").alias("averageRating"),
        F.sum("numVotes").alias("numVotes"),
        F.count("titleId").alias("numTitles"),
    )
)

In [None]:
title_ratings_df.count()

In [None]:
titles_df.join(title_ratings_df, "titleId").count()

In [None]:
title_genres_df.count()

In [None]:
titles_df.join(title_genres_df,"titleId").count()

In [None]:
title_genres_ratings_df.count()

In [None]:
# Register the DataFrames as temp views so they can be queried with spark.sql.
# createOrReplaceTempView returns None, so its result is not assigned.
title_ratings_df.createOrReplaceTempView("title_ratings_df")
titles_df.createOrReplaceTempView("titles")
title_genres_df.createOrReplaceTempView("title_genres")

In [None]:
spark.sql('''
select count(distinct titleId)
from title_genres
''').show()

spark.sql('''
select count(distinct titleId)
from titles
''').show()

spark.sql('''
select count(distinct titleId)
from title_ratings_df
''').show()

In [None]:
spark.sql('''
select tr.*
from title_ratings_df tr
left join titles t on tr.titleId = t.titleId
where t.titleId is null
''').show()

In [None]:
title_genres_ratings_df = spark.sql('''
select tr.titleId, tr.averageRating, tr.numVotes, tg.genres, t.startYear
from title_ratings_df tr
inner join title_genres tg on tr.titleId = tg.titleId
inner join titles t on tg.titleId = t.titleId
''')

In [None]:
title_genres_ratings_df.count()

In [None]:
# Collapse the joined rows to one row per (startYear, genre): mean rating,
# total votes and number of titles.
title_genres_ratings_df = (
    title_genres_ratings_df
    .groupBy("startYear", "genres")
    .agg(
        F.avg("averageRating").alias("averageRating"),
        F.sum("numVotes").alias("numVotes"),
        F.count("titleId").alias("numTitles"),
    )
)

In [None]:
title_genres_ratings_df.count()

In [None]:
save_file_path = 'file:///Users/akshayiyer/Dev/GitHub/udacity-dend-capstone-etl/data/'

# Persist the per-year, per-genre aggregates as Parquet, partitioned by startYear.
# The DataFrame is named title_genres_ratings_df; the previous reference to
# `title_genres_ratings` was undefined and raised NameError.
title_genres_ratings_df\
    .write.mode('overwrite')\
    .partitionBy("startYear")\
    .parquet(save_file_path+"title_genres_ratings.parquet")

# Join title and title_ratings to get top 5 movies by year

In [None]:
# Two levels above the notebook's working directory (left un-normalised).
base_path = os.path.join(os.path.abspath(os.curdir), os.pardir, os.pardir)

# Absolute locations of the Delta tables and of the downloaded raw files.
save_filepath, temp_filepath = (
    os.path.abspath(os.path.join(base_path, 'data', leaf))
    for leaf in ('delta', 'tmp')
)

In [None]:
# Load the Delta tables by name from the Delta root directory.
# NOTE(review): no cell visible here writes a Delta table named 'title_ratings'
# (ratings are only written as Parquet) — confirm it exists before running.
title_df, title_ratings_df = (
    spark.read.load(os.path.join(save_filepath, table), format="delta")
    for table in ('titles', 'title_ratings')
)

In [None]:
title_df.createOrReplaceTempView("titles")
title_ratings_df.createOrReplaceTempView("title_ratings")

In [None]:
title_df.printSchema()

In [None]:
title_ratings_df.printSchema()

In [None]:
spark.sql('''
select distinct titleType
from titles
''').show()

In [None]:
# Rank rated titles of type 'movie' or 'tvMovie' within each
# (titleType, startYear) group by averageRating, highest first, and keep the
# rows whose rank is below 6.
# NOTE: rank() gives tied ratings the same rank, so a group may contain more
# than five rows.
top_5_movies_by_year_df = spark.sql('''
select 
    titleId, 
    titleType,
    startYear,
    primaryTitle,
    averageRating,
    numVotes,
    rank
from (
    select 
        t.titleId, 
        t.titleType,
        t.startYear,
        t.primaryTitle,
        tr.averageRating,
        tr.numVotes,
        rank() over (partition by t.titleType, t.startYear order by averageRating desc) as rank
    from titles t
    inner join title_ratings tr 
        on t.titleId = tr.titleId
    where t.titleType in ('movie', 'tvMovie')) tmp
where rank < 6
''')

In [None]:
title_df.select(F.countDistinct("titleId")).show()

In [None]:
count_source = title_df.select("titleId").distinct().count()
print(count_source)

In [None]:
count_dest = title_ratings_df.select("titleId").distinct().count()
print(count_dest)

In [None]:
title_ratings_df.join(title_df, "titleId").select("titleId").distinct().count()