In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql.functions import split, col, explode, avg, sum, countDistinct, max, arrays_overlap, rank, regexp_replace, collect_list, size
from pyspark.sql import Window
import pandas as pd
from datetime import datetime
from IPython.display import Markdown, display

# Create Spark session
spark = SparkSession.builder \
    .appName("imdb") \
    .master("local[3]") \
    .getOrCreate()

***
***
# Part 1: analyze the data using the parquet files

In [2]:
parquet_file_load_start = datetime.now()

base_hdfs = "hdfs://localhost:9000/user/student/imdb_spark_output/"
name_df = spark.read.load(base_hdfs + "name").withColumnRenamed("known_for_tiltes", "known_for_titles")
title_akas_df = spark.read.load(base_hdfs + "title_akas")
title_basics_df = spark.read.load(base_hdfs + "title_basics")
title_crew_df = spark.read.load(base_hdfs + "title_crew")
title_episode_df = spark.read.load(base_hdfs + "title_episode")
title_principals_df = spark.read.load(base_hdfs + "title_principals")
title_ratings_df = spark.read.load(base_hdfs + "title_ratings")

parquet_file_load_time = (datetime.now()-parquet_file_load_start).total_seconds()
print("Time taken to process: {}".format(parquet_file_load_time))

Time taken to process: 10.659643


## Get the name of each actor and the average rating for the movies that they are known for

In [3]:
parquet_actor_ratings_start_time = datetime.now()

name_best_known_for = (
    name_df.select('known_for_titles', 'primary_profession', 'nconst', 'primary_name', 'primary_profession')
        .filter("known_for_titles is not null and arrays_overlap(primary_profession, array('actor', 'actress'))")
        .withColumn("tconst", explode("known_for_titles"))
        .drop("known_for_titles")
        .join(title_ratings_df, 'tconst')
        .groupBy('nconst', 'primary_name', 'primary_profession')
        .agg(
            avg('average_rating').alias('avg_rating_best_known_for'),
            sum('num_votes').alias('num_votes')
        )
)
name_best_known_for.show(10, False)
parquet_actor_ratings_total_time = (datetime.now()-parquet_actor_ratings_start_time).total_seconds()
print("Time taken to process: {}".format(parquet_actor_ratings_total_time))

