<a href="https://colab.research.google.com/github/abbassix/IMDbPageRank/blob/main/IMDb_PageRank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# PageRank settings used further down the notebook.
# num_iters: how many rank-update passes to run (also part of the output file name).
# damping_factor: weight given to the neighbours' contributions in each update;
#                 the remaining (1 - damping_factor) is added as a constant base rank.
num_iters, damping_factor = 1, 0.85

In [2]:
# note that versions from 8 to 11 of OpenJDK is required by Spark 3.x
# don't use any other (older or newer) version
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz  # download the Spark
!tar xf /content/spark-3.2.1-bin-hadoop3.2.tgz  # unzip it
!pip install -q findspark  # install `findspark` app via pip
!pip install pyspark  # install `pyspark` via pip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=d0f31feefaea8d50dc8068c7880994daf0196cf4b6863a08621df23bfae20588
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [4]:
import os

# Point Spark/findspark at the Java install and the extracted Spark distribution.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.1-bin-hadoop3.2",
)

In [5]:
import findspark

# Locate the Spark installation (no explicit path given, so findspark falls back
# to its default lookup, e.g. the SPARK_HOME variable) and make `pyspark` importable.
findspark.init(spark_home=None)

In [6]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse it if one already exists.
spark = (
    SparkSession.builder
    .appName("IMDBPageRank")
    .getOrCreate()
)

In [7]:
# Display the SparkSession object as a quick check that the session was created.
spark

In [8]:
# instructions from here: https://www.kaggle.com/general/74235
# install Kaggle API for Python
!pip install -q kaggle

In [9]:
# Upload the Kaggle API token file (`kaggle.json`) from the local machine.
from google.colab import files

# Assign the result instead of leaving `files.upload()` as the cell's last
# expression: it returns a {filename: file bytes} dict, and displaying it would
# print the Kaggle username and API key into the notebook output.
uploaded = files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"<REDACTED>","key":"<REDACTED>"}'}

In [10]:
!mkdir ~/.kaggle  # make a hidden directory
!cp kaggle.json ~/.kaggle/  # copy `kaggle.json` there
!chmod 600 ~/.kaggle/kaggle.json  # give read and write permissions

In [11]:
# note that in the aforementioned instruction we changed
# `competition download -c` to `datasets download -q`.
!kaggle datasets download -q ashirwadsangwan/imdb-dataset

In [12]:
!mkdir imdb  # make new directory
!unzip imdb-dataset.zip -d imdb  # unzip the dataset there

Archive:  imdb-dataset.zip
  inflating: imdb/name.basics.tsv/data.tsv  
  inflating: imdb/title.akas.tsv/data.tsv  
  inflating: imdb/title.basics.tsv/data.tsv  
  inflating: imdb/title.principals.tsv/data.tsv  
  inflating: imdb/title.ratings.tsv/data.tsv  


In [13]:
IMDB_DIR = '/content/imdb'


def read_imdb_tsv(table, **options):
    """Read one IMDb table (a `<table>.tsv` directory under IMDB_DIR) into a Spark DataFrame.

    All tables share the same settings: a tab separator and a header row. Extra
    keyword arguments (e.g. ``inferSchema=True``) are passed to ``spark.read.csv``.
    """
    # Use a real tab character for every table so the reads are configured identically.
    return spark.read.csv(f'{IMDB_DIR}/{table}.tsv', sep='\t', header=True, **options)


name_basics = read_imdb_tsv('name.basics')
title_akas = read_imdb_tsv('title.akas')
title_basics = read_imdb_tsv('title.basics', inferSchema=True)
title_principals = read_imdb_tsv('title.principals')
title_ratings = read_imdb_tsv('title.ratings')

In [14]:
name_basics.show(n=5)  # preview the first 5 rows

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0050419,tt00531...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0038355,tt00373...|
|nm0000003|Brigitte Bardot|     1934|       \N|actress,soundtrac...|tt0054452,tt00564...|
|nm0000004|   John Belushi|     1949|     1982|actor,soundtrack,...|tt0080455,tt00779...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0060827,tt00839...|
+---------+---------------+---------+---------+--------------------+--------------------+
only showing top 5 rows



In [15]:
# name_basics.count()

In [16]:
title_akas.show(n=5)  # preview the first 5 rows

+---------+--------+--------------------+------+--------+-----------+-------------+---------------+
|  titleId|ordering|               title|region|language|      types|   attributes|isOriginalTitle|
+---------+--------+--------------------+------+--------+-----------+-------------+---------------+
|tt0000001|       1|          Карменсіта|    UA|      \N|imdbDisplay|           \N|              0|
|tt0000001|       2|          Carmencita|    DE|      \N|         \N|literal title|              0|
|tt0000001|       3|Carmencita - span...|    HU|      \N|imdbDisplay|           \N|              0|
|tt0000001|       4|          Καρμενσίτα|    GR|      \N|imdbDisplay|           \N|              0|
|tt0000001|       5|          Карменсита|    RU|      \N|imdbDisplay|           \N|              0|
+---------+--------+--------------------+------+--------+-----------+-------------+---------------+
only showing top 5 rows



