NOTE: 使用 graphframes 前，需要按照 https://spark-packages.org/package/graphframes/graphframes 上的说明下载对应的 jar 包，否则无法使用 graphframes。

然后将下载到 %HOMEPATH%\.ivy2\jars\ 下的 jar 文件全部复制到 spark 安装目录下的 jars\ 文件夹。

NOTE 2: pyspark 的版本需要和 spark 的版本一致，否则会出现各种 bug。

In [3]:
import glob
import os
import sys

from graphframes import *

# from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

# Local Spark installation; only used when SPARK_HOME is not already set.
spark_home = "D:/ProgramData/spark/spark-2.4.7-bin-hadoop2.7"
driver_home = spark_home + "/jars"

# Keep an existing SPARK_HOME, otherwise fall back to the local installation.
SPARK_HOME = os.environ.setdefault("SPARK_HOME", spark_home)

# Spark settings applied to the context created below.
conf = SparkConf().setAll([
    ("spark.executor.memory", "8g"),
    ("spark.driver.memory", "8g"),
    ("spark.cores.max", "2"),
    ("spark.task.maxDirectResultSize", "8g"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    ("spark.sql.broadcastTimeout", "3000"),
])

# Disabled example: adding extra JDBC / Mongo jars to the driver class path.
# conf.set("spark.driver.extraClassPath",
#     driver_home+'/jdbc/postgresql-9.4-1201-jdbc41.jar:'\
#     +driver_home+'/jdbc/clickhouse-jdbc-0.1.52.jar:'\
#     +driver_home+'/mongo/mongo-spark-connector_2.11-2.2.3.jar:'\
#     +driver_home+'/mongo/mongo-java-driver-3.8.0.jar')

# Reuse a running SparkContext if there is one, then wrap it in a session.
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

In [4]:
def create_transport_graph():
    """Build the transport GraphFrame from the two CSV files under ``data/``.

    Vertices are read from ``data/transport-nodes.csv`` with an explicit schema.
    Edges are read from ``data/transport-relationships.csv`` and unioned with a
    copy in which ``src`` and ``dst`` are swapped, so every relationship is
    present in both directions.

    Returns:
        GraphFrame built from the vertices and the doubled edge set.
    """
    vertex_schema = StructType([
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True),
    ])
    vertices = spark.read.csv("data/transport-nodes.csv",
                              header=True,
                              schema=vertex_schema)

    forward = spark.read.csv("data/transport-relationships.csv", header=True)

    # Swap the endpoints of every edge; the remaining columns are carried over.
    backward = forward.select(
        forward["dst"].alias("src"),
        forward["src"].alias("dst"),
        "relationship",
        "cost",
    )

    return GraphFrame(vertices, forward.union(backward))

In [5]:
# Build the transport graph that the GraphFrames examples below operate on.
g = create_transport_graph()

In [6]:
# Show the vertices whose population is strictly between 100,000 and 300,000,
# sorted by population.
(g.vertices.filter("population > 100000 and population < 300000").sort(
    "population").show())

+----------+--------+---------+----------+
|        id|latitude|longitude|population|
+----------+--------+---------+----------+
|Colchester|51.88921|  0.90421|    104390|
|   Ipswich|52.05917|  1.15545|    133384|
+----------+--------+---------+----------+



## SPARK 广度优先

In [7]:
# Start vertex: the one whose id is 'Den Haag'.
from_expr = "id='Den Haag'"
# Target vertices: any other vertex with population strictly between 100,000 and 300,000.
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
# Breadth-first search from the start expression to the target expression.
result = g.bfs(from_expr, to_expr)

In [8]:
print(result.columns)  # "e*" columns are edges, "v*" columns are vertices

['from', 'e0', 'v1', 'e1', 'v2', 'e2', 'to']


In [9]:
# Keep every column whose name does not start with "e" (i.e. drop the edge
# columns) and show the remaining vertex columns of the BFS result.
columns = [column for column in result.columns if not column.startswith("e")]
result.select(columns).show()

