
# **PageRank applications for IMDb dataset**
### Final assignment for the Algorithms for Massive Datasets course at Università Statale di Milano, 2020/2021
##### Author: Giulia Pais, student ID number T15156


## **References**



*   GitHub repository of the project: https://github.com/GiuliaPais/PageRank-IMDb
*   Project Report: todo



## **Preliminary steps: download and install packages**



For this project we're going to use Spark DataFrames and GraphFrames.
To work with DataFrames we need a SparkSession object rather than a plain SparkContext.

In [None]:
# Install Java 8, Spark, findspark and pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
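# Old Spark releases are only kept in the Apache archive, not on downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar -xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
# Pin pyspark to the version of the downloaded Spark distribution
!pip install pyspark==3.1.2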
# Set env variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'
# Initialize with findspark to import pyspark as a regular library
import findspark
findspark.init("spark-3.1.2-bin-hadoop2.7")
# Obtain a Spark session reference
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=9fd8cbc8e03227230021fb2f7532943b347e14ec7922e8c02574c85d1ffecb3a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
from graphframes import *

For data visualization we're going to use Dash (with the Cytoscape extension for graph visualization) and Plotly.

In [None]:
!pip install plotly==5.1.0
!pip install dash --upgrade
!pip install jupyter-dash
!pip install dash-cytoscape==0.2.0
!pip install dash-bootstrap-components



In [None]:
import plotly.express as px
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import dash_cytoscape as cyto
import plotly.graph_objs as go
import plotly
import dash
from dash.exceptions import PreventUpdate
import dash_bootstrap_components as dbc

## PageRank implementations

In this section we're going to provide several different implementations of the PageRank algorithm based on Spark DataFrames and GraphFrames. For a more detailed discussion please refer to the project report.

### Classical PageRank algorithm

This is the implementation of the classical random-surfer PageRank algorithm. The function takes a graph (GraphFrame) as input and returns a Spark DataFrame containing the ids of all nodes and their corresponding PageRank values. The user can set one (or both) of the arguments `n_iter` (maximum number of iterations) and `tolerance` (threshold on the Euclidean distance between the rank vectors of two consecutive iterations). If both are set, the algorithm terminates as soon as either exit condition is met.
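The update performed at each iteration can be sketched in plain Python on a small adjacency dictionary, which is handy for checking results by hand. This is a minimal sketch under our own assumptions, not the Spark implementation below: the name `page_rank_sketch` and the `{node: [successors]}` input format are hypothetical, and it uses the usual formulation $r_{t+1}(v) = \beta \sum_{u \to v} r_t(u)/d_{out}(u) + (1-\beta)/N$. Nodes without outgoing edges keep only the teleport share $(1-\beta)/N$, mirroring the treatment of isolated nodes in `page_rank`.

```python
from math import sqrt


def page_rank_sketch(out_links, n_iter=100, tolerance=1e-6, beta=0.85):
    """Power-iteration PageRank with teleportation on {node: [successors]}.

    Hypothetical helper for illustration. Every node must appear as a key;
    nodes with no successors keep only the teleport share (1 - beta) / N.
    """
    if n_iter is None and tolerance is None:
        raise ValueError("At least one of 'tolerance' and 'n_iter' must be set")
    nodes = list(out_links)
    n = len(nodes)
    rank = {v: 1 / n for v in nodes}  # uniform initial vector
    iteration = 0
    while True:
        iteration += 1
        # Teleport share first, then beta * r(u) / outdeg(u) along every edge u -> v
        new_rank = {v: (1 - beta) / n for v in nodes}
        for u, successors in out_links.items():
            for v in successors:
                new_rank[v] += beta * rank[u] / len(successors)
        # Euclidean distance between two consecutive rank vectors
        delta = sqrt(sum((new_rank[v] - rank[v]) ** 2 for v in nodes))
        rank = new_rank
        # Stop as soon as either exit condition is met
        if (n_iter is not None and iteration >= n_iter) or (
            tolerance is not None and delta <= tolerance
        ):
            return rank
```

For instance, on `{1: [2], 2: [1], 3: []}` nodes 1 and 2 end at $1/3$ each, while the isolated node 3 stays at $0.15/3 = 0.05$.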

In [None]:
def page_rank(graph : GraphFrame, 
              tolerance = 10e-6, 
              n_iter = 100, 
              beta = 0.85, 
              verbose = True):
  ''' Computes the PageRank for each node in the input graph.

  This is an implementation of the original PageRank algorithm with
  the teleportation technique using Spark DataFrames.

  Parameters
  ----------
  graph: GraphFrame
    A graph object
  tolerance: positive number
    Represents the tolerance threshold when checking the distance
    between the pageRank at the previous step and the values at the
    current step
  n_iter: positive number
    Maximum number of iterations. At least one between 'tolerance' and
    'n_iter' must be provided.
  beta: value between 0 and 1
    Corresponds to the probability to follow an outgoing link,
    the damping factor.
  verbose: logical
    If True displays messages on function status during execution

  Returns
  -------
  A Spark dataframe containing all vertices with the corresponding
  pageRank value
  '''
  from math import sqrt
  # Arg validation
  if tolerance is None and n_iter is None:
    raise ValueError("At least one argument between 'tolerance' and 'n_iter' must be set")
  if (beta > 1 or beta < 0):
    raise ValueError("'beta' must be a value between 0 and 1")
  # Get the out degree of each node. Note: only nodes with at least 1 out edge are returned by this method
  out_deg = graph.outDegrees
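  # Get the transition matrix in form of triples (src, dst, beta / outDegree(dst)).
  # Ranks are later aggregated on src: this matches the usual PageRank update
  # when every edge appears in both directions, as in the test graph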
  if verbose:
    print("Computing transition matrix...")
  transition_matrix = graph.edges \
  .join(out_deg.withColumnRenamed("id", "dst"), "dst", how = "left")
  transition_matrix = transition_matrix.withColumn("t_value", (1 / F.col("outDegree")*beta)) \
  .select("src", "dst", "t_value").cache() # beta*M
  # Obtain the total number of nodes
  n_nodes = graph.vertices.count()
  # Separate isolated nodes from connected nodes
  if verbose:
    print("Computing initial vectors for nodes...")
  # Split vertices into connected nodes (at least one outgoing edge) and isolated nodes
  connected_t0 = graph.vertices.select("id") \
    .join(graph.edges.select(F.col("src").alias("id")), 
          on = "id", how = "leftsemi") \
    .withColumn("rank", F.lit(1 / n_nodes)).cache()
  isolated_t0 = graph.vertices.select("id") \
    .join(connected_t0, "id", how = "leftanti") \
    .withColumn("rank", F.lit(1 / n_nodes)).cache()
  isolated_nodes_n = isolated_t0.count()
  sdiff_isolated = 0
  if isolated_nodes_n > 0:
    # Compute rank for isolated components and squared diff
    isolated_t1 = isolated_t0.withColumn("pageRank", F.lit((1-beta)*1/n_nodes))
    sdiff_isolated = isolated_t1.withColumn("row", F.lit(1)) \
    .withColumn("sdiff", F.sum(F.pow(F.col("pageRank") - F.col("rank"), 2)).over(Window.partitionBy("row"))) \
    .select("sdiff").first()[0]
    isolated_t1 = isolated_t1.drop("rank").cache()
  # Iterate
  iteration = 0
  if verbose:
    print("Starting cycle")
  while True:
    iteration += 1
    if verbose:
      print("Iteration: " + str(iteration))
    # Join transition matrix with connected_t0 (on dst)
    t_connected_0 = transition_matrix.join(connected_t0.withColumnRenamed("id", "dst"), 
                                            "dst", 
                                            how = "left")
    # Compute rank for connected components
    t_connected_1 = t_connected_0.withColumn("pageRank_i", F.col("t_value")*F.col("rank"))
    window = Window.partitionBy("id").orderBy("id")
    connected_t1 = t_connected_1.select(t_connected_1.src.alias("id"), 
                                        t_connected_1.pageRank_i) \
                                        .withColumn("row", F.row_number().over(window)) \
                                        .withColumn("pageRank", F.sum(F.col("pageRank_i")) \
                                                    .over(window) + ((1-beta)*1/n_nodes)) \
                                        .drop("pageRank_i") \
                                        .where(F.col("row")==1).select("id", "pageRank")                                   
    # Compute distance
    if iteration > 1:
      sdiff_isolated = 0
    delta = connected_t1.join(connected_t0, on = "id", how = "left") \
    .withColumn("row", F.lit(1)) \
    .withColumn("delta",
                F.sum(F.pow(F.col("rank") - F.col("pageRank"), 2)).over(Window.partitionBy("row"))) \
                .select("delta").first()[0] + sdiff_isolated
    delta = sqrt(delta)
    # Check end conditions
    max_iter_reached = True if n_iter is not None and iteration == n_iter else False
    tol_ok = True if tolerance is not None and delta <= tolerance else False
    if max_iter_reached or tol_ok:
      if isolated_nodes_n > 0:
        final = graph.vertices.join(connected_t1.union(isolated_t1), "id").persist()
        isolated_t1.unpersist()
      else:
        final = graph.vertices.join(connected_t1, "id").persist()
      if verbose:
        print("Finished")
      transition_matrix.unpersist()
      isolated_t0.unpersist()
      connected_t0.unpersist()
      return final
    else:
      # Swap
      connected_t0 = connected_t1.withColumnRenamed("pageRank", "rank")
  return None

### Topic-sensitive PageRank
A modification of the classical algorithm that favors nodes related to the given topics. In the context of this project, topics are movie genres: a topic-biased rank vector is computed for each chosen genre.
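In the topic-sensitive variant, the teleport step only lands on nodes belonging to the topic set $S$, so the update becomes $r_{t+1}(v) = \beta \sum_{u \to v} r_t(u)/d_{out}(u) + [v \in S]\,(1-\beta)/|S|$, and the initial vector is uniform over $S$. The following plain-Python sketch illustrates this as a toy model under our own assumptions (hypothetical function name, `{node: [successors]}` input); in the Spark code below, $S$ corresponds to the nodes whose `genres` array contains the topic.

```python
from math import sqrt


def topic_page_rank_sketch(out_links, topic_nodes, n_iter=100, tolerance=1e-6, beta=0.85):
    """Topic-sensitive PageRank on {node: [successors]} (hypothetical helper).

    The surfer teleports only to `topic_nodes`, so both the initial vector
    and the (1 - beta) share are spread uniformly over that set alone.
    """
    if n_iter is None and tolerance is None:
        raise ValueError("At least one of 'tolerance' and 'n_iter' must be set")
    topic = set(topic_nodes)
    if not topic:
        raise ValueError("The topic set must not be empty")
    nodes = list(out_links)
    share = 1 / len(topic)
    rank = {v: share if v in topic else 0.0 for v in nodes}
    iteration = 0
    while True:
        iteration += 1
        # Only topic nodes receive the teleport share
        new_rank = {v: (1 - beta) * share if v in topic else 0.0 for v in nodes}
        for u, successors in out_links.items():
            for v in successors:
                new_rank[v] += beta * rank[u] / len(successors)
        delta = sqrt(sum((new_rank[v] - rank[v]) ** 2 for v in nodes))
        rank = new_rank
        if (n_iter is not None and iteration >= n_iter) or (
            tolerance is not None and delta <= tolerance
        ):
            return rank
```

A node that is outside the topic and receives no links (node 4 in `{1: [2], 2: [1, 3], 3: [2], 4: []}` with topic `{1}`) ends with a rank of exactly 0.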

In [None]:
# Function for each worker (internal)
def _topic_rank(tm, nodes, edges, topic, beta, n_iter, tolerance, verbose):
  from math import sqrt
  if verbose:
    print("Worker '" + topic + "' started")
  # Initialization
  topic_df = nodes.withColumn("intopic", F.when(F.array_contains(F.col("genres"), topic), 1).otherwise(0))
  tm_mod = tm.join(topic_df.select(F.col("id").alias("src"), "intopic"), "src", how="left").cache()
  set_count = topic_df.where(F.col("intopic") == 1).count()
  topic_df = topic_df.withColumn("rank", F.col("intopic") / set_count) \
    .select("id","intopic", "rank")
  # Separating connected components
  connected_t0 = topic_df \
    .join(edges.withColumnRenamed("src", "id"), 
          on = "id", how = "leftsemi").drop("intopic")
  isolated_t0 = topic_df \
    .join(connected_t0, "id", how = "leftanti").cache()
  isolated_nodes_n = isolated_t0.count()
  sdiff_isolated = 0
  if isolated_nodes_n > 0:
    # Compute rank for isolated components and squared diff
    isolated_t1 = isolated_t0.withColumn("pageRank", F.when(F.col("intopic") == 0, 0).otherwise(F.lit((1-beta)*1/set_count)))
    sdiff_isolated = isolated_t1.withColumn("row", F.lit(1)) \
    .withColumn("sdiff", F.sum(F.pow(F.col("pageRank") - F.col("rank"), 2)).over(Window.partitionBy("row"))) \
    .select("sdiff").first()[0]
    isolated_t1 = isolated_t1.drop("rank").drop("intopic")
  # isolated_t0 is not reassigned here, so the unpersist() call at the end
  # releases the DataFrame that was actually cached
  # Iterate
  iteration = 0
  while True:
    iteration += 1
    if verbose:
      print("Worker '" + topic + "': iteration " + str(iteration))
    # Join transition matrix with connected_t0 (on dst)
    t_connected_0 = tm_mod.join(connected_t0.withColumnRenamed("id", "dst"), 
                                        "dst", 
                                        how = "left")
    # Compute rank for connected components
    t_connected_1 = t_connected_0.withColumn("pageRank_i", F.col("t_value")*F.col("rank"))
    window = Window.partitionBy("id").orderBy("id")
    connected_t1 = t_connected_1.select(t_connected_1.src.alias("id"), 
                                        t_connected_1.intopic,
                                        t_connected_1.pageRank_i) \
                                        .withColumn("row", F.row_number().over(window)) \
                                        .withColumn("pageRank", F.sum(F.col("pageRank_i")).over(window))                                   
    connected_t1 = connected_t1 \
      .where(F.col("row")==1).select("id", "intopic", "pageRank") \
      .withColumn("pageRank", F.when(F.col("intopic") == 1, 
                                     F.col("pageRank") + ((1-beta)*1/set_count)) \
                              .otherwise(F.col("pageRank"))) \
      .drop("intopic")                          
    # Compute distance
    if iteration > 1:
      sdiff_isolated = 0
    delta = connected_t1.join(connected_t0, on = "id", how = "left") \
    .withColumn("row", F.lit(1)) \
    .withColumn("delta",
                F.sum(F.pow(F.col("rank") - F.col("pageRank"), 2)).over(Window.partitionBy("row"))) \
                .select("delta").first()[0] + sdiff_isolated
    delta = sqrt(delta)
    # Check end conditions
    max_iter_reached = True if n_iter is not None and iteration == n_iter else False
    tol_ok = True if tolerance is not None and delta <= tolerance else False
    if max_iter_reached or tol_ok:
      if isolated_nodes_n > 0:
        final = connected_t1.union(isolated_t1).select(F.col("id"), F.col("pageRank").alias(topic)).cache()
      else:
        final = connected_t1.select(F.col("id"), F.col("pageRank").alias(topic)).cache()
      if verbose:
        print("Worker '" + topic + "' done")
      tm_mod.unpersist()
      isolated_t0.unpersist()
      return final
    else:
      # Swap
      connected_t0 = connected_t1.withColumnRenamed("pageRank", "rank")
  return None

In [None]:
def topic_sensitive_page_rank(graph : GraphFrame,
                              topics = None, 
                              tolerance = 10e-6, 
                              n_iter = 100, 
                              beta = 0.85, 
                              verbose = True):
  ''' Computes the topic-sensitive PageRank for each node in the input graph.

  Implementation of the topic-sensitive version of PageRank. Topics should be
  chosen among the available movie genres.

  Parameters
  ----------
  graph: GraphFrame
    A graph object
  topics: list
    A list of movie genres of interest. If 'None' corresponds to
    classical pageRank.
  tolerance: positive number
    Represents the tolerance threshold when checking the distance
    between the pageRank at the previous step and the values at the
    current step
  n_iter: positive number
    Maximum number of iterations. At least one between 'tolerance' and
    'n_iter' must be provided.
  beta: value between 0 and 1
    Corresponds to the probability to follow an outgoing link,
    the damping factor.
  verbose: logical
    If True displays messages on function status during execution

  Returns
  -------
  A Spark DataFrame with nodes info and one column of PageRank values for
  each topic in input
  '''
  import multiprocessing as ms
  import multiprocessing.pool  # 'import multiprocessing' alone does not guarantee ms.pool is loaded
  from functools import reduce
  # Arg validation
  if tolerance is None and n_iter is None:
    raise ValueError("At least one argument between 'tolerance' and 'n_iter' must be set")
  if (beta > 1 or beta < 0):
    raise ValueError("'beta' must be a value between 0 and 1")

  if topics is None or len(topics) == 0:
    return page_rank(graph, tolerance=tolerance, n_iter=n_iter, beta=beta, verbose=verbose)
  # Get the out degree of each node
  out_deg = graph.outDegrees
  # Get the transition matrix in form of triples by applying a transformation to the edges df
  if verbose:
    print("Computing transition matrix...")
  transition_matrix = graph.edges \
  .join(out_deg.withColumnRenamed("id", "dst"), 
                                   "dst", how = "left")
  transition_matrix = transition_matrix.withColumn("t_value", (1 / F.col("outDegree")*beta)) \
  .select("src", "dst", "t_value").cache() # beta*M
  if verbose:
    print("Launching workers...")
  pool = ms.pool.ThreadPool(len(topics))
  arg_list = [(transition_matrix, graph.vertices, graph.edges, t, beta, n_iter, tolerance, verbose) for t in topics]
  res = pool.starmap(_topic_rank, arg_list)
  if verbose:
    print("Workers finished, producing final data frame...")
  res = reduce(lambda x,y: x.join(y, "id", "left"), res)
  res = graph.vertices.join(res, "id", how="left").persist()
  pool.close()
  if verbose:
    print("Finished!")
  transition_matrix.unpersist()
  return res

### Edge-weighted PageRank
A modification of the PageRank algorithm that computes PageRank on graphs with weighted edges.
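In the weighted variant, a node passes its rank to each neighbour in proportion to the weight of the connecting edge rather than evenly. In `weighted_page_rank` the weight of an edge is the sum of the weights of the movies listed in its `movie_ids`, and missing movie weights are handled according to `na_politic`. A toy plain-Python sketch of both steps follows; the function names and input formats are hypothetical and only meant to illustrate the idea, normalising by the total weight of the sender's edges.

```python
from collections import defaultdict
from math import sqrt


def edge_weights(edges, movie_weights, na_politic="min_value"):
    """Collapse [(src, dst, movie_ids)] into {(src, dst): total weight}.

    A movie without a weight counts as the minimum known weight
    ('min_value') or is skipped ('drop'); an edge whose movies are all
    skipped disappears.
    """
    min_weight = min(movie_weights.values())
    totals = defaultdict(float)
    for src, dst, movie_ids in edges:
        for movie in movie_ids:
            if movie in movie_weights:
                totals[(src, dst)] += movie_weights[movie]
            elif na_politic == "min_value":
                totals[(src, dst)] += min_weight
    return dict(totals)


def weighted_page_rank_sketch(nodes, weights, n_iter=100, tolerance=1e-6, beta=0.85):
    """PageRank where u passes rank to v in proportion to w(u, v) / W_out(u)."""
    out_weight = defaultdict(float)
    for (u, _), w in weights.items():
        out_weight[u] += w
    n = len(nodes)
    rank = {v: 1 / n for v in nodes}
    for _ in range(n_iter):
        new_rank = {v: (1 - beta) / n for v in nodes}
        for (u, v), w in weights.items():
            # Share of u's rank proportional to the weight of edge (u, v)
            new_rank[v] += beta * rank[u] * w / out_weight[u]
        delta = sqrt(sum((new_rank[v] - rank[v]) ** 2 for v in nodes))
        rank = new_rank
        if tolerance is not None and delta <= tolerance:
            break
    return rank
```

With `{"m1": 3, "m2": 1}` as weights, an edge carrying `["m2", "m9"]` gets weight 2 under `'min_value'` (the missing `m9` counts as 1) and weight 1 under `'drop'`.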

In [None]:
def weighted_page_rank(graph : GraphFrame, 
                       weights = None,
                       na_politic = 'min_value',
                       tolerance = 10e-6, 
                       n_iter = 100, 
                       beta = 0.85, 
                       verbose = True):
  ''' Computes an edge-weighted version of PageRank.

  Weights are provided as a Spark DataFrame; if no weights are provided,
  the algorithm is equivalent to the base version of PageRank.

  Parameters
  ----------
  graph: GraphFrame
    A graph object
  weights: either None (for non-weighted PageRank) or a Spark DataFrame
    If a DataFrame is provided it should contain 2 columns named 'movie_id'
    and 'weight'
  na_politic: one between 'min_value' and 'drop'
    Determines how missing weights are treated: if 'min_value', NAs are
    replaced with the minimum weight value found, otherwise the associated
    movie is filtered out, possibly removing the associated edges
  tolerance: positive number
    Represents the tolerance threshold when checking the distance
    between the pageRank at the previous step and the values at the
    current step
  n_iter: positive number
    Maximum number of iterations. At least one between 'tolerance' and
    'n_iter' must be provided.
  beta: value between 0 and 1
    Corresponds to the probability to follow an outgoing link,
    the damping factor.
  verbose: logical
    If True displays messages on function status during execution

  Returns
  -------
  A Spark DataFrame containing all vertices with the corresponding
  pageRank value
  '''
  from pyspark.sql import DataFrame
  from math import sqrt
  # Arg validation
  if tolerance is None and n_iter is None:
    raise ValueError("At least one argument between 'tolerance' and 'n_iter' must be set")
  if (beta > 1 or beta < 0):
    raise ValueError("'beta' must be a value between 0 and 1")
  if weights is None:
    weighted = False
  elif isinstance(weights, DataFrame):
    weighted = True
  else:
    raise ValueError("'weights' must be either 'None' or a DataFrame")
  if not weighted:
    ## Internally call pr function
    result = page_rank(graph = graph,
                        tolerance = tolerance, 
                        n_iter = n_iter, 
                        beta = beta, 
                        verbose = verbose)
    return result
  if na_politic not in ['min_value', 'drop']:
    print("Warning: unknown 'na_politic', using default value")
    na_politic = 'min_value'
  if verbose:
    print("Computing transition matrix...")
  # Assign weights to edges and compute transition matrix
  window1 = Window.partitionBy("src", "dst").orderBy("src", "dst")
  window2 = Window.partitionBy("dst")
  transition_matrix = graph.edges \
  .withColumn("movie_id", F.explode(F.col("movie_ids"))) \
  .join(weights, "movie_id", "left")
  ## Dealing with missing weights
  if na_politic == 'min_value':
    min_weight = weights.agg({'weight':'min'}).collect()[0][0]
    transition_matrix = transition_matrix.na.fill(value=min_weight, subset=['weight'])
  else:
    transition_matrix = transition_matrix.where(F.col("weight").isNotNull())
  # Number the (src, dst) rows only after missing weights have been handled:
  # numbering them before the filter could drop the row kept as representative
  # of a pair and, with it, the whole edge
  transition_matrix = transition_matrix \
  .withColumn("row", F.row_number().over(window1)) \
  .withColumn("weight", F.sum(F.col("weight")).over(window1)) \
  .where(F.col("row") == 1) \
  .withColumn("out_weight", F.sum(F.col("weight")).over(window2)) \
  .withColumn("t_value", beta*(F.col("weight") / F.col("out_weight"))) \
  .select("src", "dst", "t_value").cache()
  # Separating connected components
  n_nodes = graph.vertices.count()
  connected_t0 = graph.vertices.select("id") \
    .join(graph.edges.select(F.col("src").alias("id")), 
          on = "id", how = "leftsemi") \
    .withColumn("rank", F.lit(1 / n_nodes)).cache()
  isolated_t0 = graph.vertices.select("id") \
    .join(connected_t0, "id", how = "leftanti") \
    .withColumn("rank", F.lit(1 / n_nodes)).cache()
  isolated_nodes_n = isolated_t0.count()
  sdiff_isolated = 0
  if isolated_nodes_n > 0:
    # Compute rank for isolated components and squared diff
    isolated_t1 = isolated_t0.withColumn("pageRank", F.lit((1-beta)*1/n_nodes))
    sdiff_isolated = isolated_t1.withColumn("row", F.lit(1)) \
    .withColumn("sdiff", F.sum(F.pow(F.col("pageRank") - F.col("rank"), 2)).over(Window.partitionBy("row"))) \
    .select("sdiff").first()[0]
    isolated_t1 = isolated_t1.drop("rank").cache()
  # Iterate
  iteration = 0
  while True:
    iteration += 1
    if verbose:
      print("Iteration: " + str(iteration))
    # Join transition matrix with connected_t0 (on dst)
    t_connected_0 = transition_matrix.join(connected_t0.withColumnRenamed("id", "dst"), "dst", how = "left")
    # Compute rank for connected components
    t_connected_1 = t_connected_0.withColumn("pageRank_i", F.col("t_value")*F.col("rank"))
    window = Window.partitionBy("id").orderBy("id")
    connected_t1 = t_connected_1.select(t_connected_1.src.alias("id"), 
                                        t_connected_1.pageRank_i) \
                                        .withColumn("row", F.row_number().over(window)) \
                                        .withColumn("pageRank", F.sum(F.col("pageRank_i")).over(window))                                   
    connected_t1 = connected_t1 \
      .drop("pageRank_i") \
      .where(F.col("row")==1).select("id", "pageRank") \
      .withColumn("pageRank", F.col("pageRank") + (1-beta)*1/n_nodes)                                
    # Compute distance
    if iteration > 1:
      sdiff_isolated = 0
    delta = connected_t1.join(connected_t0, on = "id", how = "left") \
    .withColumn("row", F.lit(1)) \
    .withColumn("delta",
                F.sum(F.pow(F.col("rank") - F.col("pageRank"), 2)).over(Window.partitionBy("row"))) \
                .select("delta").first()[0] + sdiff_isolated
    delta = sqrt(delta)
    # Check end conditions
    max_iter_reached = True if n_iter is not None and iteration == n_iter else False
    tol_ok = True if tolerance is not None and delta <= tolerance else False
    if max_iter_reached or tol_ok:
      if isolated_nodes_n > 0:
        final = graph.vertices.join(connected_t1.union(isolated_t1), "id").persist()
        isolated_t1.unpersist()
      else:
        final = graph.vertices.join(connected_t1, "id").persist()
      if verbose:
        print("Finished")
      transition_matrix.unpersist()
      connected_t0.unpersist()
      isolated_t0.unpersist()
      return final
    else:
      # Swap
      connected_t0 = connected_t1.withColumnRenamed("pageRank", "rank")
  return None

## Algorithm validation

### Test data set
Minimal data set used solely to validate the algorithms.

In [None]:
edges = spark.createDataFrame([
   (1, 2, ["m1"]),
   (2, 1, ["m1"]),
   (1, 5, ["m1", "m4"]),
   (5, 1, ["m1", "m4"]),
   (5, 8, ["m4"]),
   (8, 5, ["m4"]),
   (8, 10, ["m3"]),
   (10, 8, ["m3"]),
   (2, 3, ["m2", "m5"]),
   (3, 2, ["m2", "m5"]),
   (2, 6, ["m5"]), 
   (6, 2, ["m5"]), 
   (6, 8, ["m3"]),
   (8, 6, ["m3"]),
   (6, 10, ["m3"]),
   (10, 6, ["m3"]),
   (6, 3, ["m5"]),
   (3, 6, ["m5"]),
   (3, 7, ["m2"]),
   (7, 3, ["m2"]),
   (2, 7, ["m2"]),
   (7, 2, ["m2"]),
   (3, 4, ["m2"]),
   (4, 3, ["m2"]),
   (4, 7, ["m2"]),
   (7, 4, ["m2"]),
   (2, 4, ["m2"]),
   (4, 2, ["m2"]),
   (1, 8, ["m4"]),
   (8, 1, ["m4"])                      
], ["src", "dst", "movie_ids"]).coalesce(1).persist()
nodes = spark.createDataFrame([
   (1, "n01", "ACTOR1", ["Drama", "Romance"]),
   (2, "n02", "ACTOR2", ["Drama", "Crime", "Thriller", "Horror"]),
   (3, "n03", "ACTOR3", ["Crime", "Thriller", "Horror"]),
   (4, "n04", "ACTOR4", ["Crime", "Thriller"]),
   (5, "n05", "ACTOR5", ["Drama", "Romance"]),
   (6, "n06", "ACTOR6", ["Crime", "Thriller", "Horror", "Comedy", "Animation", "Fantasy"]),
   (7, "n07", "ACTOR7", ["Crime", "Thriller"]),
   (8, "n08", "ACTOR8", ["Romance", "Drama", "Comedy", "Animation", "Fantasy"]),
   (9, "n09", "ACTOR9", ["Drama"]),
   (10, "n10", "ACTOR10", ["Comedy", "Animation", "Fantasy"])                            
], ["id", "nconst", "primaryName", "genres"]).coalesce(1).persist()
weights_df = spark.createDataFrame([
    ("m1", 43),
    ("m2", 35),
    ("m3", 14),
    ("m4", 6),
    ("m5", 70)                               
], ["movie_id", "weight"]).coalesce(1).persist()
test_graph = GraphFrame(v = nodes, e = edges).persist()

### Validation of `page_rank`

Validation of the algorithm on the test data and comparison with the values computed by `GraphFrame.pageRank()`.

In [None]:
pr_classic = page_rank(test_graph, n_iter=10, tolerance=10e-6)

Computing transition matrix...
Computing initial vectors for nodes...
Starting cycle
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Finished


In [None]:
pr_gf = test_graph.pageRank(maxIter=10)

In [None]:
import pandas as pd
# Rescale the GraphFrames values by the number of vertices before comparing them
p_pr_gf = pr_gf.vertices.withColumn("pageRank", pr_gf.vertices.pagerank / pr_gf.vertices.count()).toPandas()
p_pr_class = pr_classic.toPandas()
p_pr_gf = (p_pr_gf.loc[:, ["id", "pageRank"]]).assign(alg = "PR GraphFrames")
p_pr_class = (p_pr_class.loc[:, ["id", "pageRank"]]).assign(alg = "PR Custom")
# DataFrame.append was removed in pandas 2.0: pd.concat replaces it
res_df = pd.concat([p_pr_gf, p_pr_class]).sort_values(['id'])
fig = px.line(res_df, x = "id", y = "pageRank", color = "alg", 
              title="Differences between PageRank values obtained with custom algorithm vs GraphFrames", 
              labels = {'id' : 'Node id', 'pageRank': 'PageRank value', 'alg': 'Algorithm'})
fig.show()

### Validation of `topic_sensitive_page_rank`

In [None]:
ts_pr = topic_sensitive_page_rank(test_graph, topics = ['Drama', 'Thriller'], n_iter = 10, tolerance=10e-6)

Computing transition matrix...
Launching workers...
Worker 'Drama' startedWorker 'Thriller' started

Worker 'Thriller': iteration 1
Worker 'Drama': iteration 1
Worker 'Drama': iteration 2
Worker 'Thriller': iteration 2
Worker 'Thriller': iteration 3
Worker 'Drama': iteration 3
Worker 'Drama': iteration 4
Worker 'Thriller': iteration 4
Worker 'Drama': iteration 5
Worker 'Thriller': iteration 5
Worker 'Drama': iteration 6
Worker 'Thriller': iteration 6
Worker 'Thriller': iteration 7
Worker 'Drama': iteration 7
Worker 'Drama': iteration 8
Worker 'Thriller': iteration 8
Worker 'Thriller': iteration 9
Worker 'Drama': iteration 9
Worker 'Thriller': iteration 10
Worker 'Drama': iteration 10
Worker 'Drama' done
Worker 'Thriller' done
Workers finished, producing final data frame...
Finished!


In [None]:
ts_valid = pr_classic.join(ts_pr.drop("nconst", "primaryName", "genres"), "id")
ts_valid = ts_valid.withColumn("Pos_class", F.row_number().over(Window.orderBy(F.col("pageRank").desc()))) \
.withColumn("Pos_drama", F.row_number().over(Window.orderBy(F.col("Drama").desc()))) \
.withColumn("Pos_thriller", F.row_number().over(Window.orderBy(F.col("Thriller").desc()))) \
.drop("nconst", "primaryName", "genres") 
ts_valid_pd = ts_valid.toPandas()
ts_valid_pd_1 = ts_valid_pd[['id', 'pageRank', 'Drama', 'Thriller']].rename(columns = {"pageRank" : "Classic PageRank", "Drama": "Topic - Drama", "Thriller": "Topic - Thriller"})
ts_valid_pd_1 = ts_valid_pd_1.melt(id_vars="id", var_name="Value type", value_name="PageRank")
ts_valid_pd_2 = ts_valid_pd[['id', 'Pos_class', 'Pos_drama', 'Pos_thriller']] \
.rename(columns={'Pos_class': "Classic PageRank", 'Pos_drama': "Topic - Drama", 'Pos_thriller': "Topic - Thriller"}) \
.melt(id_vars='id', var_name="Value type", value_name='Position')
ts_valid_pd = ts_valid_pd_1.merge(ts_valid_pd_2, on = ['id', 'Value type']).sort_values('Position')

In [None]:
fig2 = px.bar(ts_valid_pd, x="PageRank", y="Position",
             orientation = 'h',
             color='Value type', barmode='group', title = "Comparison between classical PageRank values and topic-biased values",
             text = "id",
             labels = {'PageRank' : 'PageRank value'}, width=900)
fig2.update_yaxes(type='category', autorange="reversed")
fig2.update_traces(textposition='outside')
fig2.update_layout(legend=dict(
    orientation="v",
    yanchor="bottom",
    y=0,
    xanchor="right",
    x=1
))
fig2.show()

### Validation of `weighted_page_rank`

In [None]:
w_pr = weighted_page_rank(test_graph, weights=weights_df,  n_iter=10, tolerance=10e-6)

Computing transition matrix...
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Finished


In [None]:
window_rank = Window.orderBy(F.col("pageRank").desc())
w_comp = w_pr.withColumn("alg", F.lit("Weighted PR")) \
.withColumn("Position", F.row_number().over(window_rank)) \
.union(pr_classic.withColumn("alg", F.lit("Classic PR")) \
       .withColumn("Position", F.row_number().over(window_rank))) 
w_comp = w_comp.toPandas().sort_values(["Position"], ascending = True)

In [None]:
fig3 = px.bar(w_comp, x="pageRank", y="Position",
             orientation = 'h',
             color='alg', barmode='group', title = "Comparison between classical PageRank values and weighted values",
             text = "id",
             labels = {'pageRank' : 'PageRank value', 'alg':'Algorithm'}, width=900)
fig3.update_yaxes(type='category', autorange="reversed")
fig3.update_traces(textposition='outside')
fig3.update_layout(legend=dict(
    orientation="v",
    yanchor="bottom",
    y=0,
    xanchor="right",
    x=1
))
fig3.show()

In [None]:
spark.stop()

## Data import and data cleaning



In the context of this project we're going to use the [IMDb dataset](https://www.kaggle.com/ashirwadsangwan/imdb-dataset) from [Kaggle](https://www.kaggle.com/). To download it from the website we're going to use the [Kaggle API](https://github.com/Kaggle/kaggle-api).

To be able to download the dataset we need credentials stored in a file called `kaggle.json`. Here are the steps to obtain the credentials file:


1.   Log in to your Kaggle account
2.   On the top-right part of the screen, click on your profile image and select "Account"
3.   Scroll down the page until you find the "API" section and click on "Create new API token". A JSON file with your credentials will be downloaded.

Now run the next code chunk and upload the file.


In [None]:
from google.colab import files

uploaded = files.upload()
  
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json


In [None]:
!kaggle datasets download "ashirwadsangwan/imdb-dataset"
!unzip -jq imdb-dataset.zip -d .

Downloading imdb-dataset.zip to /content
100% 1.44G/1.44G [00:15<00:00, 103MB/s]


Let us now import the datasets we're going to use for this project. The original IMDb dataset contains 5 separate tables:


*   name.basics: contains information about people (actors, writers, producers...)
*   title.basics: contains information about works (movies, TV series, episodes, ...)
*   title.akas: contains alternative (e.g. localized) titles of the works
*   title.principals: contains relationships between people and works
*   title.ratings: contains user ratings for works

For the purpose of this project we're going to take into consideration only:

* **name.basics**, in particular only relevant information about actors
* **title.basics**, in particular only relevant information about movies
* **title.principals**, in particular only relevant relationships between actors and movies
* **title.ratings**, for use in a weighted implementation of the PageRank algorithm

In [None]:
# Get a SparkSession tuned for the bigger dataset: 100 shuffle partitions
# (instead of the default 200) and automatic broadcast joins disabled
spark = SparkSession.builder \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", "100") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()

Let us now read the `*.tsv.gz` files directly into Spark DataFrames.
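IMDb's TSV files mark missing values with the two-character string `\N` and store genres as a single comma-separated string. The plain-Python sketch below, using only the standard `csv` module, shows both conventions on hypothetical rows; it assumes the files use no quoting (hence `csv.QUOTE_NONE`), and the helper name `read_imdb_tsv` is illustrative. In Spark, the CSV reader's `nullValue` option performs the same missing-value-to-null mapping at load time.

```python
import csv
import io

NULL_MARKER = "\\N"  # IMDb's two-character marker for a missing value

# Hypothetical rows in the column layout of title.basics.tsv
SAMPLE = (
    "tconst\ttitleType\tprimaryTitle\toriginalTitle\tisAdult\t"
    "startYear\tendYear\truntimeMinutes\tgenres\n"
    "tt9999991\tmovie\tFirst Movie\tFirst Movie\t0\t1999\t\\N\t120\tDrama,Romance\n"
    "tt9999992\ttvSeries\tSome Show\tSome Show\t0\t2005\t2010\t\\N\tComedy\n"
)


def read_imdb_tsv(text):
    """Yield one dict per row, mapping IMDb's missing-value marker to None."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        yield {key: (None if value == NULL_MARKER else value) for key, value in row.items()}


# Keep movies and TV movies only, turning genres into a list
movies = [
    {**row, "genres": row["genres"].split(",") if row["genres"] else []}
    for row in read_imdb_tsv(SAMPLE)
    if row["titleType"] in ("movie", "tvMovie")
]
```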

In [None]:
# Contains info about movies
movies_df_schema = StructType() \
.add("tconst", StringType(), False) \
.add("titleType", StringType(), False) \
.add("primaryTitle", StringType(), False) \
.add("originalTitle", StringType(), False) \
.add("isAdult", ByteType(), False) \
.add("startYear", IntegerType(), True) \
.add("endYear", IntegerType(), True) \
.add("runtimeMinutes", IntegerType(), True) \
.add("genres", StringType(), True)
movies_df = spark.read.format("csv") \
                      .option("header", "true") \
                      .option("delimiter", "\t") \
                      .schema(movies_df_schema) \
                      .load("title.basics.tsv.gz")
movies_df = movies_df \
  .replace({'\\N': None}) \
  .filter(movies_df.titleType.isin(["movie", "tvMovie"])).cache()
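IMDb's TSV files mark missing values with the literal string `\N`, which is why the cell above replaces it with `None` before filtering on `titleType`. The plain-Python sketch below mimics those two steps on a few made-up rows; the sample data and the names `parse_tsv_line` and `keep_movies` are hypothetical. In Spark, the CSV reader's `nullValue` option (e.g. `.option("nullValue", "\\N")`) is an alternative way to map the marker to null at read time.

```python
# Plain-Python version of replace({'\\N': None}) followed by the titleType filter
NULL_MARKER = "\\N"  # the two characters backslash + N

COLUMNS = ["tconst", "titleType", "primaryTitle", "originalTitle", "isAdult",
           "startYear", "endYear", "runtimeMinutes", "genres"]

def parse_tsv_line(line):
    """Split one tab-separated line and map the IMDb null marker to None."""
    values = [None if v == NULL_MARKER else v
              for v in line.rstrip("\n").split("\t")]
    # Values stay strings here; in the notebook the Spark schema casts them
    return dict(zip(COLUMNS, values))

def keep_movies(rows, types=("movie", "tvMovie")):
    """Keep only movies and TV movies, like titleType.isin([...])."""
    return [r for r in rows if r["titleType"] in types]

# Made-up sample rows (not real IMDb records)
sample = [
    "tt1\tshort\tSome Short\tSome Short\t0\t1999\t\\N\t5\tShort",
    "tt2\tmovie\tSome Movie\tSome Movie\t0\t\\N\t\\N\t90\tDrama,Romance",
    "tt3\ttvMovie\tSome TV Movie\tSome TV Movie\t0\t2001\t\\N\t\\N\t\\N",
]
movies = keep_movies(parse_tsv_line(line) for line in sample)
```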

In [None]:
# Contains info about actors
names_df_schema = StructType() \
.add("nconst", StringType(), False) \
.add("primaryName", StringType(), False) \
.add("birthYear", IntegerType(), True) \
.add("deathYear", IntegerType(), True) \
.add("primaryProfession", StringType(), True) \
.add("knownForTitles", StringType(), True)
names_df = spark.read.format("csv") \
                      .option("header", "true") \
                      .option("delimiter", "\t") \
                      .schema(names_df_schema) \
                      .load("name.basics.tsv.gz")

As previously mentioned, from the names dataset we retain only the information relevant to our needs, in order to reduce memory consumption and exclude potentially problematic data. We therefore filter on profession, keeping only actors and actresses, and select only the columns `nconst`, which contains a unique identifier for the person, and `primaryName`, which contains the name by which the person is best known. Moreover, we assign a progressive numeric id to each actor, since numeric ids are easier to iterate over: this id will serve as the unique identifier of the nodes in our graph.

In [None]:
names_df = names_df \
.replace({'\\N': None}) \
.filter(names_df.primaryProfession.contains("actor") | names_df.primaryProfession.contains("actress")) \
.select("nconst", "primaryName") \
.withColumn("id", F.row_number().over(Window.orderBy("nconst"))-1).cache()
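The filter and the progressive id can be sketched in plain Python as follows, assuming people are given as `(nconst, primaryName, primaryProfession)` tuples; `index_actors` is a hypothetical name. Sorting by `nconst` and numbering from 0 corresponds to `row_number().over(Window.orderBy("nconst")) - 1`. Note that in Spark a window without `partitionBy` moves all rows to a single partition (Spark logs a warning about this), which is acceptable here but does not scale indefinitely.

```python
def index_actors(people):
    """Keep actors/actresses and give them 0-based ids ordered by nconst.

    `people` is an iterable of (nconst, primaryName, primaryProfession)
    tuples, where primaryProfession is a comma-separated string or None.
    """
    # A null profession never matches, as contains() on null is not true in Spark
    actors = [(nconst, name) for nconst, name, prof in people
              if prof is not None and ("actor" in prof or "actress" in prof)]
    # Equivalent of row_number().over(Window.orderBy("nconst")) - 1
    actors.sort(key=lambda a: a[0])
    return [(nconst, name, i) for i, (nconst, name) in enumerate(actors)]
```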

In [None]:
# Contains relationships between actors and movies
rel_df_schema = StructType() \
.add("tconst", StringType(), False) \
.add("ordering", IntegerType(), False) \
.add("nconst", StringType(), False) \
.add("category", StringType(), False) \
.add("job", StringType(), True) \
.add("characters", StringType(), True)
rel_df = spark.read.format("csv") \
                      .option("header", "true") \
                      .option("delimiter", "\t") \
                      .schema(rel_df_schema) \
                      .load("title.principals.tsv.gz")
rel_df = rel_df \
.replace({'\\N': None}) \
.join(other=names_df, on="nconst") \
.select("id", "tconst") 

The data frame we just read contains actor-movie relationships. Since we are interested in actor-actor relationships, we need to transform it. In particular, two actors (nodes) are connected by an edge if they co-acted in at least one movie, and the edge label contains the list of ids of the movies in which the two actors played a role together.

To obtain an actor-actor relationship data frame, we first perform a group-by/aggregate operation: for each movie we list all the actors that appear in it. The inner join with `movies_df` then keeps only movies and TV movies.

In [None]:
cast_by_mov = rel_df \
.groupby(rel_df.tconst) \
.agg(F.collect_set(rel_df.id).alias("cast")) \
.join(other=movies_df, on="tconst", how="inner").cache()
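As a plain-Python sketch of this step, assuming relationships are `(actor_id, movie_id)` pairs and the movies table is reduced to a set of ids (`cast_by_movie` is a hypothetical name): grouping into sets mirrors `collect_set`, so an actor listed twice for the same movie appears only once, and titles missing from the movies table are dropped as by the inner join.

```python
from collections import defaultdict

def cast_by_movie(relationships, movie_ids):
    """Group (actor_id, movie_id) pairs into one cast set per movie.

    Mirrors groupby("tconst").agg(collect_set("id")) followed by an inner
    join that keeps only titles present in `movie_ids` (the movies table).
    """
    casts = defaultdict(set)
    for actor_id, movie_id in relationships:
        casts[movie_id].add(actor_id)
    return {m: cast for m, cast in casts.items() if m in movie_ids}
```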

We now define a function to be applied to each row of the data frame. It takes a single row (a `Row`, i.e. a tuple) and uses the `itertools` package to compute all permutations of 2 elements of the movie's cast, i.e. every ordered pair of distinct actors. Each pair is then combined with the movie id and its genres, so that each input row yields a <u>**set**</u> of rows in the form of:

| src | dst | movie_id  | movie_genres        |
|-----|-----|-----------|---------------------|
| x   | y   | m1        |[Drama, Fantasy,...] |

Computing the 2-permutations of the cast simulates an undirected graph: since PageRank works on directed graphs, the symmetric relationship "CO_ACTED" is represented by 2 edges with opposite directions for each pair of co-acting actors.


In [None]:
import itertools
def co_rows(row):
  # permutations(cast, 2) yields every ordered pair of distinct actors, so
  # each co-acting pair produces two edges with opposite directions
  pairs = itertools.permutations(row["cast"], 2)
  # genres is a comma-separated string, or None when missing
  movie_genres = row["genres"].split(",") if row["genres"] is not None else None
  return [(src, dst, row["tconst"], movie_genres) for src, dst in pairs]
actor_actor_rel_df = cast_by_mov.rdd.flatMap(co_rows) \
.toDF(["src", "dst", "movie_id", "movie_genres"]).cache()
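The same logic as `co_rows`, restated on plain Python values so it can be run without Spark (`co_acting_edges` is a hypothetical name). It shows that a movie with n actors yields n(n-1) directed edges, so large casts grow quadratically, while a single-actor movie yields no edge at all.

```python
import itertools

def co_acting_edges(movie_id, cast, genres):
    """Directed edge rows (src, dst, movie_id, genres) for one movie.

    `genres` is the raw comma-separated string (or None), as in title.basics.
    """
    genre_list = genres.split(",") if genres is not None else None
    # Every ordered pair of distinct actors: both directions of each pair
    return [(src, dst, movie_id, genre_list)
            for src, dst in itertools.permutations(cast, 2)]

edges = co_acting_edges("tt1", [0, 1, 2], "Drama,Fantasy")
```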

Since movie genres are relevant for topic-sensitive PageRank, we want to store this information on nodes rather than on edges. To do that, we group the data frame just obtained by the `src` column and merge all the associated lists of movie genres, removing duplicates: this yields all the movie genres an actor has been associated with during their career. We then add this information to the `names_df` data frame with a join.
The join is an *inner join*, so isolated nodes (actors who never shared a movie with another actor) are dropped from our data. This is beneficial to us because:


1.   We're mainly interested in the top-ranking nodes, and isolated nodes would always be among the lowest-ranking ones
2.   We're interested in the relations between actors, and a node with no edges carries very little meaning in these terms
3.   The percentage of isolated nodes in this dataset is quite high; being dead ends with no outgoing edges, they would lead to a cumulative PageRank value much lower than one


In [None]:
names_df = names_df.join(
    other = actor_actor_rel_df \
    .groupby(F.col("src").alias("id")) \
    .agg(F.array_distinct(F.flatten(F.collect_set(F.col("movie_genres")))).alias("genres")),
    on = "id", how = "inner"
).persist()
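A plain-Python sketch of the aggregation and of the inner join, assuming edge rows shaped like `(src, dst, movie_id, movie_genres)`; the names `genres_per_actor` and `attach_genres` are hypothetical. Actors that never appear as `src` are absent from the grouped result, so the join drops them; genres are sorted only to make the output deterministic, since Spark does not guarantee array order.

```python
from collections import defaultdict

def genres_per_actor(edge_rows):
    """Distinct union of the genres of every movie an actor has an edge for.

    Mirrors groupby("src") + collect_set + flatten + array_distinct.
    """
    genres = defaultdict(set)
    for src, _dst, _movie, movie_genres in edge_rows:
        actor_genres = genres[src]          # registers the actor even without genres
        if movie_genres is not None:        # collect_set ignores null arrays
            actor_genres.update(movie_genres)
    return dict(genres)

def attach_genres(actors, edge_rows):
    """Inner join of (id, name) actors with their genres: isolated actors are dropped."""
    genres = genres_per_actor(edge_rows)
    return [(i, name, sorted(genres[i])) for i, name in actors if i in genres]
```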

We can now remove genres from the edges data frame. However, if we stopped here we would have a ***multigraph***, i.e. multiple edges between the same pair of nodes (one for each movie the two actors share). To obtain a single edge between each pair of nodes, we merge all the edges between the same pair into one, labeled with the list of movies the two actors appeared in together, obtaining this kind of structure:

| src | dst | movie_ids       |
|-----|-----|-----------------|
| x   | y   | [m1, m2,..., mn]|

In [None]:
edges_df = actor_actor_rel_df \
.drop("movie_genres") \
.groupBy([F.col("src"), F.col("dst")]) \
.agg(F.collect_set(F.col("movie_id")).alias("movie_ids")).persist()
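The collapse of parallel edges can be sketched in plain Python as below, assuming the same `(src, dst, movie_id, movie_genres)` rows as above (`collapse_edges` is a hypothetical name). Each direction is kept as its own edge, and duplicated movie ids collapse as with `collect_set`.

```python
from collections import defaultdict

def collapse_edges(edge_rows):
    """Merge parallel edges: one (src, dst) edge labelled with its movie ids."""
    movies = defaultdict(set)
    for src, dst, movie_id, _genres in edge_rows:
        movies[(src, dst)].add(movie_id)
    # Sorted only for a deterministic label; Spark arrays have no guaranteed order
    return {pair: sorted(ids) for pair, ids in movies.items()}
```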

Finally, we're going to import the ratings data set.

In [None]:
# Contains user ratings
ratings_df_schema = StructType() \
.add("tconst", StringType(), False) \
.add("averageRating", DoubleType(), False) \
.add("numVotes", IntegerType(), False) 
ratings_df = spark.read.format("csv") \
                      .option("header", "true") \
                      .option("delimiter", "\t") \
                      .schema(ratings_df_schema) \
                      .load("title.ratings.tsv.gz") \
                      .cache()

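To obtain a single weight value for each movie (its "impact"), we standardize both its average rating and its number of votes as z-scores, shift each score so that its minimum becomes 1 (keeping all weights positive), and combine them as a weighted sum with coefficients 0.40 (rating) and 0.60 (votes). The rationale behind these calculations is described in the dedicated section of the project report.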

In [None]:
# The constant "row" column puts every movie in the same window partition, so the
# window aggregates are global statistics. Each z-score is (x - mean) / sd and is
# then shifted so that its minimum becomes 1; weight = 0.40 * rating + 0.60 * votes
wind = Window.partitionBy("row")
edge_weights_df = ratings_df \
.repartition(200) \
.join(movies_df, on="tconst", how="leftsemi") \
.withColumn("row", F.lit(1)) \
.withColumn("r_mean", F.avg(F.col("averageRating")).over(wind)) \
.withColumn("r_sd", F.stddev(F.col("averageRating")).over(wind)) \
.withColumn("r_zscore", (F.col("averageRating") - F.col("r_mean")) / F.col("r_sd")) \
.drop("r_mean", "r_sd") \
.withColumn("v_mean", F.avg(F.col("numVotes")).over(wind)) \
.withColumn("v_sd", F.stddev(F.col("numVotes")).over(wind)) \
.withColumn("v_zscore", (F.col("numVotes") - F.col("v_mean")) / F.col("v_sd")) \
.drop("v_mean", "v_sd") \
.withColumn("min_r_zscore", F.min("r_zscore").over(wind)) \
.withColumn("r_zscore", F.col("r_zscore") - F.col("min_r_zscore") + 1) \
.drop("min_r_zscore") \
.withColumn("min_v_zscore", F.min("v_zscore").over(wind)) \
.withColumn("v_zscore", F.col("v_zscore") - F.col("min_v_zscore") + 1) \
.drop("min_v_zscore", "row") \
.withColumn("A", F.lit(0.40)) \
.withColumn("B", F.lit(0.60)) \
.withColumn("weight", F.col("r_zscore")*F.col("A") + F.col("v_zscore")*F.col("B")) \
.select(F.col("tconst").alias("movie_id"), "weight").coalesce(100).persist()
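The weight formula can be restated in plain Python to check it on small inputs. This is a sketch, assuming ratings are given as `{movie_id: (averageRating, numVotes)}` (`movie_weights` is a hypothetical name); it computes each z-score as `(x - mean) / sd` with the sample standard deviation, which is what Spark's `F.stddev` returns, and uses A = 0.40 and B = 0.60 as in the cell above.

```python
from statistics import mean, stdev

def movie_weights(ratings, a=0.40, b=0.60):
    """weight = a * shifted_z(averageRating) + b * shifted_z(numVotes).

    `ratings` maps movie_id -> (averageRating, numVotes) and needs at least
    two movies. Each z-score is shifted so that its minimum becomes 1.
    """
    ids = list(ratings)

    def shifted_z(values):
        m, sd = mean(values), stdev(values)   # sample std, like F.stddev
        z = [(v - m) / sd for v in values]
        low = min(z)
        return [x - low + 1 for x in z]

    r = shifted_z([ratings[i][0] for i in ids])
    v = shifted_z([ratings[i][1] for i in ids])
    return {i: a * ri + b * vi for i, ri, vi in zip(ids, r, v)}
```

With B > A, a movie with many votes but a low rating can outweigh a highly rated but little-voted one, as the second test case shows.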

We can finally use the obtained dataframes to build a graph in `graphframes`.

In [None]:
graph = GraphFrame(v = names_df, e = edges_df).persist()

## Applications

In [None]:
#@title ### Function parameters { run: "auto" }
#@markdown #### Arguments for all functions
n_iter = 15 #@param {type:"slider", min:1, max:1000, step:1}
tolerance = 1e-4 #@param {type:"number"}
beta = 0.85 #@param {type:"number"}
verbose = True #@param {type:"boolean"}
#@markdown #### Arguments for `weighted_page_rank`
na_politic = 'min_value' #@param ['min_value', 'drop']


### Arguments for `topic_sensitive_page_rank`
To select multiple topics: `Cmd + click` or `Ctrl + click`

In [None]:
all_genres = actor_actor_rel_df.select("movie_genres").distinct().groupby().agg(F.collect_set(F.col('movie_genres'))).collect()
from itertools import chain
all_genres = set(chain.from_iterable(all_genres[0][0]))

In [None]:
import ipywidgets as widgets
from IPython.display import display
topics = widgets.SelectMultiple(
    options=all_genres,
    description='Topics',
    disabled=False
)
display(topics)


### Application of the classical PageRank algorithm to the actors network

In [None]:
PR_classic = page_rank(graph=graph,
                       tolerance=tolerance,
                       n_iter=n_iter,
                       beta=beta,
                       verbose=verbose)

Computing transition matrix...
Computing initial vectors for nodes...
Starting cycle
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Finished


### Application of the topic-sensitive PageRank algorithm to the actors network

In [None]:
PR_ts = topic_sensitive_page_rank(graph=graph,
                                  tolerance=tolerance,
                                  n_iter=n_iter,
                                  beta=beta,
                                  topics=list(topics.value),
                                  verbose=verbose)

Computing transition matrix...
Launching workers...
Worker 'Action' started
Worker 'Horror' started
Worker 'Drama' started
Worker 'Comedy' started
Worker 'Drama': iteration 3
Worker 'Action': iteration 3
Worker 'Horror': iteration 3
Worker 'Drama': iteration 1
Worker 'Comedy': iteration 1
Worker 'Horror': iteration 1
Worker 'Action': iteration 1
Worker 'Comedy': iteration 3
Worker 'Drama': iteration 4
Worker 'Action': iteration 4
Worker 'Horror': iteration 4
Worker 'Drama': iteration 2
Worker 'Comedy': iteration 2
Worker 'Horror': iteration 2
Worker 'Action': iteration 2
Worker 'Comedy': iteration 4
Worker 'Drama' done
Worker 'Action': iteration 5
Worker 'Horror': iteration 5
Worker 'Drama': iteration 3
Worker 'Comedy': iteration 3
Worker 'Horror': iteration 3
Worker 'Action': iteration 3
Worker 'Comedy' done
Worker 'Action' done
Worker 'Horror' done
Worker 'Drama': iteration 4
Worker 'Comedy': iteration 4
Worker 'Horror': iteration 4
Worker 'Action': iteration 4
Worker 'Drama' done

### Application of the weighted PageRank algorithm to the actors network

In [None]:
PR_ew = weighted_page_rank(graph=graph,
                           weights = edge_weights_df,
                           na_politic = na_politic,
                           tolerance=tolerance,
                           n_iter=n_iter,
                           beta=beta,
                           verbose=verbose)

Computing transition matrix...
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Iteration: 11
Finished


## Results and data visualization

For each PageRank variation we will derive the top 100 ranking nodes and plot them.

### Getting the top 100 ranking nodes

In [None]:
# Top 100 ranking nodes - classic PR
top_class_nodes = PR_classic.withColumn("Ranking", F.row_number().over(Window.orderBy(F.col('pageRank').desc()))) \
  .where(F.col("Ranking") <= 100)
top_class_edges = graph.edges.join(top_class_nodes, F.col("src") == F.col("id"), how="leftsemi") \
  .join(top_class_nodes, F.col("dst") == F.col("id"), how="leftsemi")
top_class_nodes_pd = top_class_nodes.toPandas()
top_class_edges_pd = top_class_edges.toPandas()
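The cells in this section pick the top 100 nodes with `row_number()` over a window ordered by descending PageRank, then keep only the edges whose two endpoints are both in the top 100 (the two `leftsemi` joins), i.e. the subgraph induced by the top nodes. A plain-Python sketch of the same selection follows; `top_k_subgraph` is a hypothetical helper working on a dict of scores and a list of `(src, dst)` pairs.

```python
def top_k_subgraph(page_rank, edges, k=100):
    """Top-k nodes by PageRank (1-based ranking) and the edges among them.

    `page_rank` maps node id -> score; `edges` is a list of (src, dst).
    Ties are broken by node id here; row_number() gives no such guarantee.
    """
    ranked = sorted(page_rank.items(), key=lambda kv: (-kv[1], kv[0]))[:k]
    ranking = {node: pos for pos, (node, _score) in enumerate(ranked, start=1)}
    # Like the two leftsemi joins: keep an edge only if both endpoints are top-k
    sub_edges = [(s, d) for s, d in edges if s in ranking and d in ranking]
    return ranking, sub_edges
```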

In [None]:
# Top 100 ranking nodes - topic-sensitive PR
if (len(list(topics.value)) > 0):
  top_ts_nodes = PR_ts.withColumn("map", F.create_map(list(itertools.chain.from_iterable((F.lit(t), F.col(t)) for t in list(topics.value))))) \
    .drop(*[t for t in list(topics.value)]) \
    .select('*', F.explode(F.col("map")).alias('topic','pageRank')) \
    .drop("map") \
    .withColumn("Ranking", F.row_number().over(Window.partitionBy("topic").orderBy(F.col("pageRank").desc()))) \
    .filter(F.col("Ranking") <= 100)
  top_ts_edges = graph.edges.join(top_ts_nodes, F.col("src") == F.col("id"), how="leftsemi") \
    .join(top_ts_nodes, F.col("dst") == F.col("id"), how="leftsemi")
  top_ts_nodes_pd = top_ts_nodes.toPandas()
  top_ts_edges_pd = top_ts_edges.toPandas()
else:
  top_ts_nodes = PR_ts.withColumn("Ranking", F.row_number().over(Window.orderBy(F.col('pageRank').desc()))) \
    .where(F.col("Ranking") <= 100)
  top_ts_edges = graph.edges.join(top_ts_nodes, F.col("src") == F.col("id"), how="leftsemi") \
    .join(top_ts_nodes, F.col("dst") == F.col("id"), how="leftsemi")
  top_ts_nodes_pd = top_ts_nodes.toPandas()
  top_ts_edges_pd = top_ts_edges.toPandas()
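For topic-sensitive PageRank, `PR_ts` holds one PageRank column per selected topic (wide format). The first branch above turns those columns into a map, explodes it into one `(topic, pageRank)` row per node and topic (long format), and ranks nodes within each topic with a window partitioned by `topic`. Below is a minimal pure-Python sketch of that reshaping, assuming scores are given as `{node: {topic: score}}`; `rank_per_topic` is a hypothetical name.

```python
def rank_per_topic(scores, topics, k=100):
    """Turn {node: {topic: score}} into per-topic top-k rows.

    Mirrors create_map + explode (wide -> long) followed by row_number()
    over a window partitioned by topic. Returns (node, topic, score, ranking).
    """
    # Wide -> long: one (node, topic, score) row per node and topic
    long_rows = [(node, t, per_topic[t])
                 for node, per_topic in scores.items() for t in topics]
    result = []
    for t in topics:
        rows = sorted((r for r in long_rows if r[1] == t),
                      key=lambda r: (-r[2], r[0]))
        result += [(node, topic, score, pos)
                   for pos, (node, topic, score) in enumerate(rows[:k], start=1)]
    return result
```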

In [73]:
topics.value

('Action', 'Horror', 'Drama', 'Comedy')

In [None]:
# Top 100 ranking nodes - weighted PR
top_w_nodes = PR_ew.withColumn("Ranking", F.row_number().over(Window.orderBy(F.col('pageRank').desc()))) \
.where(F.col("Ranking") <= 100)
top_w_edges = graph.edges.join(top_w_nodes, F.col("src") == F.col("id"), how="leftsemi") \
  .join(top_w_nodes, F.col("dst") == F.col("id"), how="leftsemi")
top_w_nodes_pd = top_w_nodes.toPandas()
top_w_edges_pd = top_w_edges.toPandas()

### Transformations for data visualization

In [74]:
# Renaming cols
top_class_nodes_pd.rename(columns={'primaryName': 'label'}, inplace=True)
top_class_edges_pd.rename(columns={'src':'source', 'dst':'target'}, inplace=True)
top_ts_nodes_pd.rename(columns={'primaryName': 'label'}, inplace=True)
top_ts_edges_pd.rename(columns={'src':'source', 'dst':'target'}, inplace=True)
top_w_nodes_pd.rename(columns={'primaryName': 'label'}, inplace=True)
top_w_edges_pd.rename(columns={'src':'source', 'dst':'target'}, inplace=True)

In [75]:
# Assign fill colors to PR values
def assign_fill(df):
  # Sample a single color from the Temps scale at value/max_value, in (0, 1]
  def get_fill_color(value, max_value):
    val_resc = value/max_value
    return plotly.colors.sample_colorscale(plotly.colors.diverging.Temps, val_resc)[0]
  max_pr = df['pageRank'].max()
  return df.assign(fill = lambda df: df['pageRank'].map(lambda pr: get_fill_color(pr, max_pr)))

top_class_nodes_pd = assign_fill(top_class_nodes_pd)
top_w_nodes_pd = assign_fill(top_w_nodes_pd)

In [76]:
# Convert to lists of Cytoscape elements
def convert_to_dict(nodes, edges, topic = None):
  # For topic-sensitive results, keep only the selected topic's ranking
  # (recolored on its own scale) and the edges among those nodes
  if topic is not None:
    nodes = assign_fill(nodes[nodes['topic'] == topic])
    topic_ids = nodes['id'].to_numpy()
    edges = edges[(edges['source'].isin(topic_ids)) &
                  (edges['target'].isin(topic_ids))]
  # Cytoscape expects a flat list of {'data': ..., 'group': 'nodes' | 'edges'}
  elements = [{'data': n, 'group': 'nodes'} for n in nodes.to_dict('records')]
  elements += [{'data': e, 'group': 'edges'} for e in edges.to_dict('records')]
  return elements

top_class_el = convert_to_dict(top_class_nodes_pd, top_class_edges_pd)
top_ts_el_list = [convert_to_dict(top_ts_nodes_pd, top_ts_edges_pd, t) for t in list(topics.value)]
top_w_el = convert_to_dict(top_w_nodes_pd, top_w_edges_pd)

In [77]:
# Function for generating color legend
def generate_color_legend(df):
  import numpy as np
  min_pr = df['pageRank'].min()
  max_pr = df['pageRank'].max()
  n=100
  rg = np.linspace(start=min_pr, stop=max_pr, num=n)
  sequences = [("temps", px.colors.diverging.Temps)]
  color_bar = go.Figure(
          data=[
              go.Bar(
                  orientation="h",
                  y=[name] * n,
                  x=np.array([(max_pr)/n]*(n)),
                  customdata=[x for x in rg],
                  marker=dict(color=list(rg), 
                              colorscale=name, 
                              line_width=0),
                  hovertemplate="%{customdata}"
              )
              for name, colors in reversed(sequences)
          ],
          layout=dict(
              title={"text": "PageRank value", "x" : 0.5},
              barmode="stack",
              bargap=0,
              showlegend=False,
              xaxis=dict(showticklabels=True, 
                        showgrid=True,
                        range=[min_pr, max_pr],
                        tickmode = 'array',
                        tickvals = np.linspace(start=min_pr, stop=max_pr, num=5)
                        ),
              yaxis=dict(visible=False),
              height=150,
              width=600,
              margin=dict(b=10)
          ))
  return color_bar

class_bar = generate_color_legend(top_class_nodes_pd)
w_bar = generate_color_legend(top_w_nodes_pd)
ts_bar_list = [generate_color_legend(top_ts_nodes_pd[top_ts_nodes_pd['topic'] == t]) for t in list(topics.value)]

In [78]:
def generate_ranking_graph(df, topic = None):
  # Topic-sensitive results hold one ranking per topic: plot only the selected one
  if topic is not None:
    df = df[df['topic'] == topic]
  fig = px.bar(df, x="pageRank", y="Ranking",
               orientation = 'h',
               barmode='group',
               text = "label",
               labels = {'pageRank' : 'PageRank value'}, width=900,
               height = 1500 if topic is None else 900)
  # Treat ranks as categories and show rank 1 at the top
  fig.update_yaxes(type='category', autorange="reversed")
  fig.update_traces(textposition='outside')
  return fig

class_plot_rank = generate_ranking_graph(top_class_nodes_pd)
ts_plot_rank_list = [generate_ranking_graph(top_ts_nodes_pd, t) for t in list(topics.value)]
w_plot_rank = generate_ranking_graph(top_w_nodes_pd)

### Launch the dashboard

In [80]:
app = JupyterDash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
cyto_stylesheet = [
    {
        'selector': 'node',
        'style': {
            'label': 'data(label)',
            'background-color': 'data(fill)',
            'text-halign' : 'center',
            'text-valign' : 'top',
            'text-wrap' : 'wrap'
        }
    }, 
    {
      'selector': ':selected',
      'style': {
        'background-color': 'LightBlue'
      }
    }
]

pr_view = html.Div([
    cyto.Cytoscape(
      layout={'name': 'circle'},
      style={'width': '100%', 'height': '700px'},
      id="cyto_pr",
      elements=top_class_el,
      stylesheet = cyto_stylesheet, 
      autounselectify=False, 
      responsive=True
      )
], style= {'borderWidth': '1px', 
           'borderStyle': 'solid',
           'width': '100%',
           'height': '100%'},
  className='border-secondary rounded', id = "cyto-container")
col_1 = dbc.Col(children=[
   pr_view                      
], width = 9)
col_2 = dbc.Col([
   html.Label(['Graph layout', dcc.Dropdown(
    id='dropdown',
    value='circle',
    clearable=False,
    options=[
        {'label': name.capitalize(), 'value': name}
        for name in ['grid', 'random', 'circle', 'cose', 'concentric']
    ], 
    style={'width':'100%'}
   )], style={'width':'100%'}),
   dbc.Card([
       dbc.CardBody([
          html.H4("Topics", className="card-title"), 
          dbc.FormGroup([
            dbc.RadioItems(
                options=[
                  dict(label= t, value= list(topics.value).index(t)) for t in list(topics.value)  
                ],
                value=0,
                id="topic-choice"
            )
          ])
       ])      
   ], color = "info", outline=True, id = "topic-card", style={'display':'none'}),
   dbc.Card([
     dbc.CardBody([
        html.H4("INFO", className="card-title"),
        html.H6("""Select nodes to visualize associated 
        information""", className="card-subtitle"),
        html.Div(className="card-text", id = 'card-info-1')      
     ])
   ])             
], width = 3)
buttons = html.Div([dbc.ButtonGroup(
    [
        dbc.Button("Classic PageRank", style = {'marginRight': '5px'}, 
                   active=True, id='btn-prc'),
        dbc.Button("Topic-sensitive PageRank", style = {'marginRight': '5px'},
                   id='btn-prts'),
        dbc.Button("Weighted PageRank", style = {'paddingTop':'2px'},
                   id='btn-prw')
    ])])
row_1 = dbc.Row(children=[
   dbc.Col(buttons)                   
], style = {'marginBottom': '1rem'})
row_2 = dbc.Row(children=[
    col_1,
    col_2                    
])
row_3 = dbc.Row(children=[
   dbc.Col([
      html.Center([
        dcc.Graph(id = "colorscale", figure = class_bar, config={'autosizable':True})
      ], style = {'width':'100%', 'height':'150px'})      
   ], width=9)                                              
])
row_4 = dbc.Row(children=[
   dbc.Col([
      html.Center([
        dcc.Graph(id = "rank-plot", figure = class_plot_rank, config={'autosizable':True})
      ], style = {'width':'100%'})      
   ])                       
])

app.layout = dbc.Container(children=[row_1, row_2, row_3, row_4], 
                           id = 'app_container', 
                           fluid=True, style={'padding': '1rem'})

@app.callback(Output('card-info-1', 'children'),
              Input('cyto_pr', 'selectedNodeData'))
def display_node_info(selected):
  if selected is None or len(selected) == 0:
    return ''
  children = []
  for n in selected:
    children.append(
        html.Div([
          html.P([html.Span('Node id: ', style={'fontWeight':'bold'}),
                  html.Span(n.get("id"))]),
          html.P([html.Span('Name: ', style={'fontWeight':'bold'}),
                  html.Span(n.get("label"))]), 
          html.P([html.Span('Genres: ', style={'fontWeight':'bold'}),
                  html.Span(", ".join(n.get("genres")))]),    
          html.P([html.Span('PageRank: ', style={'fontWeight':'bold'}),
                  html.Span(n.get("pageRank"))]),                   
        ], style={'marginTop':'1em', 'padding':'1em'}, 
        className="border border-secondary rounded")
    )
  return children

@app.callback(Output('cyto_pr', 'layout'),
              Input('dropdown', 'value'))
def update_layout(layout):
    return {
        'name': layout,
        'animate': True
    }

@app.callback(Output("cyto_pr", "elements"),
              Output("topic-card", "style"),
              Output("colorscale", "figure"),
              Output("btn-prc", "active"),
              Output("btn-prts", "active"),
              Output("btn-prw", "active"),
              Output("rank-plot", "figure"),
              Input("btn-prc", "n_clicks"),
              Input("btn-prts", "n_clicks"),
              Input("btn-prw", "n_clicks"),
              Input("topic-choice", "value")
              )
def toggle_buttons(btn_c, btn_ts, bt_w, t_choice):
  ctx = dash.callback_context
  if not ctx.triggered:
    raise dash.exceptions.PreventUpdate
  else:
    comp_id = ctx.triggered[0]['prop_id'].split('.')[0]
  if comp_id == "btn-prc":
    return top_class_el, {'display':'none'}, class_bar, True, False, False, class_plot_rank
  elif comp_id == "btn-prts" or comp_id == "topic-choice":
    return top_ts_el_list[t_choice], {'marginBottom':'5px'}, ts_bar_list[t_choice], False, True, False, ts_plot_rank_list[t_choice]
  elif comp_id == "btn-prw":
    return top_w_el, {'display':'none'}, w_bar, False, False, True, w_plot_rank

if __name__ == '__main__':
    app.run_server(mode='inline', height=2000)
    #app.run_server(mode='external')
