<a href="https://colab.research.google.com/github/ShaokangYANG/IMDB/blob/main/IMDB_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Import dataset from Kaggle**

In [1]:
from google.colab import files
files.upload() #upload kaggle.json

Saving kaggle.json to kaggle.json



In [2]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d ashirwadsangwan/imdb-dataset

Downloading imdb-dataset.zip to /content
 99% 1.43G/1.44G [00:16<00:00, 89.6MB/s]
100% 1.44G/1.44G [00:16<00:00, 90.9MB/s]


In [3]:
from zipfile import ZipFile
file_name = "imdb-dataset.zip"

with ZipFile(file_name, "r") as zf:  # named zf to avoid shadowing the built-in zip()
  zf.extractall()
  print("The dataset has been unzipped")

The dataset has been unzipped


Check what has been downloaded

In [4]:
!ls

imdb-dataset.zip    title.akas.tsv	  title.principals.tsv.gz
kaggle.json	    title.akas.tsv.gz	  title.ratings.tsv
name.basics.tsv     title.basics.tsv	  title.ratings.tsv.gz
name.basics.tsv.gz  title.basics.tsv.gz
sample_data	    title.principals.tsv


Set up Spark to handle the large dataset

In [8]:
!pip install pyspark



In [35]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, col, count
from pyspark.sql import SQLContext

**Create a SparkContext**

In [None]:
conf = SparkConf().setAppName("MarketBasket")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '4G')
        .set('spark.driver.memory', '45G')
        .set('spark.default.parallelism',24)
        .set('spark.driver.maxResultSize', '10G'))
sc = SparkContext(conf=conf)

title_basics contains basic information about every title; below it is filtered down to movies only.


In [11]:
sqlContext = SQLContext(sc)

title_basics = 'title.basics.tsv.gz'
title_basics = sqlContext.read.csv(title_basics, header=True, sep = '\t')
title_basics.show(5) # see what the table looks like originally

# titleType also contains values such as 'short' and 'video'; keep only 'movie'
title_basics = title_basics.filter(title_basics.titleType == "movie")
title_basics.show(5) # see what the table looks like after filtering

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            \N|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy
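The outputs show that IMDb marks missing values with the literal string `\N`. As a minimal, Spark-free sketch of the same read-and-filter step, the standard library can parse a gzipped TSV, map `\N` to `None`, and keep only `movie` rows. The helper `read_imdb_tsv` is hypothetical, and the three sample rows are copied from the outputs above. In Spark, the CSV reader's `nullValue` option plays a similar role, e.g. `sqlContext.read.csv(path, header=True, sep='\t', nullValue='\\N')`.

```python
import csv
import gzip
import io

def read_imdb_tsv(fileobj):
    """Yield one dict per row of an IMDb-style TSV, mapping '\\N' to None (hypothetical helper)."""
    # Tab-separated with a header row; QUOTE_NONE is a precaution so quote characters stay as text
    reader = csv.DictReader(fileobj, delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        yield {key: (None if value == "\\N" else value) for key, value in row.items()}

# A few rows in the title.basics layout, gzipped in memory to mimic title.basics.tsv.gz
sample = (
    "tconst\ttitleType\tprimaryTitle\tstartYear\truntimeMinutes\n"
    "tt0000001\tshort\tCarmencita\t1894\t1\n"
    "tt0002591\tmovie\tZu spät\t1912\t\\N\n"
    "tt0003689\tmovie\tBorn Again\t1914\t\\N\n"
)
compressed = io.BytesIO(gzip.compress(sample.encode("utf-8")))

with gzip.open(compressed, mode="rt", encoding="utf-8", newline="") as f:
    # Same filter as title_basics.filter(titleType == "movie")
    movies = [row for row in read_imdb_tsv(f) if row["titleType"] == "movie"]

print([m["primaryTitle"] for m in movies])  # ['Zu spät', 'Born Again']
print(movies[0]["runtimeMinutes"])          # None
```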

name_basics contains information about people (actors, writers, directors and others), such as their names, birth years and professions.

In [12]:
name_basics = 'name.basics.tsv.gz'
name_basics = sqlContext.read.csv(name_basics, header=True, sep = '\t')
name_basics.show(5) # see what the table looks like 

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0050419,tt00531...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0117057,tt00373...|
|nm0000003|Brigitte Bardot|     1934|       \N|actress,soundtrac...|tt0049189,tt00599...|
|nm0000004|   John Belushi|     1949|     1982|actor,writer,soun...|tt0078723,tt00804...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00839...|
+---------+---------------+---------+---------+--------------------+--------------------+
only showing top 5 rows




title_principals lists the principal cast and crew of each title; it is needed to link movies to their actors.

In [13]:
title_principals = 'title.principals.tsv.gz'
title_principals = sqlContext.read.csv(title_principals, header=True, sep = '\t')
title_principals.show(5) # see what the table looks like originally

title_principals = title_principals.filter((title_principals.category == "actor") | (title_principals.category == "actress")) # keep only actor and actress
title_principals = title_principals.select(col("tconst"),col("nconst")) # select movie id and actor id 
title_principals.show(5) # see what the table looks like after filtering


+---------+--------+---------+---------------+--------------------+-----------+
|   tconst|ordering|   nconst|       category|                 job| characters|
+---------+--------+---------+---------------+--------------------+-----------+
|tt0000001|       1|nm1588970|           self|                  \N|["Herself"]|
|tt0000001|       2|nm0005690|       director|                  \N|         \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|         \N|
|tt0000002|       1|nm0721526|       director|                  \N|         \N|
|tt0000002|       2|nm1335271|       composer|                  \N|         \N|
+---------+--------+---------+---------------+--------------------+-----------+
only showing top 5 rows

+---------+---------+
|   tconst|   nconst|
+---------+---------+
|tt0000005|nm0443482|
|tt0000005|nm0653042|
|tt0000007|nm0179163|
|tt0000007|nm0183947|
|tt0000008|nm0653028|
+---------+---------+
only showing top 5 rows



In [14]:
# inner join on tconst: keep only cast rows whose title survived the 'movie' filter
title_principals = title_principals.join(title_basics, on="tconst", how="inner")
title_principals.show(5) # print example

+---------+---------+---------+------------+-------------+-------+---------+-------+--------------+------+
|   tconst|   nconst|titleType|primaryTitle|originalTitle|isAdult|startYear|endYear|runtimeMinutes|genres|
+---------+---------+---------+------------+-------------+-------+---------+-------+--------------+------+
|tt0002591|nm0029806|    movie|     Zu spät|      Zu spät|      0|     1912|     \N|            \N|    \N|
|tt0002591|nm0509573|    movie|     Zu spät|      Zu spät|      0|     1912|     \N|            \N|    \N|
|tt0003689|nm0694718|    movie|  Born Again|   Born Again|      0|     1914|     \N|            \N| Drama|
|tt0003689|nm0101071|    movie|  Born Again|   Born Again|      0|     1914|     \N|            \N| Drama|
|tt0003689|nm0910564|    movie|  Born Again|   Born Again|      0|     1914|     \N|            \N| Drama|
+---------+---------+---------+------------+-------------+-------+---------+-------+--------------+------+
only showing top 5 rows
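The two cells above first keep only `actor`/`actress` rows and then inner-join on `tconst`. Because each `tconst` appears once in `title_basics`, the join acts as a filter that drops cast rows of non-movie titles (plus it adds the title columns). A plain-Python sketch of that logic on a handful of hypothetical rows (the ids `tt_a`, `nm_1`, etc. are made up, not real IMDb data):

```python
# Hypothetical (tconst, nconst, category) rows in the title.principals layout
principals = [
    ("tt_a", "nm_1", "actor"),
    ("tt_a", "nm_2", "actress"),
    ("tt_a", "nm_3", "director"),
    ("tt_b", "nm_1", "actor"),   # tt_b is a short, not a movie
    ("tt_c", "nm_4", "self"),
]
# Hypothetical tconst -> titleType lookup, as in title.basics
title_types = {"tt_a": "movie", "tt_b": "short", "tt_c": "movie"}

# Step 1: keep only actors and actresses, projected to (tconst, nconst)
cast = [(t, n) for t, n, category in principals if category in ("actor", "actress")]

# Step 2: inner join with the movie ids; cast rows of non-movie titles drop out
movie_ids = {t for t, kind in title_types.items() if kind == "movie"}
movie_cast = [(t, n) for t, n in cast if t in movie_ids]

print(movie_cast)  # [('tt_a', 'nm_1'), ('tt_a', 'nm_2')]
```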



Now I have three tables: title_basics with movie information only, name_basics with information about people, and title_principals linking each movie to its principal actors and actresses.

A-Priori Algorithm

In [15]:
import itertools

In [16]:
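# binary addition used as the reducer in reduceByKey (note: shadows Python's built-in sum)
def sum(a,b):
  return a+b

# checks whether an itemset from filt is a subset of a basket.
# Note: it returns (itemset, 1) for the FIRST match only (None if nothing matches),
# so each basket contributes at most one count.
def check_subset(rddlist, filt):
  basket = set(rddlist)
  for item in filt:
    if set(item).issubset(basket):
      return (item, 1)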
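As written, `check_subset` stops at the first itemset in `filt` that the basket contains, so a movie featuring several candidate pairs adds to only one of them. A hedged sketch of a variant that returns every contained candidate (`contained_candidates` is a hypothetical name, not part of the notebook). In Spark it would be paired with `flatMap`, e.g. `rdd.flatMap(lambda b: contained_candidates(b, pairs_list))`, instead of `map(...).filter(...)`, so that `reduceByKey` then sums one count per basket and pair.

```python
def contained_candidates(basket, candidates):
    """Return (itemset, 1) for every candidate itemset fully contained in the basket."""
    items = set(basket)  # build the basket set once, not once per candidate
    return [(cand, 1) for cand in candidates if set(cand).issubset(items)]

basket = ["nm_1", "nm_2", "nm_3"]
candidates = [("nm_1", "nm_2"), ("nm_2", "nm_3"), ("nm_1", "nm_4")]
print(contained_candidates(basket, candidates))
# [(('nm_1', 'nm_2'), 1), (('nm_2', 'nm_3'), 1)]
```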

In [17]:
def apriori(rdd, threshold):
  
  flat_list = rdd.flatMap(list) # one element per actor appearance

  singleton = flat_list.map(lambda item: (item, 1)) # add one for each actor appearance
  singleton_summed = singleton.reduceByKey(sum) # sum of values with the actor as key
  singleton_filtered = singleton_summed.filter(lambda item: item[1] >= threshold) # keep actors that appear in at least `threshold` baskets

  # obtain the list of frequent actor ids
  frequent_actors = singleton_filtered.map(lambda item: item[0])

  # obtain all the pairs of frequent actors (A-Priori: only these pairs can be frequent)
  pairs_list = list(itertools.combinations(frequent_actors.toLocalIterator(), 2))

  # build the support table for the pairs by applying check_subset to every basket
  support_table_pairs = rdd.map(lambda x: check_subset(x, pairs_list)).filter(lambda x: x is not None).cache() # keep baskets that contain a candidate pair
  support_table_pairs_summed = support_table_pairs.reduceByKey(sum) # sum of values with the pair as key
  support_table_pairs_filtered = support_table_pairs_summed.filter(lambda item: item[1] >= threshold) # keep pairs that appear together in at least `threshold` baskets

  return (support_table_pairs_filtered)
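`apriori` relies on monotonicity: a pair can only reach the threshold if both of its actors do. The same two passes can be sketched in plain Python on toy baskets (`apriori_pairs` and the data are hypothetical). One design difference from the cell above: instead of building every pair of frequent actors and testing each one against each basket, this version generates pairs from inside each basket, and it sorts the members so that a pair always gets the same key regardless of order.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, threshold):
    """Frequent pairs via the two-pass A-Priori idea (pure-Python sketch)."""
    # Pass 1: count each item once per basket and keep the frequent singletons
    item_counts = Counter(item for basket in baskets for item in set(basket))
    frequent = {item for item, c in item_counts.items() if c >= threshold}

    # Pass 2: count only pairs whose two members are both frequent
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(set(basket) & frequent)  # sorted so (a, b) and (b, a) share one key
        pair_counts.update(combinations(kept, 2))

    return {pair: c for pair, c in pair_counts.items() if c >= threshold}

toy_baskets = [["a", "b", "c"], ["a", "b"], ["a", "c"], ["b", "d"]]
print(apriori_pairs(toy_baskets, threshold=2))  # {('a', 'b'): 2, ('a', 'c'): 2}
```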

SON algorithm

In [32]:
#create a list containing baskets: each basket is a movie and it is a list containing the actors that performed in that film
baskets = title_principals.groupBy("tconst").agg(collect_set("nconst").alias("actors"))

#list of baskets containing actors divided by movies
basket_list = baskets.select('actors').rdd.flatMap(list)

#Redistribute the baskets into an RDD with 10 partitions (the chunks used by SON)
basket_list = sc.parallelize(basket_list.collect(),10)

print(basket_list.take(10)) #print example (take avoids collecting the whole RDD)

[['nm0029806', 'nm0509573'], ['nm0910564', 'nm0527801', 'nm0399988', 'nm0101071', 'nm0694718', 'nm0728289', 'nm0585503'], ['nm0368875', 'nm0092665', 'nm0492302', 'nm0445507', 'nm0776747', 'nm0383278', 'nm0192062', 'nm0285643', 'nm0793189'], ['nm0268437', 'nm0811293', 'nm0102718', 'nm0190516', 'nm0478359'], ['nm0593671', 'nm0394389', 'nm0203439', 'nm0909648', 'nm0863863', 'nm0269671'], ['nm0075601', 'nm0364218', 'nm0668525', 'nm0322794', 'nm0304236', 'nm0061746'], ['nm0606530', 'nm0493083'], ['nm0071601', 'nm0071618', 'nm0673814', 'nm0507655', 'nm0000909', 'nm0607056', 'nm0071992', 'nm0000910'], ['nm0356267', 'nm0235634', 'nm0285950', 'nm0183994', 'nm0135474', 'nm0512164'], ['nm0546121', 'nm0909648', 'nm0668084', 'nm0455748']]
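Each basket above is the set of actors in one movie. `collect_set` removes duplicate actors and does not guarantee any order inside a basket. A plain-Python sketch of the same grouping on hypothetical `(tconst, nconst)` rows, sorting each basket only to make the output deterministic:

```python
from collections import defaultdict

# Hypothetical (tconst, nconst) rows, like the joined title_principals table
rows = [("tt_a", "nm_1"), ("tt_a", "nm_2"), ("tt_b", "nm_1"), ("tt_a", "nm_1")]

# Group actors by movie; a set mirrors collect_set, which drops the duplicate ("tt_a", "nm_1")
baskets_by_movie = defaultdict(set)
for tconst, nconst in rows:
    baskets_by_movie[tconst].add(nconst)

# Keep only the actor lists, as basket_list does
movie_baskets = [sorted(actors) for actors in baskets_by_movie.values()]
print(movie_baskets)  # [['nm_1', 'nm_2'], ['nm_1']]
```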


In [21]:
minSupport = 100
numPartitions = basket_list.getNumPartitions()
adjSupport = minSupport/numPartitions
adjSupport

10.0
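The lowered threshold follows SON's scaling rule, assuming the $k$ chunks hold roughly equal numbers of baskets: with global support $s$, each chunk is mined at $s/k$ (here $100/10 = 10$). An actor pair $I$ whose count $c_j(I)$ stays below $s/k$ in every chunk cannot reach $s$ overall, so mining every chunk at $s/k$ misses no globally frequent pair:

```latex
s_{\text{chunk}} = \frac{s}{k} = \frac{100}{10} = 10,
\qquad
c_j(I) < \frac{s}{k}\ \ \forall j \in \{1,\dots,k\}
\;\Longrightarrow\;
c(I) = \sum_{j=1}^{k} c_j(I) < k \cdot \frac{s}{k} = s .
```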

Combine A-Priori and SON

In [22]:
rdd_object = sc.parallelize([]) # initialize an empty RDD

chunks = basket_list.glom().collect() # one list of baskets per partition, collected once
# SON phase 1. The whole data is too large, so only the first two chunks are processed;
# pairs that are frequent only in the skipped chunks cannot become candidates.
for i in range(0, 2):
  partition = sc.parallelize(chunks[i])
  support_table_pairs_filtered = apriori(partition, adjSupport)
  rdd_chunk = support_table_pairs_filtered.map(lambda item: (item[0],1))
  rdd_object = rdd_object.union(rdd_chunk)
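SON only guarantees that no frequent pair is missed when phase 1 runs on every chunk; stopping after two of the ten chunks trades that guarantee for speed. For reference, a minimal in-memory sketch of both SON phases over all chunks. The function names, the contiguous chunking (similar to how a list is sliced into partitions) and the toy data are assumptions, not the notebook's code.

```python
from collections import Counter
from itertools import combinations

def local_frequent_pairs(chunk, threshold):
    """Phase-1 helper: pairs frequent inside one chunk (in-memory A-Priori)."""
    item_counts = Counter(item for basket in chunk for item in set(basket))
    frequent = {item for item, c in item_counts.items() if c >= threshold}
    pair_counts = Counter()
    for basket in chunk:
        pair_counts.update(combinations(sorted(set(basket) & frequent), 2))
    return {pair for pair, c in pair_counts.items() if c >= threshold}

def son_pairs(baskets, min_support, num_chunks):
    """Two-phase SON: local candidates per chunk, then one exact count over all baskets."""
    n = len(baskets)
    chunks = [baskets[i * n // num_chunks:(i + 1) * n // num_chunks] for i in range(num_chunks)]
    local_threshold = min_support / num_chunks

    # Phase 1: union of locally frequent pairs from EVERY chunk
    candidates = set()
    for chunk in chunks:
        candidates |= local_frequent_pairs(chunk, local_threshold)

    # Phase 2: exact global count of each candidate, then the real threshold
    totals = Counter()
    for basket in baskets:
        items = set(basket)
        totals.update(pair for pair in candidates if items.issuperset(pair))
    return {pair: c for pair, c in totals.items() if c >= min_support}

toy_baskets = [["a", "b"], ["a", "b", "c"], ["a", "c"], ["a", "b"], ["b", "c"], ["a", "b"]]
print(son_pairs(toy_baskets, min_support=4, num_chunks=2))  # {('a', 'b'): 4}
```

Phase 2 scans the full data once, which is what removes the false positives that phase 1 lets through.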

In [23]:
#Convert the RDD to dataframe
columns = ["actor_pair","movies"]
rdd_sets = rdd_object.toDF(columns)
rdd_sets.createOrReplaceTempView("rdd_sets")
sqlContext.sql("SELECT * FROM rdd_sets ORDER BY movies DESC").show(truncate = False)

+----------------------+------+
|actor_pair            |movies|
+----------------------+------+
|{nm2366585, nm2367854}|1     |
|{nm0289960, nm0409049}|1     |
|{nm1770187, nm2082516}|1     |
|{nm2366585, nm2384746}|1     |
+----------------------+------+



In [25]:
actors = rdd_object.map(lambda item : item[0])
actors_list = actors.collect()


In [26]:
rdd_check = basket_list.map(lambda x : check_subset(x, actors_list)).filter(lambda x: x is not None).cache()
rdd_summed = rdd_check.reduceByKey(sum)
rdd_filtered = rdd_summed.filter(lambda item: item[1] >= minSupport)

print(rdd_filtered.collect())

[]


In [None]:
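#convert the RDD to a dataframe (toDF cannot infer a schema from an empty RDD, so check first)
columns = ["actor_pair","movies"]
if rdd_filtered.isEmpty():
  print("No actor pair reaches minSupport")
else:
  results = rdd_filtered.toDF(columns)
  results.createOrReplaceTempView("results")
  sqlContext.sql("SELECT * FROM results ORDER BY movies DESC").show(truncate = False)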

Result

In [None]:
#list of all actors
flat_list = basket_list.flatMap(list)

#obtain the actors that performed in at least minSupport movies:
singleton = flat_list.map(lambda item: (item , 1)) #add one for each actor appearance
singleton_summed = singleton.reduceByKey(lambda a, b: a + b) #sum of values by actor as key
singleton_filtered = singleton_summed.filter(lambda item: item[1] >= minSupport ) #keep actors that performed in at least minSupport movies

#convert the first support RDD to dataframe
columns = ["actors", "movies"]
first_df = singleton_filtered.toDF(columns)
first_df.createOrReplaceTempView("first_df")
sqlContext.sql("SELECT * FROM first_df ORDER BY movies DESC").show(5, truncate = False)
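This cell is the first A-Priori pass applied to all baskets. The same count-and-filter pipeline (flatMap, map to `(actor, 1)`, reduceByKey, filter, then ORDER BY count) can be sketched with `collections.Counter` on toy data; the ids and the threshold below are hypothetical:

```python
from collections import Counter

toy_baskets = [["nm_1", "nm_2"], ["nm_1", "nm_3"], ["nm_1", "nm_2"], ["nm_4"]]
min_support_demo = 2  # hypothetical threshold for the toy data

# Count one per actor appearance, then keep actors at or above the threshold
actor_counts = Counter(actor for basket in toy_baskets for actor in basket)
frequent_actors_demo = {a: c for a, c in actor_counts.items() if c >= min_support_demo}

# Sort by count, descending, like ORDER BY movies DESC
print(sorted(frequent_actors_demo.items(), key=lambda kv: kv[1], reverse=True))
# [('nm_1', 3), ('nm_2', 2)]
```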

In [None]:
name_basics.createOrReplaceTempView("name_basics")

#retrieve the names of the most frequent actors from the name_basics table
for actor_id in ['nm2082516', 'nm0648803', 'nm0623427', 'nm0006982']:
  sqlContext.sql(f"SELECT primaryName, actors, movies FROM (first_df INNER JOIN name_basics ON actors = nconst) WHERE actors = '{actor_id}'").show(truncate = False)
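The name lookups above differ only in the actor id; conceptually each one is an inner join between the frequent-actor counts and the `nconst -> primaryName` mapping. A plain-Python sketch of that join over all frequent actors at once, using hypothetical ids, names and counts (not results from this dataset):

```python
# Hypothetical stand-ins for first_df rows (nconst, movies) and for name_basics
frequent_counts = [("nm_1", 120), ("nm_2", 105)]
names = {"nm_1": "Actor One", "nm_2": "Actor Two", "nm_3": "Actor Three"}

# Inner join on the actor id: ids without a name entry would be dropped
joined = [(names[n], n, c) for n, c in frequent_counts if n in names]
for primary_name, nconst, movies in joined:
    print(primary_name, nconst, movies)
```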