+--------------------+--------------------+--------------------+--------------------+
|                from|                  v1|                  v2|                  to|
+--------------------+--------------------+--------------------+--------------------+
|[Den Haag, 52.078...|[Hoek van Holland...|[Felixstowe, 51.9...|[Ipswich, 52.0591...|
+--------------------+--------------------+--------------------+--------------------+



## NEO4J 最短路径

pip install neo4j-driver

修改配置为：

#dbms.directories.import=import

dbms.security.allow_csv_import_from_file_urls=true


工具包：

dbms.security.procedures.unrestricted=apoc.\*,gds.\*

dbms.security.procedures.whitelist=apoc.\*,gds.\*

In [2]:
from neo4j import GraphDatabase

# Connection settings for a Neo4j instance reachable over Bolt on localhost.
# NOTE(review): the credentials are hard-coded — consider reading them from
# the environment instead.
CONFIG = {
    "uri": "bolt://localhost:7687",
    "auth": ("neo4j", "123456"),
    "encrypted": False,
}

# Driver used by the Neo4j cells that follow; the visible code never closes it.
driver = GraphDatabase.driver(**CONFIG)

In [19]:
def load_csv(context, path):
    """Import ``Place`` nodes from a CSV file.

    Each CSV row is merged into a ``Place`` node keyed by ``id``; latitude,
    longitude and population are set from the row.

    Args:
        context: Object exposing ``run(query, **params)`` (the transaction
            handed in by ``write_transaction``).
        path: URI of the CSV file, passed to the query as ``$path_``.
    """
    cypher = (
        "WITH $path_ AS uri "
        "LOAD CSV WITH HEADERS FROM uri AS row "
        "MERGE (place:Place {id:row.id}) "
        "SET place.latitude = toFloat(row.latitude), place.longitude = toFloat(row.longitude), place.population = toInteger(row.population)"
    )
    context.run(cypher, path_=path)


# Import the node CSV inside a managed write transaction.
with driver.session() as sess:
    sess.write_transaction(
        load_csv,
        "file:///D:/Code/Machine_Learing/Spark/GraphAlgo/data/transport-nodes.csv",
    )

In [20]:
def load_rel_csv(context, path):
    """Import ``EROAD`` relationships between existing ``Place`` nodes from a CSV file.

    For every CSV row the origin (``row.src``) and destination (``row.dst``)
    nodes are matched by ``id`` and an ``EROAD`` relationship carrying
    ``distance = toInteger(row.cost)`` is merged between them.

    Args:
        context: Object exposing ``run(query, **params)`` (the transaction
            handed in by ``write_transaction``).
        path: URI of the CSV file, passed to the query as ``$path_``.
    """
    context.run(
        "WITH $path_ AS uri "
        "LOAD CSV WITH HEADERS FROM uri AS row "
        "MATCH (origin:Place {id: row.src}) "
        # Trailing space added: the clauses used to be glued together as "})MERGE".
        "MATCH (destination:Place {id: row.dst}) "
        "MERGE (origin)-[:EROAD {distance: toInteger(row.cost)}]->(destination)",
        path_=path,
    )


# Import the relationship CSV inside a managed write transaction.
with driver.session() as sess:
    sess.write_transaction(
        load_rel_csv,
        "file:///D:/Code/Machine_Learing/Spark/GraphAlgo/data/transport-relationships.csv",
    )

![img](./img/1.jpg)

使用 Graph Data Science Library 插件中的算法。所有算法默认是无向图，可手动设置 direction 为 "OUTGOING" 或 "INCOMING"。（下文的 GDS 调用在 relationshipProjection 中通过 orientation: 'UNDIRECTED' 显式指定无向。）

In [None]:
def load_to_memo(context):
    """Create the in-memory GDS graph projection named ``shortPath``.

    The projection uses the ``Place`` label and the ``EROAD`` relationship type.

    Args:
        context: Object exposing ``run(query)`` (the transaction handed in by
            ``write_transaction``).

    Returns:
        A list with one ``[graphName, nodeCount, relationshipCount]`` entry per
        returned record.
    """
    res = context.run('call gds.graph.create("shortPath","Place","EROAD") '
                      "YIELD graphName, nodeCount, relationshipCount;")
    # Consume the records while the transaction is still open. The raw result
    # object should not be handed back out of a transaction function, and the
    # other query functions in this notebook return plain lists as well.
    return [record.values() for record in res]


# Create the in-memory projection in a managed write transaction and print
# whatever load_to_memo returns.
with driver.session() as sess:
    res = sess.write_transaction(load_to_memo)
    print(res)

In [41]:
def shortestpath(context):
    """Stream the unweighted shortest path from Amsterdam to London.

    ``relationshipWeightProperty`` is ``null`` in the query, so no edge weight
    is used.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[place, cost]`` lists, one per returned record.
    """
    query = (
        'MATCH (source:Place {id:"Amsterdam"}),(destination:Place {id:"London"}) '
        "CALL gds.alpha.shortestPath.stream({nodeProjection: 'Place', relationshipProjection: 'EROAD', startNode: source, endNode: destination, relationshipWeightProperty: null}) "
        "YIELD nodeId, cost "
        "RETURN gds.util.asNode(nodeId).id AS place, cost"
    )
    return [row.values() for row in context.run(query)]


# The shortest-path stream query only reads data, so run it in a managed read
# transaction, like the other streaming queries in this notebook.
with driver.session() as sess:
    res = sess.read_transaction(shortestpath)
    print(res)

[['Amsterdam', 0.0], ['Immingham', 1.0], ['Doncaster', 2.0], ['London', 3.0]]


In [21]:
def shortestpath(context):
    """Stream the shortest path from Amsterdam to London weighted by ``distance``.

    Same start and end nodes as the unweighted variant, but the query sets
    ``relationshipWeightProperty`` to ``'distance'`` and projects ``EROAD`` as
    ``UNDIRECTED``.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[name, cost]`` lists, one per returned record.
    """
    query = """
                        MATCH (source:Place {id: "Amsterdam"}),
                              (destination:Place {id: "London"})
                        CALL gds.alpha.shortestPath.stream({
                            nodeProjection: 'Place', 
                            relationshipProjection:{
                                EROAD:{
                                type: 'EROAD',
                                properties: 'distance',
                                orientation: 'UNDIRECTED'
                                }},
                            startNode: source,
                            endNode: destination,
                            relationshipWeightProperty: 'distance'
                        })
                        YIELD nodeId, cost
                        RETURN gds.util.asNode(nodeId).id AS name, cost
                        """
    return [row.values() for row in context.run(query)]


# Run the weighted shortest-path query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(shortestpath)
    print(res)

[['Amsterdam', 0.0], ['Den Haag', 59.0], ['Hoek van Holland', 86.0], ['Felixstowe', 293.0], ['Ipswich', 315.0], ['Colchester', 347.0], ['London', 453.0]]


## SPARK 加权最短路径

In [10]:
from graphframes import *
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [11]:
# UDF that extends a path (array of vertex ids) with one more vertex id; used
# to build the path between the source and the target vertex.
add_path_udf = F.udf(
    lambda path, vertex_id: path + [vertex_id],
    ArrayType(StringType()),
)

In [12]:
# g = create_transport_graph()

In [13]:
def shortest_path(g, origin, destination, column_name="cost"):
    """Find the lowest-cost path from ``origin`` to ``destination`` in ``g``.

    Every loop iteration picks the unvisited vertex with the smallest tentative
    distance, sends ``edge[column_name] + distance`` together with the path so
    far along that vertex's outgoing edges, and keeps an offered distance only
    when it is smaller than the one already stored (Dijkstra-style relaxation).
    The function returns as soon as ``destination`` has been marked visited.

    Args:
        g: GraphFrame whose vertices have an ``id`` column and whose edges
            carry the weight column named by ``column_name``.
        origin: ``id`` of the start vertex.
        destination: ``id`` of the target vertex.
        column_name: Name of the edge column used as the weight.

    Returns:
        A DataFrame holding the destination vertex's row with its ``distance``
        and its ``path`` (the destination id is appended as the last element).
        When ``destination`` is not a vertex of ``g`` (or as the fallback after
        the loop) an empty DataFrame with the vertex schema plus an empty
        ``path`` column is returned; that frame has no ``distance`` column.
    """
    if g.vertices.filter(g.vertices.id == destination).count() == 0:
        return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                .withColumn("path", F.array()))

    # Add a "visited" flag, initialise "distance" (0 for the origin, infinity
    # for every other vertex) and start every vertex with an empty path array.
    vertices = (g.vertices.withColumn("visited", F.lit(False))
                .withColumn(
                    "distance",
                    F.when(g.vertices["id"] == origin, 0).otherwise(float("inf")))
                .withColumn("path", F.array()))

    # Cache the vertices through AM.getCachedDataFrame (original author's
    # note: prevents memory leaks).
    cached_vertices = AM.getCachedDataFrame(vertices)

    g2 = GraphFrame(cached_vertices, g.edges)
    while g2.vertices.filter("visited == False").first():
        # Unvisited vertex with the smallest tentative distance.
        current_node_id = (
            g2.vertices.filter("visited == False").sort("distance").first().id)

        # Compute the candidate distances for the next step and record the
        # path; only edges leaving the current vertex send a message.
        msg_distance = AM.edge[column_name] + AM.src["distance"]
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src["id"] == current_node_id,
                             F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                             sendToDst=msg_for_dst)

        # Mark the current vertex as visited (already visited ones stay visited).
        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id),
            True).otherwise(False)
        # Update the distance: take the offered one only when it is smaller.
        new_distance_col = F.when(
            new_distances["aggMess"].isNotNull() &
            (new_distances.aggMess["col1"] < g2.vertices.distance),
            new_distances.aggMess["col1"],
          ).otherwise(g2.vertices.distance)
        # Update the path under the same condition as the distance.
        new_path_col = F.when(
            new_distances["aggMess"].isNotNull() &
            (new_distances.aggMess["col1"] < g2.vertices.distance),
            new_distances.aggMess["col2"].cast("array<string>"),
          ).otherwise(g2.vertices.path)

        # Update the vertex information: join the messages in, then replace
        # the old visited / distance / path columns with the new ones.
        new_vertices = (g2.vertices.join(new_distances, on="id", how="left_outer")
                        .drop(new_distances["id"])
                        .withColumn("visited", new_visited_col)
                        .withColumn("newDistance", new_distance_col)
                        .withColumn("newPath", new_path_col)
                        .drop("aggMess", "distance", "path")
                        .withColumnRenamed('newDistance', 'distance')
                        .withColumnRenamed('newPath', 'path'))

        cached_new_vertices = AM.getCachedDataFrame(new_vertices)

        g2 = GraphFrame(cached_new_vertices, g2.edges)
        # Result found: the destination has been visited, so return its row
        # with the destination id appended to the path.
        if g2.vertices.filter(g2.vertices.id == destination).first().visited:
            return (g2.vertices.filter(g2.vertices.id == destination)
                    .withColumn("newPath", add_path_udf("path", "id"))
                    .drop("visited", "path")
                    .withColumnRenamed("newPath", "path"))
    # No result.
    return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
            .withColumn("path", F.array()))

