# [Graph Algorithms: Practical Examples in Apache Spark and Neo4j](https://neo4j.com/graph-algorithms-book/)

- https://github.com/neo4j-graph-analytics/book
- https://resources.oreilly.com/examples/0636920233145

In [None]:
%%bash

mkdir -p data vendor

curl -sL 'https://resources.oreilly.com/examples/0636920233145/-/archive/master/0636920233145-master.tar.bz2' |
  tar --strip-components=1 -xjC data
curl -sL 'https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.5.0.9/apoc-3.5.0.9-all.jar' \
  >vendor/apoc-3.5.0.9-all.jar
curl -sL 'https://s3-eu-west-1.amazonaws.com/com.neo4j.graphalgorithms.dist/neo4j-graph-algorithms-3.5.9.0-standalone.jar' \
  >vendor/neo4j-graph-algorithms-3.5.9.0-standalone.jar

## Chapter 3. Graph Platforms and Processing

### PySpark

```bash
pyspark --packages=graphframes:graphframes:0.8.0-spark2.4-s_2.11
```

### Neo4j

See https://github.com/neo4j-contrib/neo4j-apoc-procedures/#manual-installation-download-latest-release.

```bash
d=/tmp/neo4j-graph-analytics/neo4j bash -c 'mkdir -p $d/data $d/import $d/plugins'
cp -t /tmp/neo4j-graph-analytics/neo4j/plugins \
  vendor/apoc-3.5.0.9-all.jar \
  vendor/neo4j-graph-algorithms-3.5.9.0-standalone.jar
cp -R data/data /tmp/neo4j-graph-analytics/neo4j/import
docker run --rm -e NEO4J_AUTH=none -p 7474:7474 -p 7687:7687 \
  -e NEO4J_dbms_security_procedures_unrestricted='algo.*,apoc.\\\*' \
  -v /tmp/neo4j-graph-analytics/neo4j/data:/data \
  -v /tmp/neo4j-graph-analytics/neo4j/import:/import \
  -v /tmp/neo4j-graph-analytics/neo4j/plugins:/plugins \
  neo4j:3.5.9
```

## Chapter 4. Pathfinding and Graph Search Algorithms

### Breadth First Search

- Edward F. Moore, 1959
- C. Y. Lee, "An Algorithm for Path Connections and Its Applications," in IRE Transactions on Electronic Computers, vol. EC-10, no. 3, pp. 346-365, Sept. 1961.
  https://ieeexplore.ieee.org/document/5219222


#### Breadth First Search with Apache Spark

In [None]:
import pandas as pd
from graphframes import GraphFrame
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("neo4j-graph-analytics")
    .config("spark.packages", "graphframes:graphframes:0.8.0-spark2.4-s_2.11")
    .getOrCreate()
)
g = GraphFrame(
    spark.read.option("inferSchema", True).csv(
        "data/data/transport-nodes.csv", header=True
    ),
    (
        spark.read.option("inferSchema", True)
        .csv("data/data/transport-relationships.csv", header=True)
        .createOrReplaceTempView("transport_relationships")
        or spark.sql(
            """
            select src, dst, relationship, cost from transport_relationships
            union
            select dst as src, src as dst, relationship, cost from transport_relationships
            """
        )
    ),
)

In [None]:
(
    g.vertices.createOrReplaceTempView("_")
    or spark.sql(
        "select * from _ where 100000<population and population<300000 order by population"
    ).show()
)
g.vertices.filter("100000<population and population<300000").sort("population").show()
g.bfs(
    "id='Den Haag'", "100000<population and population<300000 and id!='Den Haag'",
).show()

In [None]:
g.bfs("id='Amsterdam'", "id='London'").toPandas().filter(regex=r'^e').T

### Shortest Path

- Edsger Dijkstra, 1956

#### Shortest Path with Neo4j

In [None]:
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687")
with driver.session() as session:
    for cypher in [
        r"""
        LOAD CSV WITH HEADERS FROM 'file:///data/transport-nodes.csv' AS row
        MERGE (p:Place {id: row.id})
        SET p.latitude = toFloat(row.latitude),
        p.longitude = toFloat(row.longitude),
        p.population = toInt(row.population);
        """,
        r"""
        CREATE INDEX ON:Place(id);
        """,
        r"""
        LOAD CSV WITH HEADERS FROM 'file:///data/transport-relationships.csv' AS row
        MERGE (src:Place {id: row.src})
        MERGE (dst:Place {id: row.dst})
        WITH src, dst, {distance: toInt(row.cost)} AS props, row
        CALL apoc.merge.relationship(src, row.relationship, props, props, dst, props)
        YIELD rel
        RETURN count(rel);
        """,
    ]:
        session.write_transaction(lambda tx: tx.run(cypher))