+---------+-------------------+-------------------------------------+-------------------------+---------+
|nconst   |primary_name       |primary_profession                   |avg_rating_best_known_for|num_votes|
+---------+-------------------+-------------------------------------+-------------------------+---------+
|nm0000050|Groucho Marx       |[soundtrack, actor, writer]          |7.699999999999999        |107440   |
|nm0001492|Kyle MacLachlan    |[actor, soundtrack, director]        |7.075                    |927748   |
|nm0001585|Kelly Packard      |[actress, soundtrack]                |6.025                    |27881    |
|nm0004713|Jodi Applegate Kay |[actress]                            |7.6                      |420899   |
|nm0008344|Hikari Abe         |[actress]                            |6.7                      |12101    |
|nm0009736|Miguel Aceves Mejía|[actor, soundtrack, music_department]|6.25                     |5223     |
|nm0010050|David Ackert       |[actor, produce

## Show the top rated episode from every TV show

In [None]:
parquet_top_episode_start_time = datetime.now()

episode_window = Window.partitionBy('parent_tconst').orderBy(col('num_votes').desc(), col('average_rating').desc())
episode_rating = (
    title_episode_df.join(title_ratings_df, 'tconst')
        # .filter("parent_tconst = 'tt0108778'")
        .join(
            title_basics_df.select('tconst', 'original_title')
                .withColumnRenamed('tconst', 'parent_tconst')
                .withColumnRenamed('original_title', 'series_title')
            , 'parent_tconst'
            , 'left'
            )
        .join(
            title_basics_df.select('tconst', 'original_title')
                .withColumnRenamed('original_title', 'episode_title')
            , 'tconst'
            , 'left'
            )
        .withColumn('ranks', rank().over(episode_window))
        .filter('ranks = 1')
        .drop('ranks')
)
episode_rating.show(10, False)
parquet_top_episode_total_time = (datetime.now()-parquet_top_episode_start_time).total_seconds()
print("Time taken to process: {}".format(parquet_crew_total_time))

## Show Count of crew members for each title

In [6]:
parquet_crew_start_time = datetime.now()
crew_count = (
    title_crew_df.withColumn("cd", size('directors'))
        .withColumn('cw', size('writers'))
        .drop('directors', 'writers')
        .withColumnRenamed('cw', 'writers')
        .withColumnRenamed('cd', 'directors')
        .join(title_principals_df.groupBy('tconst').pivot('category').count(), 'tconst', 'left')
)
crew_count.show(10, False)
parquet_crew_total_time = (datetime.now()-parquet_crew_start_time).total_seconds()
print("Time taken to process: {}".format(parquet_crew_total_time))

+---------+---------+-------+-----+-------+---------------+-------------+----+
|tconst   |directors|writers|actor|actress|archive_footage|archive_sound|self|
+---------+---------+-------+-----+-------+---------------+-------------+----+
|tt0000658|1        |-1     |null |null   |null           |null         |null|
|tt0000839|1        |-1     |null |null   |null           |null         |null|
|tt0001170|1        |1      |3    |1      |null           |null         |null|
|tt0001581|1        |-1     |null |null   |null           |null         |null|
|tt0001664|1        |-1     |2    |1      |null           |null         |null|
|tt0001732|1        |-1     |4    |2      |null           |null         |null|
|tt0001887|1        |-1     |null |null   |null           |null         |null|
|tt0002253|1        |1      |4    |2      |null           |null         |null|
|tt0002347|1        |-1     |null |null   |null           |null         |null|
|tt0002473|1        |1      |3    |4      |null     

***
***

***
***
# Part 2: analyze the data using the TSV files

## Get all the data

In [7]:
tsv_file_load_start = datetime.now()

tsv_name =  (
    spark.read.format("com.databricks.spark.csv").option("header", "true")
        .option("sep", "\t")\
        .option("inferSchema", "true")\
        .option("nullValue", "\\N")\
        .load("hdfs://localhost:9000/user/student/imdb_tsv/name.basics.tsv.gz")\
        .withColumn("primary_profession", split(col("primaryProfession"), ",").cast("array<string>"))\
        .withColumn("known_for_titles", split(col("knownForTitles"), ",").cast("array<string>"))\
        .withColumnRenamed("primaryName", "primary_name")\
        .withColumnRenamed("birthYear", "birth_year")\
        .withColumnRenamed("deathYear", "death_year")\
        .drop("primaryProfession", "knownForTitles")
)

tsv_title_aka = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.akas.tsv.gz")
        .withColumnRenamed("titleId", "title_id")
        .withColumnRenamed("isOriginalTitle", "is_original_title")
)

tsv_title_basics = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .option("quote", "")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.basics.tsv.gz")
        .withColumnRenamed("titleType", "title_type")
        .withColumnRenamed("primaryTitle", "primary_title")
        .withColumnRenamed("originalTitle", "original_title")
        .withColumnRenamed("isAdult", "is_adult")
        .withColumnRenamed("startYear", "start_year")
        .withColumnRenamed("endYear", "end_year")
        .withColumnRenamed("runtimeMinutes", "runtime_minutes")
        .withColumn("genre_list", split(col("genres"), ",").cast("array<string>"))
        .drop("genres")
        .withColumnRenamed("genre_list", "genres")
)

tsv_title_crew = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .option("quote", "")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.crew.tsv.gz")
        .withColumn("directors_list", split(col("directors"), ",").cast("array<string>"))
        .withColumn("writers_list", split(col("writers"), ",").cast("array<string>"))
        .drop("directors", "writers")
        .withColumnRenamed("directors_list", "directors")
        .withColumnRenamed("writers_list", "writers")
)

tsv_title_episode = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .option("quote", "")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.episode.tsv.gz")
        .withColumnRenamed("parentTconst", "parent_tconst")
)

tsv_title_principal = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .option("quote", "")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.principals.tsv.gz")
        .withColumn("characters_exploded", explode(split( regexp_replace(col("characters"), r'\[|\]', '')   , '","').cast("array<string>")))
        .withColumn("characters_clean", regexp_replace(col("characters_exploded"), r'\"', ''))
        .groupBy("tconst","ordering", "nconst", "category", "job", "characters")
        .agg(collect_list("characters_clean").alias("characters_array"))
        .drop("characters")
        .withColumnRenamed("characters_exploded", "characters")
)