In [14]:
# Weighted shortest path from Amsterdam to Colchester using the "cost" edge column.
result = shortest_path(g, "Amsterdam", "Colchester", "cost")
result.select("id", "distance", "path").show(truncate=False)

+----------+--------+------------------------------------------------------------------------+
|id        |distance|path                                                                    |
+----------+--------+------------------------------------------------------------------------+
|Colchester|347.0   |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester]|
+----------+--------+------------------------------------------------------------------------+



## NEO4J A*

In [15]:
from neo4j import GraphDatabase

# Same connection settings as the earlier Neo4j cell (Bolt on localhost).
# NOTE(review): the credentials are hard-coded — consider reading them from
# the environment instead.
CONFIG = {
    "uri": "bolt://localhost:7687",
    "auth": ("neo4j", "123456"),
    "encrypted": False,
}

# Creates a new driver and rebinds the name `driver`; the visible code never
# closes the previously created one.
driver = GraphDatabase.driver(**CONFIG)

In [19]:
def astar(context):
    """Stream the A* shortest path from Den Haag to London.

    ``propertyKeyLat`` / ``propertyKeyLon`` name the node properties that hold
    the geographic coordinates used by the algorithm.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[station, cost]`` lists, one per returned record.
    """
    query = """
                        MATCH  (source:Place {id: "Den Haag"}),(destination:Place {id: "London"})
                        CALL gds.alpha.shortestPath.astar.stream({
                            nodeProjection: {Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED',properties: 'distance'}},
                            startNode: source,
                            endNode: destination,
                            relationshipWeightProperty: 'distance',
                            propertyKeyLat: 'latitude',
                            propertyKeyLon: 'longitude'})
                        YIELD nodeId, cost
                        RETURN gds.util.asNode(nodeId).id AS station, cost
                        """
    return [row.values() for row in context.run(query)]


