##***Market Basket Analysis (IMDB) - Algorithms for Massive Datasets***

###**Dataset download via kaggle API and Spark Context set-up**

Upload Kaggle key

In [None]:
from google.colab import files
files.upload() 

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"alessandrofoffo","key":"<redacted>"}'}

Install pySpark and initialize SparkContext

In [None]:
!pip install pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=96ca0b39ea106bc3078082c54a9290969c1f13802dc309ddbfbdcc24e3de7fe7
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
sc = SparkContext('local[*]')

In [None]:
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [None]:
SQLContext = SQLContext(sc)

Import Kaggle and download the dataset

In [None]:
!pip install -q kaggle

# Put the API key in place before importing kaggle, which looks for ~/.kaggle/kaggle.json
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

import kaggle

In [None]:
!kaggle datasets download -d ashirwadsangwan/imdb-dataset

Downloading imdb-dataset.zip to /content
 99% 1.43G/1.44G [00:13<00:00, 128MB/s]
100% 1.44G/1.44G [00:13<00:00, 117MB/s]


Extract the dataset

In [None]:
import zipfile 
dataset = "imdb-dataset.zip"

with zipfile.ZipFile(dataset, "r") as zipdataset:
  zipdataset.extractall()

###***Data pre-processing***

Reading the title.principals.tsv.gz file. It is stored as a DataFrame through the Spark SQL context.

In [None]:
titleprincipals = SQLContext.read.csv('title.principals.tsv.gz', header=True, sep = '\t')

titleprincipals.show(10)

+---------+--------+---------+---------------+--------------------+-----------+
|   tconst|ordering|   nconst|       category|                 job| characters|
+---------+--------+---------+---------------+--------------------+-----------+
|tt0000001|       1|nm1588970|           self|                  \N|["Herself"]|
|tt0000001|       2|nm0005690|       director|                  \N|         \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|         \N|
|tt0000002|       1|nm0721526|       director|                  \N|         \N|
|tt0000002|       2|nm1335271|       composer|                  \N|         \N|
|tt0000003|       1|nm0721526|       director|                  \N|         \N|
|tt0000003|       2|nm5442194|       producer|            producer|         \N|
|tt0000003|       3|nm1335271|       composer|                  \N|         \N|
|tt0000003|       4|nm5442200|         editor|                  \N|         \N|
|tt0000004|       1|nm0721526|       dir

Filtering the table through a simple SQL query, in order to keep only the columns holding the movie id (`tconst`), the person id (`nconst`) and the category, and only the rows whose category is **actor** or **actress**.

In [None]:
titleprincipals.createOrReplaceTempView("titleprincipals")

In [None]:
query = """SELECT tconst, nconst, category
FROM titleprincipals
WHERE category == "actor" OR category == "actress"
"""
titleprincipals_1 = spark.sql(query)
titleprincipals_1.show()

+---------+---------+--------+
|   tconst|   nconst|category|
+---------+---------+--------+
|tt0000005|nm0443482|   actor|
|tt0000005|nm0653042|   actor|
|tt0000007|nm0179163|   actor|
|tt0000007|nm0183947|   actor|
|tt0000008|nm0653028|   actor|
|tt0000009|nm0063086| actress|
|tt0000009|nm0183823|   actor|
|tt0000009|nm1309758|   actor|
|tt0000011|nm3692297|   actor|
|tt0000014|nm0166380|   actor|
|tt0000014|nm0244989|   actor|
|tt0000017|nm3691272|   actor|
|tt0000017|nm3692829| actress|
|tt0000018|nm3692071|   actor|
|tt0000026|nm2350007|   actor|
|tt0000026|nm0525907|   actor|
|tt0000026|nm1151424|   actor|
|tt0000026|nm2354154|   actor|
|tt0000032|nm3692479| actress|
|tt0000036|nm0420198|   actor|
+---------+---------+--------+
only showing top 20 rows



Reading the title.basics.tsv.gz file. It is stored as a DataFrame through the Spark SQL context.

In [None]:
titlebasics = SQLContext.read.csv('title.basics.tsv.gz', header=True, sep = '\t')

titlebasics.show(10) 

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            \N|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

Filtering the table through a simple SQL query, in order to keep only the rows referring to movies and only the columns holding the movie id (`tconst`) and the title type.

In [None]:
titlebasics.createOrReplaceTempView("titlebasics")

In [None]:
query = """SELECT tconst, titleType
FROM titlebasics
WHERE titleType="movie"
"""
titlebasics_1 = spark.sql(query)
titlebasics_1.show()

+---------+---------+
|   tconst|titleType|
+---------+---------+
|tt0000009|    movie|
|tt0000147|    movie|
|tt0000335|    movie|
|tt0000502|    movie|
|tt0000574|    movie|
|tt0000615|    movie|
|tt0000630|    movie|
|tt0000675|    movie|
|tt0000676|    movie|
|tt0000679|    movie|
|tt0000739|    movie|
|tt0000793|    movie|
|tt0000812|    movie|
|tt0000814|    movie|
|tt0000838|    movie|
|tt0000842|    movie|
|tt0000846|    movie|
|tt0000850|    movie|
|tt0000859|    movie|
|tt0000862|    movie|
+---------+---------+
only showing top 20 rows



Generation of a DataFrame consisting of two columns: one for the id of the movie and the other for the id of the actor. Since the DataFrame is the result of an inner join between the two tables obtained before, there is one entry for every actor appearing in each movie (notice that the filtering operations performed previously allow us to focus exclusively on actors and actresses appearing in movies).
This DataFrame is used next to obtain the RDD containing one entry for each basket (a single movie and its corresponding actors).

In [None]:
titlebasics_1.createOrReplaceTempView("titlebasics_1")
titleprincipals_1.createOrReplaceTempView("titleprincipals_1")

In [None]:
query = """SELECT b.tconst, nconst
FROM titlebasics_1 AS b INNER JOIN titleprincipals_1 AS p ON b.tconst=p.tconst
"""
joined_table = spark.sql(query)
joined_table.show()

+---------+---------+
|   tconst|   nconst|
+---------+---------+
|tt0002591|nm0029806|
|tt0002591|nm0509573|
|tt0003689|nm0694718|
|tt0003689|nm0101071|
|tt0003689|nm0910564|
|tt0003689|nm0527801|
|tt0003689|nm0585503|
|tt0003689|nm0728289|
|tt0003689|nm0399988|
|tt0004272|nm0368875|
|tt0004272|nm0192062|
|tt0004272|nm0285643|
|tt0004272|nm0776747|
|tt0004272|nm0793189|
|tt0004272|nm0445507|
|tt0004272|nm0092665|
|tt0004272|nm0383278|
|tt0004272|nm0492302|
|tt0004336|nm0102718|
|tt0004336|nm0478359|
+---------+---------+
only showing top 20 rows



Reading the name.basics.tsv.gz file. It is stored as a DataFrame through the Spark SQL context.

In [None]:
namebasics = SQLContext.read.csv('name.basics.tsv.gz', header = True, sep = '\t')
namebasics.show(10)

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0050419,tt00531...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0117057,tt00373...|
|nm0000003|Brigitte Bardot|     1934|       \N|actress,soundtrac...|tt0049189,tt00599...|
|nm0000004|   John Belushi|     1949|     1982|actor,writer,soun...|tt0078723,tt00804...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00839...|
|nm0000006| Ingrid Bergman|     1915|     1982|actress,soundtrac...|tt0077711,tt00387...|
|nm0000007|Humphrey Bogart|     1899|     1957|actor,soundtrack,...|tt0033870,tt00373...|
|nm0000008|  Marlon Brando|     1924|     2004|actor,soundtrack,...|tt0078788,tt00708...|
|nm0000009

###***A-Priori Algorithm***

The A-Priori algorithm was implemented to find only the frequent pairs. The algorithm performs two passes over the data. In the first pass, a list of all the frequent singleton items is produced. In the second pass, the baskets file is scanned again to count the pairs in which both items are frequent; among these, the pairs whose number of occurrences is larger than the threshold form the set of frequent pairs.
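
The two passes described above can be sketched in plain Python on a handful of toy baskets. This is only an illustration under stated assumptions: the function name `apriori_pairs_toy` and the baskets are hypothetical, duplicates inside a basket are collapsed with `set`, and nothing here uses Spark or the IMDb data.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs_toy(baskets, threshold):
    """Return {pair: count} for pairs occurring more than `threshold` times."""
    # Pass 1: count single items and keep the frequent ones.
    item_counts = Counter(item for basket in baskets for item in set(basket))
    frequent_items = {item for item, c in item_counts.items() if c > threshold}

    # Pass 2: count only the pairs whose members are both frequent.
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(set(basket) & frequent_items)
        pair_counts.update(combinations(kept, 2))
    return {pair: c for pair, c in pair_counts.items() if c > threshold}

# Hypothetical toy baskets (one list of actors per movie).
toy_apriori_baskets = [
    ["a", "b", "c"],
    ["a", "b"],
    ["a", "b", "d"],
    ["c", "d"],
    ["a", "c"],
]
toy_frequent_pairs = apriori_pairs_toy(toy_apriori_baskets, threshold=2)
print(toy_frequent_pairs)  # {('a', 'b'): 3}
```

Item `d` occurs only twice, so it is discarded in the first pass and no pair containing it is ever counted in the second one.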

First of all, an RDD in which every entry is a basket is created. It is obtained by applying the groupByKey transformation to the RDD whose entries are the rows of the joined table obtained previously, and by collecting the actors of each movie into a list. In this way, each entry is a tuple containing a movie id and the corresponding list of actors.

In [None]:
baskets_rdd = (joined_table.rdd
                           .map(tuple)
                           .groupByKey()
                           .mapValues(list))
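
To make the shape of `baskets_rdd` concrete, the grouping step can be mimicked in plain Python. This is a rough sketch on hypothetical rows (`toy_rows` is made up for illustration); unlike this version, Spark does not guarantee the order of the groups or of the values inside each group.

```python
from collections import defaultdict

# Hypothetical (movie id, actor id) rows, mimicking the rows of joined_table.
toy_rows = [("m1", "a1"), ("m1", "a2"), ("m2", "a3"), ("m1", "a3")]

# Equivalent of groupByKey().mapValues(list): one list of actors per movie.
toy_grouped = defaultdict(list)
for movie_id, actor_id in toy_rows:
    toy_grouped[movie_id].append(actor_id)

toy_movie_baskets = list(toy_grouped.items())
print(toy_movie_baskets)  # [('m1', ['a1', 'a2', 'a3']), ('m2', ['a3'])]
```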

Importing combinations function

In [None]:
from itertools import combinations

This function implements the two passes of the A-Priori algorithm described previously. It takes as input the RDD of baskets defined before and a threshold that is defined by the user, and returns as output the list of frequent pairs of actors (along with their absolute frequency).

In [None]:
def apriori(baskets_rdd, threshold):
  #first pass: count the occurrences of each actor
  single_actors_countval=(baskets_rdd #the RDD of baskets defined before
                         .flatMap(lambda s:s[1]) #only consider the actors
                         .map(lambda s:(s,1))
                         .reduceByKey(lambda a,b:a+b) #obtain each actor with the total number of occurrences
                         .collect())
  #keep only the frequent actors; stored as a function attribute so that their supports can be reused later
  apriori.frequent_single_actors=list(filter(lambda s: s[1]>threshold, single_actors_countval))
  frequent_single_actors_set={x[0] for x in apriori.frequent_single_actors} #ids of the frequent actors (a set makes membership tests fast)
  #second pass: count the pairs in which both actors are frequent
  frequent_actors_pairs_list=(baskets_rdd
                              .map(lambda s:s[1])
                              .map(lambda s: [x for x in s if x in frequent_single_actors_set])
                              .filter(lambda s: len(s)>=2)
                              .flatMap(lambda s: list(combinations(s, 2)))
                              .map(lambda s:tuple(sorted(s))) #sort so that (a,b) and (b,a) are the same key
                              .map(lambda s:(s,1))
                              .reduceByKey(lambda a,b:a+b)
                              .collect())

  frequent_pairs_list=list(filter(lambda s: s[1]>threshold, frequent_actors_pairs_list)) #filter frequent pairs
  return frequent_pairs_list

Applying A-Priori (with a threshold of 125) to the RDD of baskets yields 5 frequent pairs.


In [None]:
apriori(baskets_rdd, 125)

[(('nm0006982', 'nm0046850'), 169),
 (('nm0648803', 'nm2082516'), 147),
 (('nm0006982', 'nm0623427'), 236),
 (('nm0648803', 'nm2373718'), 126),
 (('nm0006982', 'nm0419653'), 162)]

###***SON Algorithm***

In the SON algorithm, instead, the input file is partitioned into chunks, and the set of candidate pairs is obtained by applying the A-Priori algorithm to each chunk (using a threshold scaled down in proportion to the size of the chunk) and taking the union of the pairs found to be frequent in at least one chunk.
In a second pass through the baskets file, a filtering operation removes the false positives: for each pair in the candidate set, its number of occurrences in the whole dataset is computed, and only the pairs that occur a number of times larger than the threshold form the set of frequent pairs.
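
The two passes of SON can be sketched in plain Python on toy data. This is a minimal illustration, not the Spark implementation used in this notebook: all names (`count_pairs`, `local_frequent_pairs`, `son_pairs_toy`) and the baskets are hypothetical, and the per-chunk threshold is obtained by floor division, which is an assumption of this sketch.

```python
from collections import Counter
from itertools import combinations

def count_pairs(baskets, allowed_items=None):
    """Count sorted item pairs, optionally restricted to allowed_items."""
    counts = Counter()
    for basket in baskets:
        items = set(basket)
        if allowed_items is not None:
            items &= allowed_items
        counts.update(combinations(sorted(items), 2))
    return counts

def local_frequent_pairs(chunk, threshold):
    """A-Priori on one chunk: frequent items first, then frequent pairs."""
    item_counts = Counter(item for basket in chunk for item in set(basket))
    frequent_items = {i for i, c in item_counts.items() if c > threshold}
    pair_counts = count_pairs(chunk, frequent_items)
    return {p for p, c in pair_counts.items() if c > threshold}

def son_pairs_toy(baskets, n_chunks, threshold):
    size = -(-len(baskets) // n_chunks)  # ceiling division
    chunk_list = [baskets[i:i + size] for i in range(0, len(baskets), size)]
    # Pass 1: union of the locally frequent pairs, with a scaled-down threshold.
    local_threshold = threshold // len(chunk_list)
    candidates = set()
    for chunk in chunk_list:
        candidates |= local_frequent_pairs(chunk, local_threshold)
    # Pass 2: count the candidates over all baskets and drop false positives.
    totals = count_pairs(baskets)
    return {p: totals[p] for p in candidates if totals[p] > threshold}

# Hypothetical toy baskets: ('c', 'd') is frequent in the first chunk only.
toy_son_baskets = [
    ["a", "b"], ["a", "b"], ["c", "d"], ["c", "d"],
    ["a", "b"], ["a", "b"], ["a", "e"], ["b", "e"],
]
toy_son_result = son_pairs_toy(toy_son_baskets, n_chunks=2, threshold=3)
print(toy_son_result)  # {('a', 'b'): 4}
```

Here `('c', 'd')` becomes a candidate because it is frequent in the first chunk, but the second pass removes it since it occurs only twice overall. Rounding the per-chunk threshold down is the conservative choice: if it were rounded up, a truly frequent pair could in principle fall below the local threshold in every chunk and never become a candidate.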


To generate the chunks, first of all, the rdd with the baskets file is collected in main memory to generate a list of 2-tuples, containing for each movie id the list of corresponding actors. 

In [None]:
baskets_list=(joined_table.rdd
              .map(tuple)
              .groupByKey()
              .mapValues(list)
              .collect())

The baskets list is partitioned into n-sized chunks using the following function. It takes as input the list containing all the baskets and yields the chunks one at a time; wrapping the call in `list()` gives a list of lists, with one list for each chunk.

In [None]:
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

Since we want to partition the baskets file into 10 chunks, we pick n = (number of baskets)/10, rounded to the nearest integer.

In [None]:
list_of_chunks=list(chunks(baskets_list,round(len(baskets_list)/10)))

for i in range(len(list_of_chunks)):
  print(len(list_of_chunks[i]))

39376
39376
39376
39376
39376
39376
39376
39376
39376
39375
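
Rounding the chunk size gives exactly 10 chunks here, but when the size is rounded down a small extra chunk can appear. A possible alternative, sketched below with a hypothetical helper `split_into_k_chunks` and a made-up list of 104 elements, is to round the chunk size up.

```python
import math

def split_into_k_chunks(lst, k):
    """Split lst into at most k contiguous chunks of near-equal size."""
    size = max(1, math.ceil(len(lst) / k))  # round up so no extra chunk appears
    return [lst[i:i + size] for i in range(0, len(lst), size)]

toy_list = list(range(104))  # hypothetical data, not the IMDb baskets

# Same sizing rule as in the notebook: round(104 / 10) == 10
n_rounded = round(len(toy_list) / 10)
rounded_chunks = [toy_list[i:i + n_rounded] for i in range(0, len(toy_list), n_rounded)]

ceiled_chunks = split_into_k_chunks(toy_list, 10)
print(len(rounded_chunks), len(ceiled_chunks))  # 11 10
```

The number of chunks matters because the per-chunk threshold in the first pass of SON is obtained by dividing the global threshold by it.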


This function generates the set of candidate pairs obtained from the first pass of the SON Algorithm. In particular, it takes as input the list of chunks output by the function previously defined, and outputs the list of candidate pairs.

In [None]:
def get_candidates_SON(list_of_chunks, threshold):
  #the threshold is scaled down in proportion to the size of a chunk
  chunk_threshold=round(threshold/len(list_of_chunks))

  #run A-Priori on each chunk: every result is a list of (pair, frequency) tuples
  apriori_results=[apriori(sc.parallelize(chunk), chunk_threshold) for chunk in list_of_chunks]

  #the frequency is discarded, and building a set removes duplicates
  candidates=list({pair for result in apriori_results for (pair, _) in result})
  return candidates

In [None]:
candidates=get_candidates_SON(list_of_chunks,125)

In [None]:
candidates

[('nm0019382', 'nm0103977'),
 ('nm0648803', 'nm2077739'),
 ('nm0945427', 'nm2394215'),
 ('nm0046850', 'nm0623427'),
 ('nm0006982', 'nm0046850'),
 ('nm0246703', 'nm0688093'),
 ('nm1984183', 'nm1990182'),
 ('nm0419653', 'nm0623427'),
 ('nm0006982', 'nm0534867'),
 ('nm1770187', 'nm2373718'),
 ('nm0046850', 'nm0419653'),
 ('nm1698868', 'nm2384746'),
 ('nm0006982', 'nm0616102'),
 ('nm0648803', 'nm1770187'),
 ('nm1698868', 'nm2366585'),
 ('nm1770187', 'nm2077739'),
 ('nm2366585', 'nm2384746'),
 ('nm2077739', 'nm2373718'),
 ('nm0659173', 'nm1006879'),
 ('nm0006982', 'nm0623427'),
 ('nm0648803', 'nm2082516'),
 ('nm0006982', 'nm0080246'),
 ('nm0619779', 'nm0623427'),
 ('nm0006982', 'nm0619779'),
 ('nm2082516', 'nm2373718'),
 ('nm0006982', 'nm0419653'),
 ('nm0648803', 'nm2373718'),
 ('nm0006982', 'nm1467390')]

This function performs the filtering operation characterizing the second pass of the SON Algorithm: it takes as input the rdd containing all the baskets, the list of candidates output in the first pass and the threshold; it outputs the set of frequent pairs and relative frequency.

In [None]:
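def filter_false_positives_SON(baskets_rdd, candidates, threshold):
  candidates_set=set(candidates) #a set makes membership tests fast
  frequent_pairs=(baskets_rdd
                  .map(lambda s: s[1]) #only consider the actors
                  .flatMap(lambda s: list(combinations(s, 2))) #all the pairs of actors in each basket
                  .map(lambda s:tuple(sorted(s)))
                  .filter(lambda s: s in candidates_set) #keep only the candidate pairs
                  .map(lambda s:(s,1))
                  .reduceByKey(lambda a,b:a+b) #occurrences of each candidate in the whole dataset
                  .filter(lambda s: s[1]>threshold) #drop the false positives
                  .collect())
  return frequent_pairs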

In [None]:
frequent_pairs=filter_false_positives_SON(baskets_rdd, candidates, 125)

In [None]:
frequent_pairs

[(('nm0006982', 'nm0046850'), 169),
 (('nm0648803', 'nm2082516'), 147),
 (('nm0006982', 'nm0623427'), 236),
 (('nm0648803', 'nm2373718'), 126),
 (('nm0006982', 'nm0419653'), 162)]

This is the same result achieved with the A-Priori algorithm:

In [None]:
apriori(baskets_rdd, 125)

[(('nm0006982', 'nm0046850'), 169),
 (('nm0648803', 'nm2082516'), 147),
 (('nm0006982', 'nm0623427'), 236),
 (('nm0648803', 'nm2373718'), 126),
 (('nm0006982', 'nm0419653'), 162)]

###***Analysis of the frequent pairs found***

We found 5 pairs of actors that frequently appear together in the same movies (more than 125 times each). Let us discover their names from the namebasics table.

In [None]:
frequent_pairs=sc.parallelize(frequent_pairs)

In [None]:
frequent_names=(frequent_pairs
                .flatMap(lambda s:s[0])
                .collect())

In [None]:
frequent_names=tuple(set(frequent_names)) #remove duplicates
frequent_names

('nm0006982',
 'nm0648803',
 'nm2082516',
 'nm2373718',
 'nm0419653',
 'nm0046850',
 'nm0623427')

Here the namebasics table is queried in order to retrieve the name of the actors that figure in the pairs output as frequent by the previous algorithms.

In [None]:
namebasics.createOrReplaceTempView("namebasics")

In [None]:
query = """SELECT nconst, primaryName
FROM namebasics
WHERE nconst IN ('nm0419653',
 'nm0623427',
 'nm0046850',
 'nm2373718',
 'nm2082516',
 'nm0006982',
 'nm0648803')
"""
actors_name = spark.sql(query)
actors_name.show()

+---------+----------------+
|   nconst|     primaryName|
+---------+----------------+
|nm0006982|     Adoor Bhasi|
|nm0046850|         Bahadur|
|nm0419653|     Jayabharati|
|nm0623427|      Prem Nazir|
|nm0648803|Matsunosuke Onoe|
|nm2082516|    Kijaku Ôtani|
|nm2373718|Kitsuraku Arashi|
+---------+----------------+
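
Instead of typing the ids by hand, the `IN` clause could be built from the ids themselves. The sketch below is only a suggestion: `build_names_query` is a hypothetical helper, and it assumes the ids are purely alphanumeric, as IMDb `nconst` values are.

```python
def build_names_query(ids):
    """Build the name lookup query for a collection of nconst ids."""
    # Reject anything that is not purely alphanumeric before putting it in SQL.
    if not all(i.isalnum() for i in ids):
        raise ValueError("unexpected characters in actor id")
    id_list = ", ".join(f"'{i}'" for i in sorted(ids))
    return f"SELECT nconst, primaryName FROM namebasics WHERE nconst IN ({id_list})"

# The seven ids printed above for frequent_names.
example_ids = ('nm0006982', 'nm0648803', 'nm2082516', 'nm2373718',
               'nm0419653', 'nm0046850', 'nm0623427')
names_query = build_names_query(example_ids)
print(names_query)
```

In the notebook, the resulting string would be passed to `spark.sql` in the same way as the hand-written query, with `frequent_names` in place of `example_ids`.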



Let us consider two examples of association rules and their confidence. We take the most frequent pair, formed by Adoor Bhasi and Prem Nazir, and the least frequent one, formed by Matsunosuke Onoe and Kitsuraku Arashi.

In [None]:
frequent_pairs=frequent_pairs.collect()

In [None]:
supp_Iuj=frequent_pairs[2][1]
supp_Iuj

236

The support of the single actors was computed in the first pass of the A-Priori algorithm and stored in a list of tuples. Let us convert that list into a dictionary and retrieve the support of Adoor Bhasi ('nm0006982').

In [None]:
frequent_single_actors={i:j for (i,j) in apriori.frequent_single_actors}

In [None]:
supp_I=frequent_single_actors['nm0006982']
supp_I

585

In [None]:
confidence=supp_Iuj/supp_I
confidence

0.40341880341880343

So the confidence of the rule Bhasi → Nazir is about 0.40: Nazir appears in roughly 40% of the movies in which Bhasi plays.

Let us compute the confidence of the least frequent association rule, the one between Onoe ('nm0648803') and Arashi ('nm2373718').

In [None]:
supp_I=frequent_single_actors['nm0648803']
supp_I

565

In [None]:
supp_Iuj=frequent_pairs[3][1]
supp_Iuj

126

In [None]:
confidence=supp_Iuj/supp_I
confidence

0.22300884955752212

The confidence of the rule Onoe → Arashi is about 0.22: Arashi appears in roughly 22% of the movies in which Onoe plays.
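
The two computations above follow the same pattern, confidence(I → j) = support(I ∪ {j}) / support(I), which can be wrapped in a small helper. The function name `rule_confidence` is hypothetical; the supports are the values printed in the cells above.

```python
def rule_confidence(pair_support, antecedent_support):
    """Confidence of the rule I -> j: support(I u {j}) / support(I)."""
    return pair_support / antecedent_support

# Supports taken from the outputs above.
conf_bhasi_nazir = rule_confidence(236, 585)   # Bhasi -> Nazir
conf_onoe_arashi = rule_confidence(126, 565)   # Onoe -> Arashi
print(round(conf_bhasi_nazir, 3), round(conf_onoe_arashi, 3))  # 0.403 0.223
```

Confidence is not symmetric: the reverse rules (Nazir → Bhasi, Arashi → Onoe) would divide the same pair supports by the supports of Nazir and Arashi instead.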