tsv_title_rating = (
    spark.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("inferSchema", "true")
        .option("nullValue", "\\N")
        .option("quote", "")
        .load("hdfs://localhost:9000/user/student/imdb_tsv/title.ratings.tsv.gz")
        .withColumnRenamed("averageRating", "average_rating")
        .withColumnRenamed("numVotes", "num_votes")
)

tsv_file_load_time = (datetime.now()-tsv_file_load_start).total_seconds()
print("Time taken to process: {}".format(tsv_file_load_time))

Time taken to process: 319.170284


## Get the name of each actor and the average rating for the movies that they are known for

In [8]:
tsv_actor_ratings_start_time = datetime.now()

tsv_name_best_known_for = (
    tsv_name.select('known_for_titles', 'primary_profession', 'nconst', 'primary_name', 'primary_profession')
        .filter("known_for_titles is not null and arrays_overlap(primary_profession, array('actor', 'actress'))")
        .withColumn("tconst", explode("known_for_titles"))
        .drop("known_for_titles")
        .join(tsv_title_rating, 'tconst')
        .groupBy('nconst', 'primary_name', 'primary_profession')
        .agg(
            avg('average_rating').alias('avg_rating_best_known_for'),
            sum('num_votes').alias('num_votes')
        )
)
tsv_name_best_known_for.show(10, False)
tsv_actor_ratings_total_time = (datetime.now()-tsv_actor_ratings_start_time).total_seconds()
print("Time taken to process: {}".format(tsv_actor_ratings_total_time))

+---------+-----------------+---------------------------------+-------------------------+---------+
|nconst   |primary_name     |primary_profession               |avg_rating_best_known_for|num_votes|
+---------+-----------------+---------------------------------+-------------------------+---------+
|nm0000050|Groucho Marx     |[soundtrack, actor, writer]      |7.699999999999999        |107440   |
|nm0000205|Parker Posey     |[actress, soundtrack, writer]    |6.225                    |601966   |
|nm0000295|Kate Beckinsale  |[actress, producer, soundtrack]  |6.575                    |822675   |
|nm0001316|Oliver Hardy     |[actor, soundtrack, director]    |7.125                    |16775    |
|nm0001492|Kyle MacLachlan  |[actor, soundtrack, director]    |7.075                    |927748   |
|nm0001585|Kelly Packard    |[actress, soundtrack]            |6.025                    |27881    |
|nm0001710|David Schwimmer  |[actor, director, soundtrack]    |7.725                    |1218722  |


## Show the top rated episode from every TV show

In [9]:
tsv_top_episode_start_time = datetime.now()

tsv_episode_window = Window.partitionBy('parent_tconst').orderBy(col('num_votes').desc(), col('average_rating').desc())
tsv_episode_rating = (
    tsv_title_episode.join(tsv_title_rating, 'tconst')
        .join(
            tsv_title_basics.select('tconst', 'original_title')
                .withColumnRenamed('tconst', 'parent_tconst')
                .withColumnRenamed('original_title', 'series_title')
            , 'parent_tconst'
            , 'left'
            )
        .join(
            tsv_title_basics.select('tconst', 'original_title')
                .withColumnRenamed('original_title', 'episode_title')
            , 'tconst'
            , 'left'
            )
        .withColumn('ranks', rank().over(tsv_episode_window))
        .filter('ranks = 1')
        .drop('ranks')
)
tsv_episode_rating.show(10, False)
tsv_top_episode_total_time = (datetime.now()-tsv_top_episode_start_time).total_seconds()
print("Time taken to process: {}".format(tsv_top_episode_total_time))

+---------+-------------+------------+-------------+--------------+---------+------------------------------+--------------------+
|tconst   |parent_tconst|seasonNumber|episodeNumber|average_rating|num_votes|series_title                  |episode_title       |
+---------+-------------+------------+-------------+--------------+---------+------------------------------+--------------------+
|tt0613146|tt0051286    |1           |1            |7.8           |5        |Ivanhoe                       |Freeing the Serfs   |
|tt0593904|tt0061259    |1           |7            |7.9           |28       |The Guns of Will Sonnett      |A Son for a Son     |
|tt0624468|tt0062578    |1           |1            |7.5           |92       |Land of the Giants            |The Crash           |
|tt0074772|tt0075524    |1           |0            |6.5           |19       |Lanigan's Rabbi               |Pilot               |
|tt0514281|tt0078562    |2           |1            |8.4           |50       |Archie Bunker

# Show Count of crew members for each title