# Run the A* query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(astar)
    print(res)

[['Den Haag', 0.0], ['Hoek van Holland', 27.0], ['Felixstowe', 234.0], ['Ipswich', 256.0], ['Colchester', 288.0], ['London', 394.0]]


## NEO4J Yen

In [20]:
def yen(context):
    """Compute the top-k (k = 5) shortest paths from Gouda to Felixstowe.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[index, places, totalCost]`` lists, one per returned path.
    """
    query = """
                        MATCH (start:Place {id:"Gouda"}),
                              (end:Place {id:"Felixstowe"})
                        CALL gds.alpha.kShortestPaths.stream({
                            nodeProjection:{Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED',properties: 'distance'}},
                            startNode:start,
                            endNode:end,
                            k:5,
                            relationshipWeightProperty: 'distance'})
                        YIELD index, nodeIds, costs
                        RETURN index, [node IN gds.util.asNodes(nodeIds) | node.id] AS places, reduce(acc = 0.0, cost IN costs | acc + cost) AS totalCost
                        """
    return [row.values() for row in context.run(query)]


# Run the k-shortest-paths query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(yen)
    print(res)

[[0, ['Gouda', 'Rotterdam', 'Hoek van Holland', 'Felixstowe'], 265.0], [1, ['Gouda', 'Den Haag', 'Hoek van Holland', 'Felixstowe'], 266.0], [2, ['Gouda', 'Rotterdam', 'Den Haag', 'Hoek van Holland', 'Felixstowe'], 285.0], [3, ['Gouda', 'Den Haag', 'Rotterdam', 'Hoek van Holland', 'Felixstowe'], 298.0], [4, ['Gouda', 'Utrecht', 'Amsterdam', 'Den Haag', 'Hoek van Holland', 'Felixstowe'], 374.0]]