In [17]:
# title_akas.count()

In [18]:
# title_basics.count()

In [19]:
# Show a small, reproducible random sample (~0.1% of rows, fixed seed) of title_basics.
title_basics.sample(withReplacement=False, fraction=0.001, seed=6).show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0002403|    short|    The Old Reporter|    The Old Reporter|      0|     1912|     \N|            \N|         Drama,Short|
|tt0003017|    short|  An Infernal Tangle|  An Infernal Tangle|      0|     1913|     \N|            11|         Drama,Short|
|tt0003674|    short| The Bells of Rheims| The Bells of Rheims|      0|     1914|     \N|            \N|     Drama,Short,War|
|tt0004384|    movie|     The Naked Truth|       La donna nuda|      0|     1914|     \N|            60|               Drama|
|tt0004626|    movie|The Spirit of the...|The Spirit of the...|      0|     1914|     \N|            \N|              

In [20]:
# List every distinct `titleType` value. Note that the `movies` view holds all
# of title_basics, not only rows whose titleType is 'movie'.
titleType_query = """
  SELECT DISTINCT titleType
  FROM movies
"""

title_basics.createOrReplaceTempView("movies")
titleType_results = spark.sql(titleType_query)

In [21]:
titleType_results.show(n=20)  # up to 20 rows: every distinct titleType found

+------------+
|   titleType|
+------------+
|    tvSeries|
|tvMiniSeries|
|     tvMovie|
|   tvEpisode|
|       movie|
|   tvSpecial|
|       video|
|   videoGame|
|     tvShort|
|       short|
|     tvPilot|
+------------+



In [22]:
title_principals.show(n=20)  # preview the first 20 rows

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                  \N|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|            \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|            \N|
|tt0000002|       1|nm0721526|       director|                  \N|            \N|
|tt0000002|       2|nm1335271|       composer|                  \N|            \N|
|tt0000003|       1|nm0721526|       director|                  \N|            \N|
|tt0000003|       2|nm1770680|       producer|            producer|            \N|
|tt0000003|       3|nm1335271|       composer|                  \N|            \N|
|tt0000003|       4|nm5442200|         editor|                  \N|            \N|
|tt0

In [23]:
# title_principals.count()

In [24]:
title_ratings.show(n=20)  # preview the first 20 rows

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1925|
|tt0000002|          5.8|     261|
|tt0000003|          6.5|    1741|
|tt0000004|          5.6|     176|
|tt0000005|          6.2|    2554|
|tt0000006|          5.1|     175|
|tt0000007|          5.4|     797|
|tt0000008|          5.4|    2069|
|tt0000009|          5.3|     200|
|tt0000010|          6.9|    6994|
|tt0000011|          5.3|     357|
|tt0000012|          7.4|   12001|
|tt0000013|          5.7|    1850|
|tt0000014|          7.1|    5381|
|tt0000015|          6.2|    1035|
|tt0000016|          5.9|    1460|
|tt0000017|          4.6|     318|
|tt0000018|          5.3|     583|
|tt0000019|          5.1|      31|
|tt0000020|          4.8|     351|
+---------+-------------+--------+
only showing top 20 rows



In [25]:
# Register the two tables used by the actor/movie join under SQL view names.
title_basics.createOrReplaceTempView("titlebasics")
title_principals.createOrReplaceTempView("titleprincipals")

In [26]:
# import pyspark.sql.functions as f

# title_basics.count()  # 6'326'545

# movies = title_basics.filter(f.col("titleType") == "movie").rdd.map(lambda x: x[0])
# movies.count()  # 536'248

# only 8% of the IMDB `tconst` records are `movie`s

In [27]:
# Pair every actor/actress with each movie or TV movie they appear in.
# We are not interested in the difference between `actor` and `actress`,
# so we only keep the person id (`tp.nconst`) and the title id (`tb.tconst`).
query = """
  SELECT tp.nconst, tb.tconst
  FROM titlebasics as tb
  INNER JOIN titleprincipals as tp
  ON tp.tconst = tb.tconst
  WHERE (category = 'actor' or category = 'actress') and (tb.titleType = 'movie' or tb.titleType = 'tvMovie');
  """

In [28]:
# Run the actor/movie join query from the previous cell; `results` has the
# columns `nconst` (person) and `tconst` (title).
results = spark.sql(query)

In [29]:
results.show(n=20)  # preview the first 20 (nconst, tconst) pairs

+---------+---------+
|   nconst|   tconst|
+---------+---------+
|nm0624446|tt0000630|
|nm5289318|tt0000862|
|nm5289829|tt0000862|
|nm0264569|tt0000862|
|nm0386036|tt0000862|
|nm0511080|tt0000862|
|nm5188470|tt0000862|
|nm0034453|tt0000941|
|nm0140054|tt0000941|
|nm0243918|tt0000941|
|nm0294022|tt0000941|
|nm0923594|tt0001101|
|nm0135493|tt0001112|
|nm0143332|tt0001112|
|nm0630641|tt0001115|
|nm0064953|tt0001115|
|nm0085066|tt0001115|
|nm0169878|tt0001115|
|nm0299757|tt0001115|
|nm1834296|tt0001115|
+---------+---------+
only showing top 20 rows