In [10]:
tsv_crew_start_time = datetime.now()
tsv_crew_count = (
    tsv_title_crew.withColumn("cd", size('directors'))
        .withColumn('cw', size('writers'))
        .drop('directors', 'writers')
        .withColumnRenamed('cw', 'writers')
        .withColumnRenamed('cd', 'directors')
        .join(tsv_title_principal.groupBy('tconst').pivot('category').count(), 'tconst', 'left')
    )
tsv_crew_count.show(10, False)
tsv_crew_total_time = (datetime.now()-tsv_crew_start_time).total_seconds()
print("Time taken to process: {}".format(tsv_crew_total_time))

+---------+---------+-------+-----+-------+---------------+-------------+----+
|tconst   |directors|writers|actor|actress|archive_footage|archive_sound|self|
+---------+---------+-------+-----+-------+---------------+-------------+----+
|tt0000658|1        |-1     |null |null   |null           |null         |null|
|tt0000839|1        |-1     |null |null   |null           |null         |null|
|tt0001170|1        |1      |3    |1      |null           |null         |null|
|tt0001581|1        |-1     |null |null   |null           |null         |null|
|tt0001664|1        |-1     |2    |1      |null           |null         |null|
|tt0001732|1        |-1     |4    |2      |null           |null         |null|
|tt0001887|1        |-1     |null |null   |null           |null         |null|
|tt0002253|1        |1      |4    |2      |null           |null         |null|
|tt0002347|1        |-1     |null |null   |null           |null         |null|
|tt0002473|1        |1      |3    |4      |null     

# Final Result

## Case 1

In [11]:
time_result = pd.DataFrame(
    columns=['Task', 'Parquet', 'TSV'],
    data=[['Avg Actor Rating', parquet_actor_ratings_total_time, tsv_actor_ratings_total_time],
            ['Top Rated Episode', parquet_top_episode_total_time, tsv_top_episode_total_time],
            ['Count Crew Members',parquet_crew_total_time, tsv_crew_total_time ],
            ['File Loading', parquet_file_load_time, tsv_file_load_time],
            [
                'Total Processing Time',
                parquet_actor_ratings_total_time + parquet_top_episode_total_time + parquet_crew_total_time,
                tsv_actor_ratings_total_time + tsv_top_episode_total_time + tsv_crew_total_time
            ],
            [
                'Overall Total Time',
                parquet_actor_ratings_total_time + parquet_top_episode_total_time + parquet_crew_total_time + parquet_file_load_time,
                tsv_actor_ratings_total_time + tsv_top_episode_total_time + tsv_crew_total_time + tsv_file_load_time
            ]
    ])
display(time_result)

Unnamed: 0,Task,Parquet,TSV
0,Avg Actor Rating,34.151825,95.98023
1,Top Rated Episode,29.295635,100.308339
2,Count Crew Members,41.034453,726.72443
3,File Loading,10.659643,319.170284
4,Total Processing Time,104.481913,923.012999
5,Overall Total Time,115.141556,1242.183283


## Case 2

In [15]:
time_result = pd.DataFrame(
    columns=['Task', 'Parquet', 'TSV'],
    data=[['Avg Actor Rating', parquet_actor_ratings_total_time, tsv_actor_ratings_total_time],
            ['Top Rated Episode', parquet_top_episode_total_time, tsv_top_episode_total_time],
            ['Count Crew Members',parquet_crew_total_time, tsv_crew_total_time ],
            ['File Loading', parquet_file_load_time, tsv_file_load_time],
            [
                'Total Processing Time',
                parquet_actor_ratings_total_time + parquet_top_episode_total_time + parquet_crew_total_time,
                tsv_actor_ratings_total_time + tsv_top_episode_total_time + tsv_crew_total_time
            ],
            [
                'Overall Total Time',
                parquet_actor_ratings_total_time + parquet_top_episode_total_time + parquet_crew_total_time + parquet_file_load_time,
                tsv_actor_ratings_total_time + tsv_top_episode_total_time + tsv_crew_total_time + tsv_file_load_time
            ]
    ])
display(time_result)

Unnamed: 0,Task,Parquet,TSV
0,Avg Actor Rating,43.629378,90.497702
1,Top Rated Episode,41.142197,60.247065
2,Count Crew Members,53.458683,827.916913
3,File Loading,7.298965,315.670217
4,Total Processing Time,138.230258,978.66168
5,Overall Total Time,145.529223,1294.331897