## 所有点对最短路径

### SPARK

In [22]:
# GraphFrames shortestPaths towards the three listed landmark vertices; the
# printed "distances" map holds one entry per landmark (the output below
# suggests these are hop counts — confirm against the GraphFrames docs).
result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
result.sort(["id"]).select("id", "distances").show(truncate=False)

+----------------+--------------------------------------------------------+
|id              |distances                                               |
+----------------+--------------------------------------------------------+
|Amsterdam       |[Immingham -> 1, Hoek van Holland -> 2, Colchester -> 4]|
|Colchester      |[Colchester -> 0, Immingham -> 3, Hoek van Holland -> 3]|
|Den Haag        |[Hoek van Holland -> 1, Immingham -> 2, Colchester -> 4]|
|Doncaster       |[Immingham -> 1, Colchester -> 2, Hoek van Holland -> 4]|
|Felixstowe      |[Hoek van Holland -> 1, Colchester -> 2, Immingham -> 4]|
|Gouda           |[Hoek van Holland -> 2, Immingham -> 3, Colchester -> 5]|
|Hoek van Holland|[Hoek van Holland -> 0, Immingham -> 3, Colchester -> 3]|
|Immingham       |[Immingham -> 0, Colchester -> 3, Hoek van Holland -> 3]|
|Ipswich         |[Colchester -> 1, Hoek van Holland -> 2, Immingham -> 4]|
|London          |[Colchester -> 1, Immingham -> 2, Hoek van Holland -> 4]|
|Rotterdam  

### NEO

