# Running GDS workloads on data hosted in AuraDB

Project 🍓 (Strawberry) is a new way to run GDS on data hosted in an AuraDB database.
Currently, the way to do this in Aura is to copy the database over to an AuraDS instance, which is itself a database service (single instance).
This detaches the data, and the two databases will likely diverge almost immediately.
This approach has a few other limitations as well:

- It's very manual; users have to click in the Aura Console to copy the database
- Once GDS computations are finished, writing results back to the AuraDB instance also requires manual configuration
- AuraDS instances have to be managed manually in the Aura Console, and nothing encourages users to delete them after use, which increases running costs

With Project 🍓 we are addressing the first two limitations and partially alleviating the third.

## AuraDB

The base assumption is that there is already an AuraDB instance with data in it.
In this notebook, we illustrate this with a tiny example graph.

In [None]:
# Just to begin, let's make sure we have the correct version of the GDS Python Client installed

from graphdatascience import __version__

assert __version__ == "1.10a1"

First we need to configure access to our AuraDB instance. Please fill in the instance id and password.

In [None]:
db_id = "YOUR_DATABASE_ID"
db_password = "YOUR_DATABASE_PASSWORD"
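Hard-coding secrets in a notebook makes it easy to leak them when the notebook is shared. The following is a minimal sketch of an alternative. It assumes you export the values beforehand under the hypothetical environment variable names `AURA_DB_ID` and `AURA_DB_PASSWORD`. These names are our own choice; the GDS client does not read them by itself.

```python
import os


def read_db_credentials(env=os.environ):
    """Return (db_id, db_password) from the hypothetical AURA_DB_ID / AURA_DB_PASSWORD variables."""
    # Both variable names are assumptions made for this sketch
    missing = [name for name in ("AURA_DB_ID", "AURA_DB_PASSWORD") if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return env["AURA_DB_ID"], env["AURA_DB_PASSWORD"]


# Demonstration with an explicit mapping instead of the real environment
db_id_example, db_password_example = read_db_credentials(
    {"AURA_DB_ID": "abcd1234", "AURA_DB_PASSWORD": "s3cret"}
)
print(db_id_example)
```

In the notebook itself you would call `db_id, db_password = read_db_credentials()` instead of filling in the placeholders.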

Now we connect to the AuraDB instance and run some preparations for the notebook.

In [None]:
from graphdatascience.gds_session import DbmsConnectionInfo
import os

from neo4j import GraphDatabase

# We can tell the GDS client that we are working with a development environment.
# os.environ["AURA_ENV"] = "devstrawberryfield"

uri = (
    f"neo4j+s://{db_id}-{os.environ['AURA_ENV']}.databases.neo4j-dev.io"
    if os.environ.get("AURA_ENV")
    else f"neo4j+s://{db_id}.databases.neo4j.io"
)

db_connection_info = DbmsConnectionInfo(uri, "neo4j", db_password)
# start a standard Neo4j Python Driver to connect to the AuraDB instance
driver = GraphDatabase.driver(db_connection_info.uri, auth=db_connection_info.auth())

# try out our connection
with driver.session() as session:
    display(session.run("RETURN true AS success").to_df())
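The URI selection in the cell above can be factored into a small pure function, so you can check it without connecting anywhere. This is a sketch only. The function name `build_aura_uri` is ours and is not part of the GDS client. The two host patterns are copied from the cell above.

```python
def build_aura_uri(db_id, aura_env=None):
    """Mirror the URI selection logic of the cell above as a reusable function."""
    if aura_env:
        # Development environments use an environment-suffixed host on neo4j-dev.io
        return f"neo4j+s://{db_id}-{aura_env}.databases.neo4j-dev.io"
    # Regular AuraDB instances
    return f"neo4j+s://{db_id}.databases.neo4j.io"


print(build_aura_uri("abcd1234"))
print(build_aura_uri("abcd1234", "devstrawberryfield"))
```

Calling `build_aura_uri(db_id, os.environ.get("AURA_ENV"))` reproduces the `uri` computed in the cell above.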

Let's add some very basic data to our database.
The content does not really matter for this notebook; feel free to replace it with more interesting data.

In [None]:
with driver.session() as session:
    session.run("CREATE CONSTRAINT users FOR (u:User) REQUIRE u.id IS NODE KEY")
    session.run("CREATE CONSTRAINT products FOR (p:Product) REQUIRE p.id IS NODE KEY")
    session.run(
        """
        UNWIND range(0, 999) AS i
        CREATE (:User {id: i, age: toInteger(rand() * 75)})
        """
    ).consume()
    session.run(
        """
        UNWIND range(0, 99) AS i
        CREATE (:Product {id: i, cost: rand() * 200})
        """
    ).consume()
    session.run(
        """
        UNWIND range(1, 8000) AS i
        WITH toInteger(rand() * 1000) AS source, toInteger(rand() * 1000) AS target
        MATCH (s:User {id: source})
        MATCH (t:User {id: target})
        CREATE (s)-[:KNOWS {since: 2020 - (rand() * 100)}]->(t)
        """
    ).consume()
    session.run(
        """
        UNWIND range(1, 2000) AS i
        WITH toInteger(rand() * 1000) AS source, toInteger(rand() * 100) AS target
        MATCH (s:User {id: source})
        MATCH (t:Product {id: target})
        CREATE (s)-[:BOUGHT {price: t.cost * (1 + rand())}]->(t)
        """
    ).consume()

    print(f"Number of nodes: {session.run('MATCH () RETURN count(*)').single().value()}")
    print(f"Number of relationships: {session.run('MATCH ()-->() RETURN count(*)').single().value()}")
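Every generated id falls inside the ranges created earlier. `toInteger(rand() * 1000)` yields 0 to 999, and `toInteger(rand() * 100)` yields 0 to 99. So each `MATCH` pair finds its nodes, and every `UNWIND` row creates exactly one relationship. On an initially empty database, the cell above should therefore report 1100 nodes and 10000 relationships.

The sketch below replays the generator logic in plain Python to make that arithmetic explicit. It is an illustration of the queries above, not something the notebook needs to run.

```python
import random


def simulate_example_graph(seed=0):
    """Replay the generator queries above in plain Python and count what they would create."""
    rng = random.Random(seed)
    users = set(range(0, 1000))     # UNWIND range(0, 999)
    products = set(range(0, 100))   # UNWIND range(0, 99)

    # toInteger(rand() * n) truncates a value in [0, n), so it always hits an existing id
    knows = [(int(rng.random() * 1000), int(rng.random() * 1000)) for _ in range(1, 8001)]
    bought = [(int(rng.random() * 1000), int(rng.random() * 100)) for _ in range(1, 2001)]

    # MATCH would drop a row only if an endpoint were missing; apply the same filter here
    knows = [(s, t) for s, t in knows if s in users and t in users]
    bought = [(s, t) for s, t in bought if s in users and t in products]
    return len(users) + len(products), len(knows) + len(bought)


print(simulate_example_graph())  # (1100, 10000)
```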

# A new database component: Arrow Server

We have built a new piece of software into the Neo4j DBMS: an Arrow Server.
It is akin to the existing Bolt and HTTP servers, but with a narrower purpose: projecting graphs to a remote location and receiving results to write back to the database.

With the Arrow Server comes one crucial new feature: an aggregating projection function.
This aggregating function is called `gds.graph.project.remote` and is very similar to Cypher projection v2 in standard GDS.
There are three key differences between them:

1. In AuraDB, the aggregating function does not take a graph name as a parameter.
2. In AuraDB, the aggregating function does not project the graph to the local server.
3. The aggregating function should only be called through the GDS Python Client.

The aggregating function is used in queries that look almost identical to Cypher projection v2 queries, and these queries are authored by the user.

Another function comes with the Arrow Server: `internal.arrow.status`. It is internal and undocumented, but callable.
It is used as a crucial part of the GDS Python Client functionality for managing the AuraDB - GDS connection.

In [None]:
# Let's call this function and verify the arrow server is enabled
with driver.session() as session:
    arrow_status = session.run("CALL internal.arrow.status").to_df()
    print(arrow_status)
    assert arrow_status["enabled"][0] == True, "Arrow server on the db needs to be enabled"

# Aura API and GDS Python Client

Apart from the extension to AuraDB, we have also added a new API to the GDS Python Client.
This API is a Python frontend to the Aura API, as well as a set of internal management features for the AuraDB - GDS connection.
In order to use the Aura API, the user needs to have Aura API credentials.
These are generated in the Aura Console (under `Account settings`) and are a pair of strings: `CLIENT_ID` and `CLIENT_SECRET`.

Using these credentials, the full set of features offered by the GDS Python Client can be used.
In particular, the features are:

- Create a new GDS session
- (Re-)connect to an existing GDS session
- List all existing GDS sessions
- Delete a GDS session

We will illustrate what this looks like below.

## Tenants

If the user is a member of multiple tenants, they also need to enter their tenant id to disambiguate which tenant they want to use.
In this notebook, we use only a single tenant and omit the tenant id.
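The tenant rule above amounts to a simple disambiguation step. The following plain-Python sketch illustrates that logic. `resolve_tenant` and the list of tenant ids it receives are hypothetical. They show the rule only and are not how the GDS client looks up tenants.

```python
def resolve_tenant(available_tenants, tenant_id=None):
    """Pick the tenant to use: explicit id if given, else the only tenant (hypothetical helper)."""
    if tenant_id is not None:
        if tenant_id not in available_tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        return tenant_id
    if not available_tenants:
        raise ValueError("No tenants available")
    if len(available_tenants) == 1:
        # A single tenant is unambiguous, so the id can be omitted
        return available_tenants[0]
    raise ValueError("Member of several tenants: please specify a tenant id")


print(resolve_tenant(["tenant-a"]))
print(resolve_tenant(["tenant-a", "tenant-b"], "tenant-b"))
```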


In [None]:
# Initialise Aura API credentials
CLIENT_ID = "YOUR_AURA_API_CLIENT_ID"
CLIENT_SECRET = "YOUR_AURA_API_CLIENT_SECRET"

# The GDS session

A key new concept is the GDS session.
This takes the place of an AuraDS instance.
(In fact, at this time it is exactly an AuraDS instance, but we don't want to expose that to the user.
As much as possible, they should think of it as a GDS session, a separate thing.)
The GDS session offers all the GDS functionality that we are familiar with from AuraDS.
However, since the idea is to offload database work to AuraDB, the GDS session is not to be considered a database service.
As such, any standard Cypher queries run through the client's `run_cypher` method will not be executed on the GDS session, but on the AuraDB instance.

All projections will go from AuraDB to GDS session, not from a co-located database.
Similarly, writing back will follow the same path back to AuraDB, and not to a co-located database.
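The split described above can be pictured as a thin router. Cypher goes to the database, graph projections live in the session, and write-back returns to the database. The following toy model is purely illustrative. `ToyDatabase`, `ToySession` and `ToyGdsClient` are hypothetical stand-ins, not classes of the GDS Python Client. For simplicity, "Cypher" is modelled as a Python callable.

```python
class ToyDatabase:
    """Stands in for AuraDB: owns the data and answers queries."""
    def __init__(self, nodes):
        self.nodes = nodes

    def run_cypher(self, query_fn):
        return query_fn(self.nodes)


class ToySession:
    """Stands in for the GDS session: holds projected graphs only, no Cypher."""
    def __init__(self):
        self.catalog = {}


class ToyGdsClient:
    def __init__(self, db, session):
        self.db, self.session = db, session

    def run_cypher(self, query_fn):
        # Queries are always routed to the database, never to the session
        return self.db.run_cypher(query_fn)

    def project(self, graph_name):
        # Projection copies data from the database into the session's catalog
        self.session.catalog[graph_name] = {k: dict(v) for k, v in self.db.nodes.items()}

    def write_node_property(self, graph_name, key, values):
        # Write-back targets the database the graph was projected from
        if graph_name not in self.session.catalog:
            raise KeyError(graph_name)
        for node_id, value in values.items():
            self.db.nodes[node_id][key] = value


client = ToyGdsClient(ToyDatabase({1: {"age": 30}, 2: {"age": 41}}), ToySession())
client.project("g")
client.write_node_property("g", "pagerank", {1: 0.4, 2: 0.6})
print(client.run_cypher(lambda nodes: nodes[2]["pagerank"]))  # 0.6
```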

## Implementation limitation

As mentioned in the parentheses above, we make use of existing AuraDS infrastructure to host the GDS sessions.
Because of that, there actually is a co-located database, but we try not to expose its Bolt URI, to discourage users from adding data to that database.

In [None]:
# The new stuff!
from graphdatascience.gds_session import GdsSessions, AuraAPICredentials

# Create a new GdsSessions object, authenticated with the Aura API credentials
sessions = GdsSessions(ds_connection=AuraAPICredentials(CLIENT_ID, CLIENT_SECRET))

## Listing sessions

A user can list their running sessions.
Initially, no sessions are running, so the list is empty:

In [None]:
sessions.list()

# Creating a new session

A new session is created by calling `sessions.get_or_create()`.
Creating a new session takes a few minutes to complete.

## Session sizes

A session is identified by a name and needs a size to be specified.
Session sizes are logical and specified using the `SessionSizes.by_memory()` enum.
Possible values are `DEFAULT`, `XS`, `S`, `SM`, `M`, `ML`, `L`, `XL`, `XXL`, `X3L`, `X4L`, `X5L`.

## Database connection

A GDS Session is by default connected to an AuraDB instance.
To connect successfully, the address and credentials of the AuraDB instance must be specified.
It is possible to specify connection information for a self-managed Neo4j DBMS instance (even localhost), but this requires enabling the Arrow Server on that instance.
Documentation for how to do that is not yet available.

## Session cost 

The creation of a session marks the start of billable activity.
Sessions are machines that run in the cloud, and they cost money.
This cost will accumulate for the lifetime of the session, which needs to be manually deleted (see below).
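Because billing runs for the whole lifetime of the session, idle time costs as much as compute time. The sketch below makes that concrete. It assumes a simple per-hour rate. The rate used here is a made-up number for illustration, not an actual Aura price.

```python
def estimate_session_cost(hourly_rate, hours_alive):
    """Cost grows linearly with how long the session exists, busy or idle (assumed billing model)."""
    if hourly_rate < 0 or hours_alive < 0:
        raise ValueError("rate and duration must be non-negative")
    return hourly_rate * hours_alive


HYPOTHETICAL_RATE = 1.0  # currency units per hour, NOT a real Aura price

# A 2-hour workload versus the same workload followed by a forgotten weekend (48 h)
workload_only = estimate_session_cost(HYPOTHETICAL_RATE, 2)
forgotten = estimate_session_cost(HYPOTHETICAL_RATE, 2 + 48)
print(forgotten / workload_only)  # 25.0
```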

In [None]:
from graphdatascience.gds_session.session_sizes import SessionSizes

sessions.set_cloud_location("euwest-1", "gcp")

# let's create a GDS session!
gds = sessions.get_or_create(
    session_name="pagerank-compute",
    size=SessionSizes.by_memory().DEFAULT,
    db_connection=db_connection_info,
)

# If we had already created a session and wanted to reconnect to it, we would use the same code.
# Reconnecting does not incur any additional costs and is a lot faster.

# Projecting Graphs

To project graphs from an AuraDB instance into the GDS session, we created a new projection function: `gds.graph.project.remote`.
The projection works similarly to Cypher projection v2 and is implemented as a Cypher aggregating function.
The Cypher query containing the projection function is executed on the AuraDB instance, and the data it produces is transferred to the GDS session via an Arrow connection.
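The toy model below shows what "aggregating" means here. Each row produced by the Cypher query contributes a source node, an optional target node and an optional relationship. The aggregation folds all rows into a single graph. This is a conceptual illustration only, not the implementation of `gds.graph.project.remote`. It uses our own simplified row format: `dict`s with `source`, `target` and `rel_type` keys, and it assumes every row has a non-null source.

```python
def aggregate_rows(rows):
    """Fold projection rows into (nodes, relationships), as an aggregating function would."""
    nodes = {}
    relationships = []
    for row in rows:
        source, target = row["source"], row.get("target")
        nodes.setdefault(source, set()).update(row.get("source_labels", []))
        if target is not None:
            nodes.setdefault(target, set()).update(row.get("target_labels", []))
        # Rows from an OPTIONAL MATCH without a match carry no relationship
        if target is not None and row.get("rel_type") is not None:
            relationships.append((source, target, row["rel_type"]))
    return nodes, relationships


rows = [
    {"source": "u1", "source_labels": ["User"], "target": "u2", "target_labels": ["User"], "rel_type": "KNOWS"},
    {"source": "u3", "source_labels": ["User"], "target": None, "rel_type": None},  # isolated user
]
nodes, rels = aggregate_rows(rows)
print(sorted(nodes), rels)  # ['u1', 'u2', 'u3'] [('u1', 'u2', 'KNOWS')]
```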

There are two key differences between the remote projection and Cypher projection v2:

1. In AuraDB, the aggregating function does not take a graph name as a parameter.
2. The aggregating function should only be called through the GDS Python Client endpoint `gds.graph.project`.

## Configuration

Apart from the configuration passed into the aggregating function itself, which can vary from row to row, there is also configuration for the projection as a whole.
The remote projection endpoint is configured using the following parameters:

- `graph_name`: The name of the graph. Mandatory.
- `query`: The query with the aggregating function. Mandatory.
- `undirectedRelationships`: A list of relationship types that should be projected as undirected. Optional.
- `inverseIndexedRelationships`: A list of relationship types that should additionally be indexed inversely. Optional.
- `nodePropertySchema`: A map from node property keys (strings) to their target property types. Optional.
- `relationshipPropertySchema`: A map from relationship property keys (strings) to their target property types. Optional.

Specifying the two schema parameters is optional, but recommended.
When they are not specified, the projection query must ensure that every row contains the full node and relationship schema, so that the schema can be inferred automatically.
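Without the schema parameters, the rows themselves have to carry the full schema. The following is a sketch of a pre-flight check you could run on a few sample property maps. `schema_violations` is a hypothetical helper. The type names are plain strings mirroring `GdsPropertyTypes.LONG` and `GdsPropertyTypes.DOUBLE`, not the enum itself.

```python
# Python types we treat as matching each (string) GDS property type name
EXPECTED_PYTHON_TYPES = {"LONG": int, "DOUBLE": float}


def schema_violations(property_maps, schema):
    """Return (row_index, key, problem) for every property map that lacks part of the schema."""
    problems = []
    for index, props in enumerate(property_maps):
        for key, type_name in schema.items():
            if key not in props:
                problems.append((index, key, "missing"))
            elif not isinstance(props[key], EXPECTED_PYTHON_TYPES[type_name]):
                problems.append((index, key, "wrong type"))
    return problems


node_schema = {"age": "LONG", "cost": "DOUBLE"}
# A user row carries only `age`, a product row only `cost`
sample_rows = [{"age": 31}, {"cost": 12.5}]
print(schema_violations(sample_rows, node_schema))  # [(0, 'cost', 'missing'), (1, 'age', 'missing')]
```

Rows shaped like the ones in the projection query below would not carry the full schema on their own. Passing `nodePropertySchema` and `relationshipPropertySchema` explicitly avoids relying on that.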


In [None]:
from graphdatascience.gds_session.schema import GdsPropertyTypes

G, result = gds.graph.project(
    "pagerank-graph",
    """
    CALL {
        MATCH (u1:User)
        OPTIONAL MATCH (u1)-[r:KNOWS]->(u2:User)
        RETURN u1 AS source, r AS rel, u2 AS target, {age: u1.age} AS sourceNodeProperties, {} AS targetNodeProperties
        UNION
        MATCH (p:Product)
        OPTIONAL MATCH (p)<-[r:BOUGHT]-(user:User)
        RETURN user AS source, r AS rel, p AS target, {} AS sourceNodeProperties, {cost: p.cost} AS targetNodeProperties
    }
    RETURN gds.graph.project.remote(source, target, {
      sourceNodeProperties: sourceNodeProperties,
      targetNodeProperties: targetNodeProperties,
      sourceNodeLabels: labels(source),
      targetNodeLabels: labels(target),
      relationshipType: type(rel),
      relationshipProperties: properties(rel)
    })
    """,
    nodePropertySchema={"age": GdsPropertyTypes.LONG, "cost": GdsPropertyTypes.DOUBLE},
    relationshipPropertySchema={"since": GdsPropertyTypes.DOUBLE, "price": GdsPropertyTypes.DOUBLE},
)

result

# Running Algorithms

Running algorithms on the projected graph works exactly as in standard GDS, in particular in the `stream`, `stats`, and `mutate` modes.
Mutated algorithm results are stored in the in-memory graph catalog of the GDS session. The data can be retrieved via graph stream operations such as `gds.graph.nodeProperty.stream` or `gds.graph.nodeProperties.stream`.

In [None]:
print("Running PageRank ...")
pr_result = gds.pageRank.mutate(G, mutateProperty="pagerank")
print(f"Compute millis: {pr_result['computeMillis']}")
print(f"Node properties written: {pr_result['nodePropertiesWritten']}")
print(f"Centrality distribution: {pr_result['centralityDistribution']}")

print("Running FastRP ...")
frp_result = gds.fastRP.mutate(
    G,
    mutateProperty="fastRP",
    embeddingDimension=64,
    featureProperties=["pagerank"],
    propertyRatio=0.2,
    nodeSelfInfluence=0.2,
)
print(f"Compute millis: {frp_result['computeMillis']}")
gds.graph.nodeProperties.stream(G, ["pagerank", "fastRP"], separate_property_columns=True)
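For intuition about what ends up in the `pagerank` property, here is the textbook power-iteration form of PageRank in plain Python. It is a simplified sketch and not the GDS implementation. It uses a fixed iteration count, spreads the rank of dangling nodes uniformly, and keeps scores summing to 1. GDS uses its own scaling and convergence settings, so the absolute scores it writes will likely differ.

```python
def pagerank(edges, num_nodes, damping=0.85, iterations=50):
    """Textbook PageRank by power iteration over a directed edge list of (source, target) ids."""
    out_degree = [0] * num_nodes
    for source, _ in edges:
        out_degree[source] += 1

    ranks = [1.0 / num_nodes] * num_nodes
    for _ in range(iterations):
        # Rank held by nodes without outgoing edges is spread evenly over all nodes
        dangling = sum(ranks[n] for n in range(num_nodes) if out_degree[n] == 0)
        base = (1 - damping) / num_nodes + damping * dangling / num_nodes
        new_ranks = [base] * num_nodes
        for source, target in edges:
            new_ranks[target] += damping * ranks[source] / out_degree[source]
        ranks = new_ranks
    return ranks


# Two users pointing at the same node: node 0 should rank highest
print(pagerank([(1, 0), (2, 0)], 3))
```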

# Writing back to AuraDB

The GDS session's in-memory graph was projected from data in AuraDB.
Write-back operations will therefore persist the data back to the same AuraDB instance.

When any write operation is called, the GDS Python Client automatically uses the new remote write-back functionality, so no API changes are necessary.

The AuraDB coordinates are not stored in the GDS session, but in the client.
Therefore, when `get_or_create()` is used to reconnect to an existing session, it is important to pass the connection information of the same database the graph was projected from.

In [None]:
# If this fails once with an error like "unable to retrieve routing table",
# run it again. This is a transient error caused by a stale server cache.
gds.graph.nodeProperties.write(G, "pagerank")
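You can also handle the transient routing error mentioned above in code instead of re-running the cell by hand. The following is a generic retry sketch. `with_retries` is our own helper, and which exception class to retry on is an assumption. The Neo4j Python driver typically reports routing problems as `neo4j.exceptions.ServiceUnavailable`, but verify this against the error you actually see. Writing the same node property again overwrites it with the same values, so retrying this particular call should be harmless.

```python
import time


def with_retries(operation, retry_on=(Exception,), attempts=3, delay_seconds=1.0):
    """Call operation(), retrying on the given exception types with a fixed delay between tries."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # give up and surface the last error
            time.sleep(delay_seconds)


# Offline demonstration: an operation that fails twice before succeeding
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("unable to retrieve routing table")
    return "ok"


result_value = with_retries(flaky, retry_on=(RuntimeError,), attempts=3, delay_seconds=0)
print(result_value, calls["n"])  # ok 3
```

In the notebook, this would look like `with_retries(lambda: gds.graph.nodeProperties.write(G, "pagerank"), retry_on=(ServiceUnavailable,))`, assuming `ServiceUnavailable` has been imported from `neo4j.exceptions`.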

We can also use an algorithm's `write` mode directly:

In [None]:
gds.fastRP.write(
    G,
    writeProperty="fastRP",
    embeddingDimension=64,
    featureProperties=["pagerank"],
    propertyRatio=0.2,
    nodeSelfInfluence=0.2,
)

We can now use the `gds.run_cypher()` method to query the updated graph.
Note that the `run_cypher()` method will run the query on the AuraDB instance.
The GDS Session is not a database service, so it does not support Cypher queries. 

In [None]:
gds.run_cypher(
    """
    MATCH (u:User)
    RETURN u.id, u.age, u.fastRP, u.pagerank AS rank
    ORDER BY rank DESC
    LIMIT 5
    """
)

In [None]:
# We can also do arbitrary write queries in the same way
gds.run_cypher(
    """
    CREATE (myNode:MyNode {prop: 1})
    """
)

gds.run_cypher(
    """
    MATCH (myNode:MyNode)
    RETURN myNode.prop
    """
)

# Deleting the session

The idea is that a GDS Session should only live for the time it takes to run a single, complete workload.
If the same workload needs to be re-run, for example to work with updated data, a new session should be created.
This is because the GDS Session is not a database service, but a compute service.
If it is not computing, it should be deleted to avoid unnecessary costs.

The `sessions.delete()` method deletes the session and releases all resources associated with it.
Note that the session incurs billable costs until this method is called.

In [None]:
# This returns True if a session was deleted and False otherwise;
# it does not normally fail
sessions.delete("pagerank-compute")
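A forgotten session keeps accruing cost, so it can help to tie deletion to the end of the workload. The following is a minimal sketch of a context manager around the two calls used in this notebook, `sessions.get_or_create(...)` and `sessions.delete(...)`. The helper name `temporary_gds_session` is hypothetical. The demonstration uses a stand-in object, so no cloud resources are involved.

```python
from contextlib import contextmanager


@contextmanager
def temporary_gds_session(sessions, session_name, **create_kwargs):
    """Create (or reconnect to) a GDS session and always delete it afterwards."""
    gds = sessions.get_or_create(session_name=session_name, **create_kwargs)
    try:
        yield gds
    finally:
        # Runs even if the workload raises, so the session stops accruing cost
        sessions.delete(session_name)


class _StubSessions:
    """Stand-in exposing the same two methods, so the helper can be tried offline."""
    def __init__(self):
        self.deleted = []

    def get_or_create(self, session_name, **kwargs):
        return f"gds-client-for-{session_name}"

    def delete(self, session_name):
        self.deleted.append(session_name)
        return True


stub = _StubSessions()
with temporary_gds_session(stub, "demo-session") as demo_gds:
    print(demo_gds)   # gds-client-for-demo-session
print(stub.deleted)   # ['demo-session']
```

With the real client, usage would mirror the creation cell: `with temporary_gds_session(sessions, "pagerank-compute", size=SessionSizes.by_memory().DEFAULT, db_connection=db_connection_info) as gds:`, followed by the workload.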