<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Importing-Library" data-toc-modified-id="Importing-Library-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Importing Library</a></span></li><li><span><a href="#Reading-Data" data-toc-modified-id="Reading-Data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Reading Data</a></span></li></ul></div>

# Importing Library

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import time
from multiprocessing import  Pool
import multiprocessing
import math

# Reading Data

All the data is taken from https://datasets.imdbws.com/. Documentation can be found on https://www.imdb.com/interfaces/ . 

In [2]:
path = 's3://audible-insights/karteuku/IMDB/'

In [3]:
def read_data(path, file_name):
    '''
    This function read a tsv.gz file and replace "\\N" with np.nan
    '''
    df = pd.read_csv(path + file_name, sep = '\t')
    df = df.replace("\\N",np.nan)
    
    return df

In [4]:
title_ratings_df = read_data(path,'title.ratings.tsv.gz')

title_ratings_df = title_ratings_df.sample(n=100000,random_state=123)

In [5]:
#names_df = read_data(path, 'name.basics.tsv.gz')

In [6]:
title_akas_df = read_data(path, 'title.akas.tsv.gz')

mask = title_akas_df['titleId'].isin(title_ratings_df['tconst'])

title_akas_df = title_akas_df[mask]

  if (await self.run_code(code, result,  async_=asy)):


In [7]:
title_basics_df = read_data(path, 'title.basics.tsv.gz')

mask = title_basics_df['tconst'].isin(title_ratings_df['tconst'])

title_basics_df = title_basics_df[mask]

  if (await self.run_code(code, result,  async_=asy)):


In [8]:
title_crew_df = read_data(path, 'title.crew.tsv.gz')

mask = title_crew_df['tconst'].isin(title_ratings_df['tconst'])

title_crew_df = title_crew_df[mask]

In [9]:
title_episode_df = read_data(path, 'title.episode.tsv.gz')

mask = title_episode_df['tconst'].isin(title_ratings_df['tconst'])

title_episode_df = title_episode_df[mask]

In [10]:
title_principals_df = read_data(path, 'title.principals.tsv.gz')

mask = title_principals_df['tconst'].isin(title_ratings_df['tconst'])

title_principals_df = title_principals_df[mask]

# Data Wrangling

In [11]:
def create_join_data():
    '''
    This function performs data wrangling to combine all of the 5 tables 
    '''
    ## Copying the title_ratings_df table and merging with another tables with title granularity
    data = title_ratings_df.copy()
    data = data.merge(title_crew_df, on ='tconst', how ='left')
    data = data.merge(title_basics_df, on ='tconst', how ='left')
    data = data.merge(title_episode_df, on ='tconst', how = 'left')
    
    ## Creating directors_writers columns containing the list of director and writers
    data['directors_writers'] = data['directors'] +',' + data['writers']
    
    ## Calculating the number of maximum episode and seasons a show runs
    title_episode_summary = pd.pivot_table(title_episode_df, index = 'parentTconst', 
                                           values= ['seasonNumber','episodeNumber'], 
                                           aggfunc = np.nanmax
                                          )
    title_episode_summary.columns = ['max_' + col for col in title_episode_summary.columns]
    title_episode_summary.reset_index(inplace=True)
    title_episode_summary.rename(columns = {'parentTconst':'tconst'}, inplace =True)
    
    data = data.merge(title_episode_summary, on ='tconst', how ='left')
    
    ## Calculating the number of unique title, ordering, region, and language each title has
    title_akas_summary = pd.pivot_table(title_akas_df, index= 'titleId', 
                                        values = ['title', 'ordering', 'region', 'types', 'language'], 
                                        aggfunc = lambda x: len(x.unique())
                                        )
    title_akas_summary.columns = ['unique_' + col for col in title_akas_summary.columns]
    title_akas_summary.reset_index(inplace =True)
    title_akas_summary.rename(columns = {'titleId':'tconst'}, inplace =True)
    title_akas_summary.fillna(1, inplace = True)
    
    data = data.merge(title_akas_summary, on ='tconst', how ='left')
    
    ## Calculating the number of listed crews (director, produced, actors, etc.) each show has
    title_principals_summary = pd.pivot_table(title_principals_df, index = 'tconst', 
                                              columns = 'category', values = 'nconst',
                                              aggfunc = lambda x: len(x.unique())
                                             )
    title_principals_summary.columns = ['listed_' + col for col in title_principals_summary.columns]
    title_principals_summary.fillna(0, inplace = True)
    title_principals_summary['listed_total'] = title_principals_summary.sum(axis = 1)
    title_principals_summary.reset_index(inplace = True)
    
    data = data.merge(title_principals_summary, on ='tconst', how ='left')
    
    ## Creating one-hot encoding of titleType
    data['titleType'].fillna('no_type', inplace=True)
    for titleType in data['titleType'].unique():
        data['type_' + titleType] = data['titleType']==titleType
        data['type_' + titleType] = data['type_' + titleType]*1
        
    ## Creating one-hot encoding of genres    
    data['genres'].fillna('no_genre', inplace=True)
    genre_list = []

    for row in data['genres'].dropna().values:
        genres = row.split(',')
        for item in genres:
            if item not in genre_list:
                genre_list.append(item)
                
    genre_col_list = []

    for genre in genre_list:
        data['genre_' + genre] = [genre in data['genres'].loc[row] for row in data.index]
        data['genre_' + genre] = data['genre_' + genre]*1
        genre_col_list.append('genre_' + genre)
        
    data['total_genres'] = data[genre_col_list].sum(axis = 1)
    
    return data

