**Mount Google Drive**

In [1]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


**Setup**

In [2]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
import pandas as pd

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"

import findspark

findspark.init()

from pyspark.sql import SparkSession
import random

spark = SparkSession.builder.appName("YourTest") \
    .master("local[2]") \
    .config('spark.ui.port', random.randrange(4000, 5000)) \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12") \
    .getOrCreate()

from graphframes import GraphFrame
from graphframes.lib import Pregel
import pyspark.sql.functions as f

spark.sparkContext.setCheckpointDir("./checkpoint/")

In [4]:
# add loop to aviod deadend
df = pd.read_csv("/content/drive/MyDrive/twitch_gamers/large_twitch_edges_test.csv")
all_vertices = set(df['numeric_id_1']).union(set(df['numeric_id_2']))

# deadend
missing_vertices = [vertex for vertex in all_vertices if vertex not in set(df['numeric_id_1'])]

# add loop
missing_edges_df = pd.DataFrame({'numeric_id_1': missing_vertices, 'numeric_id_2': missing_vertices})

df = pd.concat([df, missing_edges_df], ignore_index=True)

df.to_csv('loop.csv')


In [5]:
# Read the excel file / rename columns
twitch_gamers = spark.read.format("csv").option("header", "true").load(
    "loop.csv").withColumnRenamed("numeric_id_1",
                                  "src").withColumnRenamed(
    "numeric_id_2", "dst")

vertices = twitch_gamers.select("src").union(twitch_gamers.select("dst")).distinct().withColumnRenamed("src", "id")

edges = twitch_gamers.select("src", "dst")
graph = GraphFrame(vertices, edges)

# Show graph
graph.vertices.show()
graph.edges.show()


+---+
| id|
+---+
|  3|
|  1|
|  2|
+---+

+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  2|  3|
|  3|  3|
+---+---+


**In degree function**

In [6]:
# Implement in_degree function
def out_Degrees(graph):
    out_degrees = graph.edges.groupBy("src").count().select(f.col("src").alias("id"),
                                                            f.col("count").alias("out_degree"))

    # for 0 indegree
    return graph.vertices.join(out_degrees, "id", "left_outer").select("id", "out_degree").na.fill(0)

**PageRank**

In [7]:
# Implement Page Rank