In [23]:
def allpair(context):
    """Stream the ten longest unweighted all-pairs shortest paths.

    ``relationshipWeightProperty`` is ``null`` in the query. Each pair is
    reported once (``sourceNodeId < targetNodeId``), ordered by distance
    descending and limited to 10 rows.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[source, target, distance]`` lists.
    """
    query = """
                        CALL gds.alpha.allShortestPaths.stream({
                            nodeProjection:{Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED'}},
                            relationshipWeightProperty: null})
                        YIELD sourceNodeId, targetNodeId, distance
                        WHERE sourceNodeId < targetNodeId
                        RETURN gds.util.asNode(sourceNodeId).id AS source,
                               gds.util.asNode(targetNodeId).id AS target,
                               distance
                        ORDER BY distance DESC
                        LIMIT 10
                        """
    return [row.values() for row in context.run(query)]


# Run the unweighted all-pairs query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(allpair)
    print(res)

[['Colchester', 'Gouda', 5.0], ['Utrecht', 'Ipswich', 5.0], ['London', 'Rotterdam', 5.0], ['London', 'Gouda', 5.0], ['Utrecht', 'Colchester', 5.0], ['Den Haag', 'Colchester', 4.0], ['Amsterdam', 'Colchester', 4.0], ['Utrecht', 'London', 4.0], ['Doncaster', 'Felixstowe', 4.0], ['Amsterdam', 'Ipswich', 4.0]]


In [24]:
def allpair(context):
    """Stream the ten longest all-pairs shortest paths weighted by ``distance``.

    Weighted counterpart of the unweighted variant: the query sets
    ``relationshipWeightProperty`` to ``'distance'``. Each pair is reported once
    (``sourceNodeId < targetNodeId``), ordered by distance descending and
    limited to 10 rows.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[source, target, distance]`` lists.
    """
    query = """
                        CALL gds.alpha.allShortestPaths.stream({
                            nodeProjection:{Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED',properties: 'distance'}},
                            relationshipWeightProperty: 'distance'})
                        YIELD sourceNodeId, targetNodeId, distance
                        WHERE sourceNodeId < targetNodeId
                        RETURN gds.util.asNode(sourceNodeId).id AS source,
                               gds.util.asNode(targetNodeId).id AS target,
                               distance
                        ORDER BY distance DESC
                        LIMIT 10
                        """
    return [row.values() for row in context.run(query)]


# Run the weighted all-pairs query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(allpair)
    print(res)

[['Doncaster', 'Hoek van Holland', 529.0], ['Doncaster', 'Rotterdam', 528.0], ['Doncaster', 'Gouda', 524.0], ['Immingham', 'Felixstowe', 511.0], ['Den Haag', 'Doncaster', 502.0], ['Immingham', 'Ipswich', 489.0], ['Utrecht', 'Doncaster', 489.0], ['Utrecht', 'London', 460.0], ['Immingham', 'Colchester', 457.0], ['Immingham', 'Hoek van Holland', 455.0]]


## 单源最短路径

### SPARK

In [26]:
# UDF that extends a path (array of vertex ids) with one more vertex id.
add_path_udf = F.udf(
    lambda path, vertex_id: path + [vertex_id],
    ArrayType(StringType()),
)