In [12]:
data = create_join_data()

data.shape

(100000, 77)

## Calculate Past Rating

### Calculate Director Past Ratings

In [13]:
print(title_principals_df.shape)

title_principals_df = title_principals_df.merge(data[['tconst','averageRating','startYear']], 
                                                on = 'tconst', how = 'left'
                                                )

print(title_principals_df.shape)

(832116, 6)
(832116, 8)


In [14]:
title_principals_df = title_principals_df[title_principals_df['startYear'].notna()]
title_principals_df['startYear'] = title_principals_df['startYear'].astype(int)

In [15]:
mask = ((title_principals_df['category']=='director') &
        (title_principals_df['averageRating'].notna()) &
        (title_principals_df['startYear'].notna())
        )

title_principals_ref = title_principals_df[mask]
title_principals_ref = title_principals_ref.reset_index().drop(columns = ['index'])

In [16]:
def parallelize_dataframe(df, func, n_cores=8):
    '''
    This functions parallize a DataFrame operation. It takes a DataFrame object (df), 
    applies a function object (func), and parallize the job across n_cores
    '''
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [17]:
title_principals_ref.head()

Unnamed: 0,tconst,ordering,nconst,category,job,characters,averageRating,startYear
0,tt0000011,2,nm0804434,director,,,5.2,1895
1,tt0000019,1,nm0932055,director,,,5.3,1898
2,tt0000026,5,nm0525910,director,,,5.7,1896
3,tt0000060,1,nm0005690,director,,,7.7,1896
4,tt0000072,1,nm0617588,director,,,3.8,1896


In [18]:
def calculate_past_ratings(data):
    '''
    This functions calculate the past rating and number of movies that has been produced by a crew member 
    (e.g. director), in the year before the title is released. Return a list of dictionary of the output
    '''
    output_list = []
    
    for row in data.index:
        mask = (## Filtering only the relevant crew
                (title_principals_ref['nconst'] == data['nconst'].loc[row]) 
                ## Filtering only the year before the title is released
                & (title_principals_ref['startYear'] < data['startYear'].loc[row]) 
               )
    
        output = dict()
    
        output['number_past_movies'] = len(title_principals_ref['tconst'][mask].unique())
        output['median_past_rating'] = title_principals_ref['averageRating'][mask].median()
        
        output_list.append(output)
    
    return pd.DataFrame(output_list)

