In [1]:
from pathlib import Path
from pyraphtory.context import PyRaphtory
from pyraphtory.algo import Vertex, Iterate, Step
import pandas as pd
!curl -o /tmp/lotr.csv https://raw.githubusercontent.com/Raphtory/Data/main/lotr.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 52206  100 52206    0     0   274k      0 --:--:-- --:--:-- --:--:--  272k


In [2]:
# Open a PyRaphtory instance that reads the downloaded CSV through the
# LotrGraphBuilder class in builder.py, then grab a handle on its graph.
pr = PyRaphtory(
    spout_input=Path('/tmp/lotr.csv'),
    builder_script=Path('builder.py'),
    builder_class='LotrGraphBuilder',
    mode='batch',
    logging=True,
).open()
rg = pr.graph()

b"17:53:54.267 [io-compute-3] INFO  com.raphtory.spouts.FileSpout - Spout: Processing file 'lotr.csv' ...\n"
b'17:53:54.422 [io-compute-3] INFO  com.raphtory.internals.management.Prometheus$ - Prometheus started on port /0:0:0:0:0:0:0:0:9999\n'
b'17:53:55.422 [spawner-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started\n'
b"17:53:55.729 [io-compute-3] INFO  com.raphtory.internals.management.PartitionsManager$ - Creating '1' Partition Managers for raphtory_1690389956.\n"
b'17:53:55.865 [io-compute-3] INFO  com.raphtory.internals.management.Py4JServer - Starting PythonGatewayServer...\n'
b'17:53:55.868 [io-compute-3] INFO  com.raphtory.internals.management.Py4JServer - Started PythonGatewayServer on port 59959 host: localhost/127.0.0.1\n'
b'17:53:55.881 [io-compute-3] INFO  com.raphtory.internals.management.Py4JServer - Writing PythonGatewayServer details to file...\n'
b'17:53:55.885 [io-compute-3] INFO  com.raphtory.internals.management.Py4JServer -

In [11]:
class PGStep(Step):
    """Initial PageRank step: seed every vertex and send its first messages."""

    def eval(self, v: Vertex):
        """Set ``v["prlabel"]`` to 1.0 and share it across outgoing neighbours.

        A vertex whose out-degree is not positive only stores the label and
        sends nothing.
        """
        starting_rank = 1.0
        v["prlabel"] = starting_rank
        fan_out = v.out_degree()
        if not fan_out > 0:
            return
        # Each outgoing neighbour receives an equal share of the starting rank.
        v.message_outgoing_neighbours(starting_rank / fan_out)

class PGIterate(Iterate):
    """Iterative PageRank update applied to each vertex.

    On every evaluation the vertex recomputes ``"prlabel"`` from the sum of
    its message queue, sends its new share to outgoing neighbours, and votes
    to halt once the label has changed by less than ``tolerance``.

    Args:
        iterations: forwarded unchanged to ``Iterate.__init__``.
        execute_messaged_only: forwarded unchanged to ``Iterate.__init__``.
        damping_factor: weight given to the summed incoming messages; the
            remaining ``1 - damping_factor`` is added as a constant term.
        tolerance: absolute change in ``"prlabel"`` below which the vertex
            votes to halt. Defaults to the previously hard-coded 0.00001.
    """

    def __init__(self, iterations: int, execute_messaged_only: bool, damping_factor: float = 0.85,
                 tolerance: float = 0.00001):
        super().__init__(iterations, execute_messaged_only)
        self.damping_factor = damping_factor
        self.tolerance = tolerance

    def eval(self, v: Vertex):
        """Update ``v["prlabel"]``, propagate it, and vote to halt on convergence."""
        # Read the previous label before overwriting it; it is needed for the
        # convergence check at the end.
        previous_label = v["prlabel"]
        summed_queue = sum(v.message_queue())
        new_label = (1 - self.damping_factor) + self.damping_factor * summed_queue
        v["prlabel"] = new_label

        out_degree = v.out_degree()
        if out_degree > 0:
            # Split the new label equally between outgoing neighbours.
            v.message_outgoing_neighbours(new_label / out_degree)

        # Messages are still sent on the converging iteration (as in the
        # original ordering) so neighbours see this vertex's final value.
        if abs(new_label - previous_label) < self.tolerance:
            v.vote_to_halt()



# Sink whose results are read back into the notebook below.
local_sink = pr.local_sink()

# PageRank is written in Python (PGStep / PGIterate); connected components is
# the Scala implementation exposed through pr.connected_components().
# NOTE(review): at(32674).past() presumably selects the graph as of time 32674
# including all earlier history — confirm against the Raphtory docs.
tracker = (
    rg.at(32674)
    .past()
    .step(PGStep())
    .iterate(PGIterate(iterations=100, execute_messaged_only=False))
    .clear_messages()
    .transform(pr.connected_components())
    .select(["prlabel", "cclabel"])
    .write_to(local_sink)
    .wait_for_job()
)

res = local_sink.results()
# Each result line is comma-separated; split it into columns so the output of
# both algorithms is shown side by side.
pd.DataFrame([line.split(",") for line in res])

Unnamed: 0,0,1,2,3
0,32674,Hirgon,0.15000000000000002,-8637342647242242534
1,32674,Hador,0.24807333518209068,-8637342647242242534
2,32674,Horn,0.28189681864532734,-8637342647242242534
3,32674,Galadriel,1.2027625938738855,-8637342647242242534
4,32674,Isildur,0.15000000000000002,-8637342647242242534
...,...,...,...,...
134,32674,Faramir,4.614495665867431,-8637342647242242534
135,32674,Bain,0.21375000000000002,-6628080393138316116
136,32674,Walda,0.4409846050194102,-8637342647242242534
137,32674,Thranduil,0.41104877045276866,-8637342647242242534