def sssp(g, origin, column_name="cost"):
    """Compute the lowest-cost path from ``origin`` to every vertex of ``g``.

    Unlike the weighted shortest-path function in the earlier section, there is
    no destination: the loop runs until every vertex has been visited.

    Args:
        g: GraphFrame whose vertices have an ``id`` column and whose edges
            carry the weight column named by ``column_name``.
        origin: ``id`` of the start vertex.
        column_name: Name of the edge column used as the weight.

    Returns:
        A DataFrame with one row per vertex, including its ``distance`` from
        ``origin`` and its ``path`` (the vertex's own id is appended as the
        last element).
    """
    # Add a "visited" flag, initialise "distance" (0 for the origin, infinity
    # for every other vertex) and start every vertex with an empty path array.
    vertices = g.vertices \
        .withColumn("visited", F.lit(False)) \
        .withColumn("distance",
            F.when(g.vertices["id"] == origin, 0).otherwise(float("inf"))) \
        .withColumn("path", F.array())
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    while g2.vertices.filter('visited == False').first():
        # Unvisited vertex with the smallest tentative distance.
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        # Only edges leaving the current vertex send a (distance, path) message.
        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(
            F.min(AM.msg).alias("aggMess"), sendToDst=msg_for_dst)

        # Mark the current vertex as visited (already visited ones stay visited).
        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        # Take the offered distance only when it is smaller than the stored one.
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
                            .otherwise(g2.vertices.distance)
        # Update the path under the same condition as the distance.
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
                        .otherwise(g2.vertices.path)

        # Join the messages in, then replace the old visited / distance / path
        # columns with the new ones.
        new_vertices = g2.vertices.join(new_distances, on="id", how="left_outer") \
            .drop(new_distances["id"]) \
            .withColumn("visited", new_visited_col) \
            .withColumn("newDistance", new_distance_col) \
            .withColumn("newPath", new_path_col) \
            .drop("aggMess", "distance", "path") \
            .withColumnRenamed('newDistance', 'distance') \
            .withColumnRenamed('newPath', 'path')
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)

    # Append each vertex's own id to its path and drop the bookkeeping column.
    return g2.vertices \
                .withColumn("newPath", add_path_udf("path", "id")) \
                .drop("visited", "path") \
                .withColumnRenamed("newPath", "path")

In [27]:
# UDF that strips the first and last element of a path, leaving only the
# intermediate stops.
via_udf = F.udf(lambda path: path[1:-1], ArrayType(StringType()))

# Single-source shortest paths from Amsterdam using the "cost" edge column,
# shown in ascending order of distance.
result = sssp(g, "Amsterdam", "cost")
(result
 .withColumn("via", via_udf("path"))
 .select("id", "distance", "via")
 .sort("distance")
 .show(truncate=False))

+----------------+--------+-------------------------------------------------------------+
|id              |distance|via                                                          |
+----------------+--------+-------------------------------------------------------------+
|Amsterdam       |0.0     |[]                                                           |
|Utrecht         |46.0    |[]                                                           |
|Den Haag        |59.0    |[]                                                           |
|Gouda           |81.0    |[Utrecht]                                                    |
|Rotterdam       |85.0    |[Den Haag]                                                   |
|Hoek van Holland|86.0    |[Den Haag]                                                   |
|Felixstowe      |293.0   |[Den Haag, Hoek van Holland]                                 |
|Ipswich         |315.0   |[Den Haag, Hoek van Holland, Felixstowe]                     |
|Colcheste

### NEO

In [28]:
def sssp(context):
    """Stream single-source shortest paths from Amsterdam via delta-stepping.

    Original author's note: delta-stepping splits Dijkstra into several phases
    that can run in parallel. The query uses ``delta: 3.0`` and the
    ``distance`` relationship property as weight.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[destination, distance]`` lists ordered by distance.
    """
    query = """
                        MATCH (n:Place {id:"Amsterdam"})
                        CALL gds.alpha.shortestPath.deltaStepping.stream({
                            nodeProjection:{Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED',properties: 'distance'}},
                            startNode: n,
                            relationshipWeightProperty: 'distance',
                            delta: 3.0})
                        YIELD nodeId, distance
                        RETURN gds.util.asNode(nodeId).id AS destination, distance
                        ORDER BY distance
                        """
    return [row.values() for row in context.run(query)]


# Run the delta-stepping query in a managed read transaction and print the rows.
with driver.session() as sess:
    res = sess.read_transaction(sssp)
    print(res)

[['Amsterdam', 0.0], ['Utrecht', 46.0], ['Den Haag', 59.0], ['Gouda', 81.0], ['Rotterdam', 85.0], ['Hoek van Holland', 86.0], ['Felixstowe', 293.0], ['Ipswich', 315.0], ['Colchester', 347.0], ['Immingham', 369.0], ['Doncaster', 443.0], ['London', 453.0]]


## 最小生成树

