# Market basket analysis: IMDb Dataset

#### Lo Vecchio Gianluca (961536)
##### DSE(2020-2021)

## Download the dataset through Kaggle API

In [1]:
!pip install kaggle



!mkdir -p ~/.kaggle

*The kaggle.json file (the Kaggle API token) must be uploaded manually to the home directory of the environment.*

In [1]:
!cp ~/kaggle.json ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json

In [4]:
!kaggle datasets download -d 'ashirwadsangwan/imdb-dataset'

Downloading imdb-dataset.zip to /home/jupyter
 98%|██████████████████████████████████████ | 1.41G/1.44G [00:05<00:00, 313MB/s]
100%|███████████████████████████████████████| 1.44G/1.44G [00:05<00:00, 273MB/s]


In [2]:
!unzip -o imdb-dataset.zip && rm imdb-dataset.zip

Archive:  imdb-dataset.zip


## Setting up the Jupyter Notebook environment 

In [3]:
!pip install pyspark
import itertools
import time

import matplotlib.pyplot as plt
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, count

# Local Spark configuration using all available cores
conf = (SparkConf()
        .setAppName("MB Analysis")
        .setMaster('local[*]')
        .set('spark.executor.memory', '4G')
        .set('spark.driver.memory', '45G')
        .set('spark.driver.maxResultSize', '10G')
        .set('spark.reducer.maxSizeInFlight', '96m'))
sc = SparkContext(conf=conf)

# The SparkSession reuses the SparkContext created above
spark = SparkSession.builder.appName("MB Analysis").getOrCreate()



## Loading the three datasets

Importing the three necessary tables.

In [4]:
FilmTitle = spark.read.csv("title.basics.tsv", sep=r'\t', header=True)
Job = spark.read.csv("title.principals.tsv", sep=r'\t', header=True)
NamesCast = spark.read.csv("name.basics.tsv", sep=r'\t', header=True)

Film table:

In [3]:
FilmTitle.show(10)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            \N|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

Job table:

In [4]:
Job.show(10)

+---------+--------+---------+---------------+--------------------+-----------+
|   tconst|ordering|   nconst|       category|                 job| characters|
+---------+--------+---------+---------------+--------------------+-----------+
|tt0000001|       1|nm1588970|           self|                  \N|["Herself"]|
|tt0000001|       2|nm0005690|       director|                  \N|         \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|         \N|
|tt0000002|       1|nm0721526|       director|                  \N|         \N|
|tt0000002|       2|nm1335271|       composer|                  \N|         \N|
|tt0000003|       1|nm0721526|       director|                  \N|         \N|
|tt0000003|       2|nm5442194|       producer|            producer|         \N|
|tt0000003|       3|nm1335271|       composer|                  \N|         \N|
|tt0000003|       4|nm5442200|         editor|                  \N|         \N|
|tt0000004|       1|nm0721526|       dir

Actors and actresses table:

In [5]:
NamesCast.show(10)

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0050419,tt00531...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0071877,tt01170...|
|nm0000003|Brigitte Bardot|     1934|       \N|actress,soundtrac...|tt0054452,tt00491...|
|nm0000004|   John Belushi|     1949|     1982|actor,writer,soun...|tt0077975,tt00725...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0069467,tt00509...|
|nm0000006| Ingrid Bergman|     1915|     1982|actress,soundtrac...|tt0038109,tt00368...|
|nm0000007|Humphrey Bogart|     1899|     1957|actor,soundtrack,...|tt0043265,tt00338...|
|nm0000008|  Marlon Brando|     1924|     2004|actor,soundtrack,...|tt0070849,tt00787...|
|nm0000009

## PreProcessing

Temporary views are needed to query the DataFrames with Spark SQL and to build new tables by joining the original ones:

In [5]:
FilmTitle.createOrReplaceTempView("FilmTitle")
Job.createOrReplaceTempView("Job")

Query selecting the items (actors and actresses):

In [7]:
actors =  """ SELECT *
              FROM Job
              WHERE (category = 'actor') OR (category = 'actress')
"""
actors = spark.sql(actors)
actors.show(10)

+---------+--------+---------+--------+---+--------------------+
|   tconst|ordering|   nconst|category|job|          characters|
+---------+--------+---------+--------+---+--------------------+
|tt0000005|       1|nm0443482|   actor| \N|      ["Blacksmith"]|
|tt0000005|       2|nm0653042|   actor| \N|       ["Assistant"]|
|tt0000007|       1|nm0179163|   actor| \N|                  \N|
|tt0000007|       2|nm0183947|   actor| \N|                  \N|
|tt0000008|       1|nm0653028|   actor| \N|    ["Sneezing Man"]|
|tt0000009|       1|nm0063086| actress| \N|["Miss Geraldine ...|
|tt0000009|       2|nm0183823|   actor| \N|    ["Mr. Hamilton"]|
|tt0000009|       3|nm1309758|   actor| \N|["Chauncey Depew ...|
|tt0000011|       1|nm3692297|   actor| \N|        ["Acrobats"]|
|tt0000014|       1|nm0166380|   actor| \N|    ["The Gardener"]|
+---------+--------+---------+--------+---+--------------------+
only showing top 10 rows



Query selecting the baskets (movies):

In [8]:
movies =  """ SELECT *
              FROM FilmTitle
              WHERE titleType = 'movie'
         """
movies = spark.sql(movies)
movies.show(10)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000009|    movie|          Miss Jerry|          Miss Jerry|      0|     1894|     \N|            45|             Romance|
|tt0000147|    movie|The Corbett-Fitzs...|The Corbett-Fitzs...|      0|     1897|     \N|            20|Documentary,News,...|
|tt0000335|    movie|Soldiers of the C...|Soldiers of the C...|      0|     1900|     \N|            \N|     Biography,Drama|
|tt0000502|    movie|            Bohemios|            Bohemios|      0|     1905|     \N|           100|                  \N|
|tt0000574|    movie|The Story of the ...|The Story of the ...|      0|     1906|     \N|            70|Biography,Crim

Create a new table that links the identifiers of the items (`nconst`, actors) and of the baskets (`tconst`, movies) with a join, then group the actors of each movie into one basket:

In [9]:
B =  """  SELECT Job.tconst, Job.nconst
          FROM Job JOIN FilmTitle ON Job.tconst = FilmTitle.tconst
          WHERE Job.category IN ('actor', 'actress') AND FilmTitle.titleType = 'movie'
"""
a = spark.sql(B)
# One basket per movie: the set of its actors and actresses
baskets = a.groupBy('tconst').agg(collect_set('nconst').alias('nconst'))
baskets.createOrReplaceTempView('baskets')
baskets.show(5)

+---------+--------------------+
|   tconst|              nconst|
+---------+--------------------+
|tt0002591|[nm0029806, nm050...|
|tt0003689|[nm0910564, nm052...|
|tt0004272|[nm0368875, nm009...|
|tt0004336|[nm0268437, nm081...|
|tt0005209|[nm0593671, nm039...|
+---------+--------------------+
only showing top 5 rows
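To make the shape of the baskets concrete, the join plus `groupBy`/`collect_set` above can be mimicked in plain Python. This is only a sketch on hypothetical in-memory rows: the `principals` tuples and the `titles` dict below are illustrative stand-ins for the `Job` and `FilmTitle` tables, and the helper `build_baskets` is not part of the notebook.

```python
from collections import defaultdict


def build_baskets(principals, titles):
    """Group actor/actress IDs by movie, mirroring the JOIN + collect_set query.

    principals: iterable of (tconst, nconst, category) tuples (hypothetical rows)
    titles: dict mapping tconst -> titleType (hypothetical lookup table)
    """
    baskets = defaultdict(set)
    for tconst, nconst, category in principals:
        # Keep only cast members (the items) of movies (the baskets);
        # titles.get() returns None for unknown tconst, like an inner join dropping it
        if category in ("actor", "actress") and titles.get(tconst) == "movie":
            baskets[tconst].add(nconst)  # a set, like collect_set, drops duplicates
    return dict(baskets)


principals = [
    ("tt1", "nm1", "actor"),
    ("tt1", "nm2", "actress"),
    ("tt1", "nm3", "director"),
    ("tt2", "nm1", "actor"),
    ("tt3", "nm4", "actor"),
]
titles = {"tt1": "movie", "tt2": "movie", "tt3": "short"}
print(build_baskets(principals, titles))
```

The director of `tt1` and the actor of the short `tt3` are dropped, so only two baskets remain.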



Convert the `nconst` column into an RDD in which each element is the list of actors of one movie (one basket):

In [10]:
act_bask = baskets.select('nconst').rdd.flatMap(list)
# take(3) fetches only three baskets instead of collecting the whole RDD to the driver
print(act_bask.take(3))

[['nm0029806', 'nm0509573'], ['nm0910564', 'nm0527801', 'nm0399988', 'nm0101071', 'nm0694718', 'nm0728289', 'nm0585503'], ['nm0368875', 'nm0092665', 'nm0492302', 'nm0445507', 'nm0776747', 'nm0383278', 'nm0192062', 'nm0285643', 'nm0793189']]


Two dictionaries translate the identifiers into readable actor names and movie titles, based on the original datasets:

In [11]:
Key_act = {row['nconst']:row['primaryName'] for row in (NamesCast.select("nconst","primaryName")).collect()}
Key_title={row['tconst']:row['primaryTitle'] for row in (FilmTitle.select("tconst","primaryTitle")).collect()}

Set the support threshold, i.e. the minimum number of baskets in which an itemset must appear to be considered frequent, as 0.03% of the number of baskets:

In [12]:
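# Named n_baskets so that the imported pyspark.sql.functions.count is not shadowed
n_baskets = act_bask.count()
threshold = n_baskets * 0.0003  # 0.03% of the baskets
print(n_baskets, threshold)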

393656 118.09679999999999
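Since supports are integer counts and the filters below use `>=`, the fractional threshold is equivalent to a whole minimum count, obtained by rounding up. The small computation below checks this for the numbers printed above; the names `fraction` and `min_count` are introduced only for illustration. Assuming Spark's `FPGrowth` also rounds `minSupport` times the number of baskets up (as the MLlib implementation does), `minSupport = 0.0003` in the FP-Growth section corresponds to the same minimum count.

```python
import math

n_baskets = 393656   # number of movie baskets, from the output above
fraction = 0.0003    # support expressed as a fraction of the baskets
threshold = n_baskets * fraction

# Counts are integers and the test is `count >= threshold`,
# so the smallest count that passes is the ceiling of the threshold
min_count = math.ceil(threshold)
print(threshold, min_count)
```

An itemset therefore needs at least 119 movies to be reported as frequent.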


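The A-Priori algorithm is built with native PySpark functions: `flatMap` flattens the baskets into a single sequence of actor IDs, `map` pairs each item with a count of 1, `reduceByKey` sums the counts per `nconst`, and `filter` keeps the items whose count reaches the threshold. The same procedure is then applied to the candidate pairs, which are built only from frequent singletons: by the monotonicity of support, a pair can be frequent only if both of its actors are. The inputs of `apriori` are an RDD of baskets and the threshold used in the filters; the function stops at pairs.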

In [13]:
def apriori(rdd, threshold):
  # Pass 1: count every actor (singleton) and keep those with support >= threshold
  singleton = (rdd.flatMap(list)
                  .map(lambda item: (item, 1))
                  .reduceByKey(lambda actor1, actor2: actor1 + actor2)
                  .filter(lambda item: item[1] >= threshold))

  # IDs of the frequent actors
  freq_act = singleton.map(lambda item: item[0])

  # Candidate pairs: all combinations of two frequent actors, built on the driver
  pairs_of_freq_act = list(itertools.combinations(freq_act.toLocalIterator(), 2))

  # Pass 2: emit (pair, 1) for every candidate pair contained in a basket
  def pairs_in_basket(basket):
    basket_set = set(basket)  # built once per basket, not once per candidate
    return [(pair, 1) for pair in pairs_of_freq_act
            if pair[0] in basket_set and pair[1] in basket_set]

  table_of_pairs = (rdd.flatMap(pairs_in_basket)
                       .reduceByKey(lambda actor1, actor2: actor1 + actor2)
                       .filter(lambda item: item[1] >= threshold))

  return table_of_pairs
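The two passes of `apriori` can be illustrated without Spark on a small in-memory list of baskets. The sketch below is a plain-Python variant, not the notebook's code: instead of building every pair of frequent actors on the driver and testing it against every basket, it generates pairs inside each basket from its frequent items only, which is the usual way A-Priori exploits monotonicity. The function name `apriori_pairs` and the toy baskets are hypothetical.

```python
from collections import Counter
from itertools import combinations


def apriori_pairs(baskets, threshold):
    """Two-pass A-priori limited to pairs, on an in-memory list of baskets."""
    # Pass 1: support of every single item (set() ignores duplicates in a basket)
    item_counts = Counter(item for basket in baskets for item in set(basket))
    frequent_items = {item for item, c in item_counts.items() if c >= threshold}

    # Pass 2: count only pairs made of frequent items (monotonicity of support);
    # sorting gives each pair a canonical orientation
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(set(basket) & frequent_items)
        pair_counts.update(combinations(kept, 2))
    return {pair: c for pair, c in pair_counts.items() if c >= threshold}


baskets = [["a", "b", "c"], ["a", "b"], ["a", "c"], ["b", "c"], ["a", "b", "c"], ["d"]]
print(apriori_pairs(baskets, 3))  # {('a', 'b'): 3, ('a', 'c'): 3, ('b', 'c'): 3}
```

Item `d` appears once, so it is pruned in pass 1 and never takes part in a candidate pair.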

The A-Priori algorithm is run on the dataset, and the result is converted to a DataFrame for better visualization:

In [14]:
AprioriResults = apriori(act_bask, threshold)
result = spark.createDataFrame(AprioriResults).toDF("Pairs of actors and actresses", "Number of movies")
result.show()

+-----------------------------+----------------+
|Pairs of actors and actresses|Number of movies|
+-----------------------------+----------------+
|         {nm0623427, nm000...|             237|
|         {nm0006982, nm061...|             122|
|         {nm0006982, nm041...|             162|
|         {nm0046850, nm000...|             169|
|         {nm2082516, nm064...|             147|
|         {nm2373718, nm064...|             126|
+-----------------------------+----------------+



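Time required to run the algorithm. Note that Spark transformations are lazy: inside `apriori` only the singleton pass is executed (it is triggered by `toLocalIterator()`), while the pair-counting stage runs only when an action such as `count()` or `collect()` is applied to the returned RDD, so it is not included in this measurement: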

In [15]:
startTime = time.time()
apriori(act_bask, threshold)
APrioriTime = time.time() - startTime
print(APrioriTime)

23.999507904052734
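The effect of lazy evaluation on timing can be reproduced without a cluster, because Python's built-in `map` is lazy in a similar way. The analogy below is only an illustration (the `expensive` function and the call counter are hypothetical): creating the pipeline does no work, exactly like chaining RDD transformations, and the work happens only when the pipeline is consumed, like a Spark action.

```python
calls = 0


def expensive(x):
    """Stand-in for a costly per-record computation; counts how often it runs."""
    global calls
    calls += 1
    return x * x


# Like an RDD transformation: map() only builds a lazy pipeline
pipeline = map(expensive, range(1_000))
print(calls)  # 0

# Like a Spark action (count/collect): consuming the pipeline runs every step
total = sum(pipeline)
print(calls)  # 1000
```

In the Spark cell, one way to include the pair-counting stage in the measurement would be to time `apriori(act_bask, threshold).count()`.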


Translating the IDs of the first pair into names with the dictionary:

In [25]:
# Collect once: each collect() would otherwise recompute the whole A-Priori lineage
top_pair = result.select('Pairs of actors and actresses').collect()[0][0]
print(Key_act[top_pair[0]])
print(top_pair[0])
print(Key_act[top_pair[1]])
print(top_pair[1])
print(top_pair[:2])

Prem Nazir
nm0623427
Adoor Bhasi
nm0006982
('nm0623427', 'nm0006982')


## FP Growth Algorithm

The FP-Growth algorithm is run with the `pyspark.ml.fpm.FPGrowth` implementation, measuring its running time in the same cell:

In [35]:
from pyspark.ml.fpm import FPGrowth
fp = FPGrowth(itemsCol="nconst", minSupport=0.0003)
startTime = time.time()
FPGrowth_model = fp.fit(baskets)
FPGrowthTime = time.time() - startTime
print(FPGrowthTime)

13.464464664459229
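Unlike A-Priori, FP-Growth does not generate candidate pairs; Spark implements a parallel variant of it (PFP). The sketch below only illustrates the recursive divide-and-conquer idea on one machine and is a simplification, not what Spark runs: conditional databases are kept as plain lists of `(items, weight)` pairs instead of a compressed FP-tree. The functions `pattern_growth` and `_grow` are hypothetical names.

```python
from collections import Counter


def pattern_growth(transactions, min_count):
    """Frequent itemsets via FP-Growth-style recursion on conditional databases.

    Simplification: no FP-tree; each conditional database is a list of
    (set_of_items, weight) pairs. Returns {frozenset(itemset): support}.
    """
    results = {}
    _grow([(set(t), 1) for t in transactions], min_count, frozenset(), results)
    return results


def _grow(db, min_count, suffix, results):
    # Support of each item inside the current (conditional) database
    counts = Counter()
    for items, weight in db:
        for item in items:
            counts[item] += weight
    # Frequent items ordered by descending support, ties broken by item id
    order = sorted((i for i, c in counts.items() if c >= min_count),
                   key=lambda i: (-counts[i], i))
    rank = {item: r for r, item in enumerate(order)}
    for item in order:
        itemset = suffix | {item}
        results[itemset] = counts[item]
        # Conditional database of `item`: transactions containing it, restricted to
        # frequent items ranked before it, so every itemset is generated exactly once
        cond_db = []
        for items, weight in db:
            if item in items:
                prefix = {i for i in items if i in rank and rank[i] < rank[item]}
                if prefix:
                    cond_db.append((prefix, weight))
        if cond_db:
            _grow(cond_db, min_count, itemset, results)


baskets = [["a", "b", "c"], ["a", "b"], ["a", "c"], ["b", "c"], ["a", "b", "c"]]
for itemset, support in sorted(pattern_growth(baskets, 3).items(),
                               key=lambda kv: (len(kv[0]), sorted(kv[0]))):
    print(sorted(itemset), support)
```

With a minimum count of 3, the three singletons (support 4) and the three pairs (support 3) are found, while `{a, b, c}` (support 2) is not.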


We display the first 10 frequent itemsets (the table is not sorted by frequency):

In [36]:
FPGrowth_model.freqItemsets.show(10)

+-----------+----+
|      items|freq|
+-----------+----+
|[nm1388202]| 153|
|[nm0430646]| 120|
|[nm0103977]| 798|
|[nm0006982]| 585|
|[nm0436922]| 152|
|[nm0408381]| 120|
|[nm0648803]| 565|
|[nm0405977]| 152|
|[nm0576495]| 120|
|[nm0579663]| 120|
+-----------+----+
only showing top 10 rows



In [37]:
# Keep the fitted model intact and register its frequent itemsets as a SQL view
freq_itemsets = FPGrowth_model.freqItemsets
freq_itemsets.createOrReplaceTempView("FPGrowth_model")

In [38]:
q = """ SELECT *
        FROM FPGrowth_model 
        WHERE size (items) = 2
        ORDER BY freq DESC
        """
spark.sql(q).show(10)

+--------------------+----+
|               items|freq|
+--------------------+----+
|[nm0623427, nm000...| 237|
|[nm0046850, nm000...| 169|
|[nm0419653, nm000...| 162|
|[nm2082516, nm064...| 147|
|[nm2373718, nm064...| 126|
|[nm0619779, nm000...| 122|
+--------------------+----+



In [39]:
# Run the query once and reuse the first (most frequent) pair
top_pair = spark.sql(q).select('items').collect()[0][0]
print(Key_act[top_pair[0]])
print(top_pair[0])
print(Key_act[top_pair[1]])
print(top_pair[1])
print(top_pair[:2])

Prem Nazir
nm0623427
Adoor Bhasi
nm0006982
['nm0623427', 'nm0006982']


## The SON Algorithm

First we create an RDD with the actor IDs of each basket, using only the movie baskets rather than the whole Job table (with the whole Job table the SparkContext was shut down because of a lack of RAM), and then we parallelize it into 8 partitions:

In [17]:
basket_list = baskets.select("nconst").rdd.flatMap(list)
rdd_actors = sc.parallelize(basket_list.collect(), 8)

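We set the global minimum support to 180 baskets and then scale it by the number of partitions created above, so that each chunk uses a threshold of 180 / 8 = 22.5. This is stricter than the threshold of about 118 baskets used for A-Priori and FP-Growth, so fewer pairs are expected: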

In [18]:
Tot_support = 180                          # global support threshold (number of baskets)
num_Parts = rdd_actors.getNumPartitions()  # number of chunks
par_support = Tot_support / num_Parts      # per-chunk threshold, scaled by the number of chunks
print(num_Parts)
par_support

8


22.5
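The whole SON procedure can be sketched in plain Python on toy data. In this sketch the chunks are built by striding over the list (`baskets[k::num_chunks]`), an assumption standing in for Spark partitions, and `local_frequent_pairs` and `son_pairs` are hypothetical names. Phase 1 runs a local A-Priori on each chunk with the scaled threshold; phase 2 counts the union of the local candidates exactly over all baskets. The scaled threshold is safe even for chunks of different sizes: a pair below `support / num_chunks` in every chunk has a total count below `support`. In Spark, phase 1 could also be written with `rdd.mapPartitions`, which runs a function on each partition without collecting the data to the driver.

```python
from collections import Counter
from itertools import combinations


def local_frequent_pairs(chunk, threshold):
    """Pairs frequent within one chunk (A-priori limited to pairs)."""
    item_counts = Counter(item for basket in chunk for item in set(basket))
    frequent = {item for item, c in item_counts.items() if c >= threshold}
    pair_counts = Counter()
    for basket in chunk:
        # Sorting makes the pair orientation canonical, so ("x", "y") and
        # ("y", "x") can never appear as two different candidates
        pair_counts.update(combinations(sorted(set(basket) & frequent), 2))
    return {pair for pair, c in pair_counts.items() if c >= threshold}


def son_pairs(baskets, num_chunks, support):
    """SON for pairs: local candidates per chunk, then one exact global count."""
    chunks = [baskets[k::num_chunks] for k in range(num_chunks)]

    # Phase 1: a pair with global count >= support reaches support / num_chunks
    # in at least one chunk, so the union of local results misses no frequent pair
    local_support = support / num_chunks
    candidates = set()
    for chunk in chunks:
        candidates |= local_frequent_pairs(chunk, local_support)

    # Phase 2: count every candidate over all baskets; keep the truly frequent ones
    counts = Counter()
    for basket in baskets:
        items = set(basket)
        counts.update(pair for pair in candidates if set(pair) <= items)
    return {pair: c for pair, c in counts.items() if c >= support}


baskets = [["a", "b"], ["a", "b", "c"], ["a", "c"], ["b", "c"], ["a", "b"], ["c"]]
print(son_pairs(baskets, num_chunks=2, support=3))  # {('a', 'b'): 3}
```

Here phase 1 yields the candidates `('a', 'b')` and `('b', 'c')`, and phase 2 discards `('b', 'c')`, whose global count is only 2.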

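The cell below implements the first pass of SON: each chunk (partition) is processed with the `apriori` function using the scaled threshold, and the locally frequent pairs of all chunks are gathered into a single RDD of candidates, which are not yet checked against the global threshold.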

In [25]:
def son1 (rdd, num_partitions, support) :
  tot_possible_cands = sc.parallelize([])
  g_rdd_act = rdd.glom().collect()
  print("starting...")
  for i in range(0, num_partitions):
    partition = sc.parallelize(g_rdd_act[i])
    acceptable_tables = apriori(partition, support)
    Possible_cands_chunks = acceptable_tables.map(lambda item: (item[0], 1))
    tot_possible_cands =tot_possible_cands.union(Possible_cands_chunks)
  print("Completed!")
  return (tot_possible_cands)

In [26]:
Son1Result = son1(rdd_actors, num_Parts, par_support)

starting...
Completed!


In [27]:
print(Son1Result.take(1))

[(('nm0623427', 'nm0006982'), 1)]


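Here we measure the running time of the first part of the SON algorithm. Because Spark transformations are lazy, only the actions executed inside `son1` are timed (the `glom().collect()` and the singleton pass of each `apriori` call); the pair counting of each chunk is deferred until the candidates are used.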

In [28]:
startTime = time.time()
son1(rdd_actors, num_Parts, par_support)
SonTime = time.time()-startTime
print(SonTime)

starting...
Completed!
11.118002653121948


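Finally, the second pass of SON counts the candidate pairs over the whole dataset and keeps only those that reach the global support.

Each basket is checked against the candidates with the custom function `filtering`; the counts are then summed with `reduceByKey`, and the native `filter` keeps the pairs whose total count reaches the threshold.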

In [29]:
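def filtering(basket, candidates):
  # Return every candidate pair whose actors all appear in the basket, each with
  # a count of 1 (the original version returned only the first match per basket)
  basket_set = set(basket)
  return [(pair, 1) for pair in candidates if set(pair).issubset(basket_set)]

In [30]:
def son2(rdd, tot_possible_cands, support):
  # Distinct candidates: a pair can be locally frequent in several chunks
  act_id_list = tot_possible_cands.map(lambda item: item[0]).distinct().collect()
  # Count each candidate over all baskets and keep those reaching the global support
  result_filtered = (rdd.flatMap(lambda basket: filtering(basket, act_id_list))
                        .reduceByKey(lambda actor1, actor2: actor1 + actor2)
                        .filter(lambda item: item[1] >= support))
  return result_filtered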

In [31]:
Son2Result = son2(rdd_actors, Son1Result, Tot_support)
#print(Son2Result.take(1))
print(Son2Result.collect())
Final_results_SON = spark.createDataFrame(Son2Result).toDF("Pairs of actors and actresses","Number of movies")
Final_results_SON.show()

[(('nm0623427', 'nm0006982'), 237)]
+-----------------------------+----------------+
|Pairs of actors and actresses|Number of movies|
+-----------------------------+----------------+
|         {nm0623427, nm000...|             237|
+-----------------------------+----------------+



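Here we measure the running time of the second part of the SON algorithm, with the same global support used above. As before, the returned RDD is lazy: only the `collect()` of the candidate list inside `son2` runs in this cell, while the final count over all baskets is deferred.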

In [32]:
startTime = time.time()
son2(rdd_actors, Son1Result, Tot_support)
SonTime = time.time() - startTime
print(SonTime)

0.6981103420257568


In [33]:
top_pair = Final_results_SON.select('Pairs of actors and actresses').collect()[0][0]
print(Key_act[top_pair[0]])
print(Key_act[top_pair[1]])

Prem Nazir
Adoor Bhasi


By filtering the FP-Growth result on the itemset size, we can extract the complete list of frequent singletons, sorted by frequency:

In [40]:
q_1 = """ SELECT *
        FROM FPGrowth_model 
        WHERE size (items) = 1
        ORDER BY freq DESC
        """
spark.sql(q_1).show(10)

+-----------+----+
|      items|freq|
+-----------+----+
|[nm0103977]| 798|
|[nm0006982]| 585|
|[nm0648803]| 565|
|[nm0305182]| 506|
|[nm0623427]| 438|
|[nm0793813]| 411|
|[nm0246703]| 391|
|[nm0619107]| 387|
|[nm0007123]| 381|
|[nm7390393]| 355|
+-----------+----+
only showing top 10 rows



In [41]:
Prob_Bhasi_towards_Nazir = round(237/585, 2)
Prob_Nazir_towards_Bhasi = round(237/438, 2)
print(Prob_Bhasi_towards_Nazir)
print(Prob_Nazir_towards_Bhasi)

0.41
0.54
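The last cell computes the confidence of the two association rules between Prem Nazir and Adoor Bhasi: the confidence of X → Y is support(X and Y) / support(X), using the pair count (237) and the singleton counts (585 for nm0006982, 438 for nm0623427) shown above. As a complement, the sketch below also computes the lift, a standard measure that the cell above does not compute: the confidence divided by the fraction of all 393656 baskets that contain the consequent. Lift is symmetric, and values far above 1 mean the two actors appear together much more often than independence would predict. The variable names are illustrative.

```python
n_baskets = 393656   # total number of movie baskets
support_pair = 237   # movies with both Prem Nazir and Adoor Bhasi
support_bhasi = 585  # movies with Adoor Bhasi (nm0006982)
support_nazir = 438  # movies with Prem Nazir (nm0623427)

# Confidence of X -> Y = support(X and Y) / support(X)
conf_bhasi_to_nazir = support_pair / support_bhasi
conf_nazir_to_bhasi = support_pair / support_nazir

# Lift = confidence / P(consequent); the same value results for either direction
lift = conf_bhasi_to_nazir / (support_nazir / n_baskets)

print(round(conf_bhasi_to_nazir, 2), round(conf_nazir_to_bhasi, 2), round(lift, 1))  # 0.41 0.54 364.1
```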