In [None]:
with driver.session() as session:
    cypher = r"""
    MATCH (src:Place {id: 'Amsterdam'}), (dst:Place {id: 'London'})
    CALL algo.shortestPath.stream(src, dst, null)
    YIELD nodeId, cost
    RETURN algo.getNodeById(nodeId).id AS place, cost;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )

#### Shortest Path (Weighted) with Neo4j

In [None]:
with driver.session() as session:
    cypher = r"""
    MATCH (src:Place {id: 'Amsterdam'}), (dst:Place {id: 'London'})
    CALL algo.shortestPath.stream(src, dst, 'distance')
    YIELD nodeId, cost
    RETURN algo.getNodeById(nodeId).id AS place, cost;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )

#### Shortest Path (Weighted) with Apache Spark

**TBD**

#### Shortest Path Variation: A*

- P. E. Hart, N. J. Nilsson and B. Raphael, "A Formal Basis for the Heuristic Determination of Minimum Cost Paths," in IEEE Transactions on Systems Science and Cybernetics, vol. 4, no. 2, pp. 100-107, July 1968.
  https://ieeexplore.ieee.org/document/4082128

In [None]:
with driver.session() as session:
    cypher = r"""
    MATCH (src:Place {id: 'Den Haag'}), (dst:Place {id: 'London'})
    CALL algo.shortestPath.astar.stream(src, dst, 'distance', 'latitude', 'longitude')
    YIELD nodeId, cost
    RETURN algo.getNodeById(nodeId).id AS place, cost;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )

#### Shortest Path Variation: Yen’s k-Shortest Paths

- Yen, Jin Y. “Finding the K Shortest Loopless Paths in a Network.” Management Science, vol. 17, no. 11, 1971, pp. 712–716. JSTOR, www.jstor.org/stable/2629312. Accessed 29 Mar. 2020.
  https://www.jstor.org/stable/2629312

In [None]:
with driver.session() as session:
    cypher = r"""
    MATCH (src:Place {id: 'Gouda'}), (dst:Place {id: 'Felixstowe'})
    CALL algo.kShortestPaths.stream(src, dst, 5, 'distance')
    YIELD index, nodeIds, path, costs
    RETURN index,
           [node IN algo.getNodesById(nodeIds[1..-1]) | node.id] AS via,
           reduce(totalCost = 0.0, cost IN costs | totalCost + cost) AS totalCost;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )

### All Pairs Shortest Path
#### All Pairs Shortest Path with Apache Spark

In [None]:
g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"]).toPandas().pipe(
    lambda df: df[["id"]].join(df.distances.apply(pd.Series))
)

#### All Pairs Shortest Path with Neo4j

In [None]:
with driver.session() as session:
    cypher = r"""
    CALL algo.allShortestPaths.stream(null)
    YIELD sourceNodeId, targetNodeId, distance
    WITH algo.getNodeById(sourceNodeId).id AS src, algo.getNodeById(targetNodeId).id AS dst, distance
      WHERE src < dst
    RETURN src, dst, distance
      ORDER BY distance DESC, src, dst
      LIMIT 10;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )

In [None]:
with driver.session() as session:
    cypher = r"""
    CALL algo.allShortestPaths.stream('distance')
    YIELD sourceNodeId, targetNodeId, distance
    WITH algo.getNodeById(sourceNodeId).id AS src, algo.getNodeById(targetNodeId).id AS dst, distance
      WHERE src < dst
    RETURN src, dst, distance
      ORDER BY distance DESC, src, dst
      LIMIT 10;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )


### Single Source Shortest Path
#### Single Source Shortest Path with Apache Spark
**TBD**

#### Single Source Shortest Path with Neo4j

In [None]:
with driver.session() as session:
    cypher = r"""
    MATCH (p:Place {id: 'London'})
    CALL algo.shortestPath.deltaStepping.stream(p, 'distance', 1.0)
    YIELD nodeId, distance
    WITH p.id AS src, algo.getNodeById(nodeId).id AS dst, distance
      WHERE src <> dst
    RETURN src, dst, distance
      ORDER BY distance;
    """

    session.read_transaction(
        lambda tx: print(*(dict(record) for record in tx.run(cypher)), sep="\n")
    )


### Minimum Spanning Tree

- Otakar Borůvka, 1926
- Prim, 1957