In [29]:
def minSpanTree(context):
    """Compute a minimum spanning tree starting at Amsterdam and write it back.

    Original author's note: Prim's algorithm; ``writeProperty`` writes the
    result into the graph. The query writes ``MINST`` relationships with the
    weight stored under ``writeCost``.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of ``[createMillis, computeMillis, writeMillis,
        effectiveNodeCount]`` lists, one per returned record.
    """
    query = """
                        MATCH (n:Place {id:"Amsterdam"})
                        CALL gds.alpha.spanningTree.minimum.write({
                            nodeProjection:{Place: {properties: ['longitude', 'latitude']}},
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED',properties: 'distance'}},
                            startNodeId: id(n),
                            relationshipWeightProperty: 'distance',
                            writeProperty: 'MINST',
                            weightWriteProperty: 'writeCost'
                        })
                        YIELD createMillis, computeMillis, writeMillis, effectiveNodeCount
                        RETURN createMillis, computeMillis, writeMillis, effectiveNodeCount
                        """
    return [row.values() for row in context.run(query)]


# The spanning-tree call writes relationships back to the graph, so it runs in
# a managed write transaction; the returned timing / count row is printed.
with driver.session() as sess:
    res = sess.write_transaction(minSpanTree)
    print(res)

[[13, 1, 874, 12]]


可在 neo4j 浏览器中查询

```
MATCH path = (n:Place {id:"Amsterdam"})-[:MINST*]-()
WITH relationships(path) AS rels
UNWIND rels AS rel
WITH DISTINCT rel AS rel
RETURN startNode(rel).id AS source, endNode(rel).id AS destination, rel.writeCost AS cost
```

```
╒══════════════════╤══════════════════╤══════╕
│"source"          │"destination"     │"cost"│
╞══════════════════╪══════════════════╪══════╡
│"Amsterdam"       │"Utrecht"         │46.0  │
├──────────────────┼──────────────────┼──────┤
│"Utrecht"         │"Gouda"           │35.0  │
├──────────────────┼──────────────────┼──────┤
│"Gouda"           │"Rotterdam"       │25.0  │
├──────────────────┼──────────────────┼──────┤
│"Rotterdam"       │"Den Haag"        │26.0  │
├──────────────────┼──────────────────┼──────┤
│"Den Haag"        │"Hoek van Holland"│27.0  │
├──────────────────┼──────────────────┼──────┤
│"Hoek van Holland"│"Felixstowe"      │207.0 │
├──────────────────┼──────────────────┼──────┤
│"Felixstowe"      │"Ipswich"         │22.0  │
├──────────────────┼──────────────────┼──────┤
│"Ipswich"         │"Colchester"      │32.0  │
├──────────────────┼──────────────────┼──────┤
│"Colchester"      │"London"          │106.0 │
├──────────────────┼──────────────────┼──────┤
│"London"          │"Doncaster"       │277.0 │
├──────────────────┼──────────────────┼──────┤
│"Doncaster"       │"Immingham"       │74.0  │
└──────────────────┴──────────────────┴──────┘
```

## 随机游走

In [30]:
def randwalk(context):
    """Stream one random walk of 5 steps starting at London.

    Original author's note: ``steps`` is the number of hops and ``walks`` is how
    many walks are performed.

    Args:
        context: Object exposing ``run(query)`` (a Neo4j transaction).

    Returns:
        A list of single-element ``[place]`` lists, one per returned record.
    """
    query = """
                        MATCH (source:Place {id: "London"})
                        CALL gds.alpha.randomWalk.stream({nodeProjection:'*',
                            relationshipProjection: {EROAD: {type: 'EROAD',orientation: 'UNDIRECTED'}},
                            start: id(source),
                            steps: 5,
                            walks: 1})
                        YIELD nodeIds
                        UNWIND nodeIds AS nodeId
                        RETURN gds.util.asNode(nodeId).id AS place
                        """
    return [row.values() for row in context.run(query)]


# Run the random-walk query in a managed read transaction and print the visited places.
with driver.session() as sess:
    res = sess.read_transaction(randwalk)
    print(res)

[['London'], ['Colchester'], ['London'], ['Doncaster'], ['Immingham'], ['Doncaster']]