In [19]:
def calculate_past_ratings_by_role(data,role):
    '''
    Apply the function calculate_past_ratings(data) to the data and specific role only (e.g. director)/
    '''
    
    ## Creating title_principals_role DataFrame object that only include relevant rows
    mask = ((title_principals_df['category']==role) &
            (title_principals_df['averageRating'].notna()) &
            (title_principals_df['startYear'].notna())
            )

    title_principals_role = title_principals_df[mask]
    title_principals_role = title_principals_role.reset_index().drop(columns = ['index'])
    
    print("Start calculate past rating")
    start = time.time()
    role_past_rating_df = parallelize_dataframe(title_principals_role, calculate_past_ratings, n_cores=8)
    end = time.time()
    running_time = end - start
    print(f"Runtime of the program is {end - start}")
    role_past_rating_df.reset_index(inplace=True)
    
    ## Joining calculated past rating into the title_principals_role DataFrame
    title_principals_role = title_principals_role.join(role_past_rating_df[['number_past_movies',
                                                                            'median_past_rating']]
                                                      )
    
    ## Calculate the median past rating of a crew category (e.g. director, writer). If there are more than 1 director
    ## then the median of median past rating of the directors is reported
    past_rating_df = title_principals_role.groupby(['tconst'])[['median_past_rating']].median().reset_index()
    data = data.merge(past_rating_df, on ='tconst', how ='left')
    
    ## Calculate the sum of number of past movies of a crew category (e.g. director, writer). 
    ## If there are more than 1 director then the sum of number of past movies of the directors is reported
    past_movies_df = title_principals_role.groupby(['tconst'])['number_past_movies'].sum().reset_index()
    data = data.merge(past_movies_df, on ='tconst', how ='left')
    
    data = data.rename(columns = {'number_past_movies':role + '_number_past_movies',
                                  'median_past_rating':role + '_median_past_rating'})
    
    return data

In [34]:
data = calculate_past_ratings_by_role(data,'director')

Start calculate past rating
Runtime of the program is 1024.9653849601746


### Calculate Show Past Episode Rating

In [21]:
## Creating reference DataFrame for past show 
print(title_episode_df.shape)

title_episode_df = title_episode_df.merge(data[['tconst','averageRating','startYear']], 
                                          on = 'tconst', how = 'left'
                                        )

print(title_episode_df.shape)

(47916, 4)
(47916, 6)


In [22]:
mask = ((title_episode_df['parentTconst'].notna()) &
        (title_episode_df['averageRating'].notna()) &
        (title_episode_df['startYear'].notna())
        )

title_episode_df = title_episode_df[mask]
title_episode_df['startYear'] = title_episode_df['startYear'].astype(int)
title_episode_df_ref = title_episode_df.copy()
title_episode_df_ref = title_episode_df_ref.reset_index().drop(columns = ['index'])

In [23]:
title_episode_df.shape

(47909, 6)

In [24]:
def calculate_past_show_ratings(data):
    '''
    This function calculate the average past ratings for each episode in the tv show. 
    it does so by filtering the episode from the same show (same parentTconst) that was released
    before the current episode was released (startYear)
    '''
    output_list = []
    
    for row in data.index:
        
        mask = (## Filtering only the relevant show
                (title_episode_df_ref['parentTconst'] == data['parentTconst'].loc[row]) 
                ## Filtering only the year before the title is released
                & (title_episode_df_ref['startYear'] < data['startYear'].loc[row]) 
               )
    
        output = dict()
        output['parentTconst'] = data['parentTconst'].loc[row]
        output['tconst'] = data['tconst'].loc[row]
        output['number_past_episode'] = len(title_episode_df_ref['tconst'][mask].unique())
        output['average_show_past_rating'] = title_episode_df_ref['averageRating'][mask].mean()
        
        output_list.append(output)
        
    output_df = pd.DataFrame(output_list)    
    
    return output_df

In [26]:
def merge_past_show_ratings(data):
    '''
    This function calculate the ratings of past episode using calculate_past_show_ratings function
    and merge to the output into the original DataFrame
    '''
    print("Start calculate past show rating")
    start = time.time()
    past_show_ratings_df = parallelize_dataframe(title_episode_df, calculate_past_show_ratings, n_cores=8)
    end = time.time()
    running_time = end - start
    print(f"Runtime of the program is {end - start}")
    
    data = data.merge(past_show_ratings_df, on = ['tconst','parentTconst'], how ='left')
    
    return data

In [27]:
data = merge_past_show_ratings(data)

Start calculate past show rating
Runtime of the program is 349.6205430030823


In [35]:
data.to_csv('s3://audible-insights/karteuku/IMDB/data_3.csv', index = False)