In [30]:
# results.count()  # 1'694'722

In [31]:
# Turn each Row into a plain (nconst, tconst) tuple, i.e. (person id, title id).
rdd = results.rdd.map(lambda row: (row.nconst, row.tconst))

In [32]:
# there are 36'499'704 `jobs` in our IMDB dataset; one person can have multiple
# jobs (actress, producer, composer, etc.) even in one single movie.
# title_principals.count()  # 36'499'704

# only 14'830'233 out of 36'499'704 jobs are
# actor/actress; this is roughly 41%
# rdd.count()  # 14'830'233 -- 8m

In [33]:
# List the distinct movies each actor/actress participated in: (nconst, [tconst, ...]).
# `groupByKey` builds each list in a single pass, whereas concatenating lists in
# `reduceByKey` copies the growing list on every merge (quadratic in the number of
# movies per person). Duplicates are dropped so that a person credited twice on
# the same title is not counted as appearing in two movies.
rdd = rdd.groupByKey().mapValues(lambda movies: sorted(set(movies)))

In [34]:
# Keep only actors/actresses with at least two movies: only they create a link
# between two movies in our network.
rdd = rdd.filter(lambda actor_movies: len(actor_movies[1]) >= 2)

In [35]:
# Drop the person id; keep only the list of movies that share that actor/actress.
rdd = rdd.values()

In [36]:
import itertools

# Build the movie graph: every ordered pair of distinct movies that share an
# actor/actress becomes a directed edge. `permutations` (rather than `combinations`)
# emits both (a, b) and (b, a), so the network is bidirectional.
# Pairs with a == b are dropped: if a movie list contains the same title twice,
# `permutations` would produce a self-loop that feeds a movie's rank back into itself.
# Many movies share more than one actor/actress, so `.distinct()` removes the
# duplicate edges.
rdd = (
    rdd.flatMap(lambda movies: itertools.permutations(movies, 2))
    .filter(lambda edge: edge[0] != edge[1])
    .distinct()
)

# rdd.count()   # 3m, 47,420,200

In [37]:
def calculate_contributions(outneighbors, rank):
    """Split a vertex's rank evenly among its out-neighbours.

    Yields one ``(outneighbor, rank / len(outneighbors))`` pair per out-neighbour,
    in iteration order. An empty neighbour collection yields nothing.
    """
    count = len(outneighbors)
    if count == 0:
        return
    share = rank / count
    yield from ((neighbor, share) for neighbor in outneighbors)

In [38]:
# Group all edges by their source movie: (tconst, iterable of neighbouring tconsts).
# Cached because it is joined with the ranks in every PageRank iteration.
adjacency_list = (
    rdd
    .groupByKey()
    .cache()
)

# adjacency_list.count()  # 435'486

In [39]:
# adjacency_list.map(lambda x: len(x[1])).sum()  # 17m, 47,420,200

In [40]:
# Start every vertex (movie) in the adjacency list with a rank of 1.0.
# `mapValues` keeps each key as-is and only replaces the neighbour list.
ranks = adjacency_list.mapValues(lambda _neighbors: 1.0)

In [41]:
from operator import add

# Update the node ranks with PageRank for a fixed number of iterations (`num_iters`,
# set in the first cell) instead of testing for convergence: checking convergence
# would mean collecting the ranks to the driver after every iteration, which is costly.
for _ in range(num_iters):

    # each vertex splits its current rank evenly among its out-neighbours;
    # vnr = (vertex, (neighbors, rank)), the shape produced by the join
    contributions = adjacency_list.join(ranks).flatMap(
        lambda vnr: calculate_contributions(vnr[1][0], vnr[1][1])
    )

    # new rank = damping_factor * (sum of in-neighbours' contributions) + (1 - damping_factor)
    ranks = contributions.reduceByKey(add).mapValues(
        lambda rank: rank * damping_factor + (1 - damping_factor)
    )

In [42]:
# Bring every (tconst, rank) pair to the driver, highest rank first.
ranks_sorted = ranks.sortBy(lambda kv: kv[1], ascending=False).collect()

In [43]:
import csv

# Write one CSV row per movie: its IMDb id and PageRank score, highest rank first.
# `writerows` emits one row per (tconst, rank) pair; a single `writerow` call
# would put every pair into one row.
with open(f'/content/IMDB-PageRank-{num_iters}.csv', 'w', newline='') as outputfile:
    wr = csv.writer(outputfile, quoting=csv.QUOTE_ALL)
    wr.writerow(['tconst', 'rank'])
    wr.writerows(ranks_sorted)

In [44]:
from google.colab import files

# Send the exported ranking CSV to the browser as a download.
output_path = f'/content/IMDB-PageRank-{num_iters}.csv'
files.download(output_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>