def page_rank(graph, resetProbability=0.15, sourceId=None, maxIter=None, tol=None):
    """
    Runs the PageRank algorithm on the graph.
    Note: Exactly one of fixed_num_iter or tolerance must be set.

    See Scala documentation for more details.

    :param resetProbability: Probability of resetting to a random vertex.
    :param sourceId: (optional) the source vertex for a personalized PageRank.
    :param maxIter: If set, the algorithm is run for a fixed number
            of iterations. This may not be set if the `tol` parameter is set.
    :param tol: If set, the algorithm is run until the given tolerance.
            This may not be set if the `numIter` parameter is set.
    :return:  GraphFrame with new vertices column "pagerank" and new edges column "weight"
    """

    num_vertex = graph.vertices.count()

    # Calculate out-degree
    out_degrees = out_Degrees(graph)
    out_degrees = out_degrees.withColumnRenamed("id", "out_degree_id")
    # Join the DataFrames and add the in_degree column
    vertices = graph.vertices.join(out_degrees, graph.vertices["id"] == out_degrees["out_degree_id"], "left").drop(
        "out_degree_id")
    # Update the graph with in_degree
    graph = GraphFrame(vertices, edges)

    # init rank
    if sourceId is not None:
        if maxIter is not None:
            ranks = graph.pregel \
                .setMaxIter(maxIter) \
                .withVertexColumn("pagerank", f.when(graph.vertices.id == sourceId, f.lit(1.0)) \
                                  .otherwise(f.lit(0.0)), \
                                  f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.when(
                                      graph.vertices.id == sourceId, f.lit(resetProbability)).otherwise(f.lit(0.0))) \
                .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
                .aggMsgs(f.sum(Pregel.msg())) \
                .run()
            return GraphFrame(ranks, graph.edges)
    elif maxIter is not None:
        ranks = graph.pregel \
            .setMaxIter(maxIter) \
            .withVertexColumn("pagerank", f.lit(1.0 / num_vertex), \
                              f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.lit(
                                  resetProbability / num_vertex)) \
            .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
            .aggMsgs(f.sum(Pregel.msg())) \
            .run()
        return GraphFrame(ranks, graph.edges)

    # running first iteration for the convergence verion
    if sourceId is not None:
        ranks = graph.pregel \
            .setMaxIter(1) \
            .withVertexColumn("pagerank", f.when(graph.vertices.id == sourceId, f.lit(1.0)) \
                              .otherwise(f.lit(0.0)), \
                              f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.when(
                                  graph.vertices.id == sourceId, f.lit(resetProbability)).otherwise(f.lit(0.0))) \
            .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
            .aggMsgs(f.sum(Pregel.msg())) \
            .run()
    else:
        ranks = graph.pregel \
            .setMaxIter(1) \
            .withVertexColumn("pagerank", f.lit(1.0 / num_vertex), \
                              f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.lit(
                                  resetProbability / num_vertex)) \
            .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
            .aggMsgs(f.sum(Pregel.msg())) \
            .run()
    # goes into convergence  check
    ranks = ranks.withColumnRenamed("pagerank", "previous_pagerank")
    graph = GraphFrame(ranks, graph.edges)
    # tol
    while 1:
        # new pagerank
        if sourceId is not None:
            new = graph.pregel \
                .setMaxIter(1) \
                .withVertexColumn("pagerank", graph.vertices["previous_pagerank"], \
                                  f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.when(
                                      graph.vertices.id == sourceId, f.lit(resetProbability)).otherwise(f.lit(0.0))) \
                .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
                .aggMsgs(f.sum(Pregel.msg())) \
                .run()
            new = new.drop("previous_pagerank")
        else:
            new = graph.pregel \
                .setMaxIter(1) \
                .withVertexColumn("pagerank", graph.vertices["previous_pagerank"], \
                                  f.coalesce(Pregel.msg(), f.lit(0.0)) * f.lit(1.0 - resetProbability) + f.lit(
                                      resetProbability / num_vertex)) \
                .sendMsgToDst(Pregel.src("pagerank") / Pregel.src("out_degree")) \
                .aggMsgs(f.sum(Pregel.msg())) \
                .run()
            new = new.drop("previous_pagerank")
        # difference is smaller than tol
        diff = new.join(graph.vertices, "id")
        diff = diff.withColumn('difference', f.abs(diff['previous_pagerank'] - diff['pagerank']))
        diff.show()

        smallest_difference = diff.orderBy(diff['difference'].desc()).select("difference").limit(1).first()[0]
        if smallest_difference < tol:
            return GraphFrame(new, graph.edges)
        new = new.withColumnRenamed("pagerank", "previous_pagerank")
        graph = GraphFrame(new, graph.edges)



In [8]:
page_rank(graph, maxIter=1).vertices.show()

+---+----------+--------------------+
| id|out_degree|            pagerank|
+---+----------+--------------------+
|  1|         2|0.049999999999999996|
|  2|         1| 0.19166666666666665|
|  3|         1|  0.7583333333333333|
+---+----------+--------------------+


In [9]:
page_rank(graph, sourceId="1", maxIter=1).vertices.show()

+---+----------+--------+
| id|out_degree|pagerank|
+---+----------+--------+
|  1|         2|    0.15|
|  2|         1|   0.425|
|  3|         1|   0.425|
+---+----------+--------+


In [10]:
page_rank(graph, tol=0.3).vertices.show()

+---+----------+--------------------+----------+--------------------+-------------------+
| id|out_degree|            pagerank|out_degree|   previous_pagerank|         difference|
+---+----------+--------------------+----------+--------------------+-------------------+
|  1|         2|0.049999999999999996|         2|0.049999999999999996|                0.0|
|  2|         1|             0.07125|         1| 0.19166666666666665|0.12041666666666666|
|  3|         1|             0.87875|         1|  0.7583333333333333|0.12041666666666673|
+---+----------+--------------------+----------+--------------------+-------------------+

+---+----------+--------------------+
| id|out_degree|            pagerank|
+---+----------+--------------------+
|  1|         2|0.049999999999999996|
|  2|         1|             0.07125|
|  3|         1|             0.87875|
+---+----------+--------------------+
