# Graph processing with Azure Databricks and Cosmos DB

In this notebook, you will download some bike trip data (Parquet files), upload it to Azure Data Lake Storage Gen2, and then use the Spark cluster run by Azure Databricks to persist the data as a graph in Azure Cosmos DB.

This sample application is meant to provide you with template code to process and persist a graph in Azure Cosmos DB. You will then be able to test the Cosmos DB graph processing capabilities using the Cosmos DB Gremlin API. The Gremlin API supports modeling graph data and provides APIs to traverse through the graph data.

## Attach notebook to your cluster

Before executing any cells in the notebook, you need to attach it to your cluster. In the notebook's toolbar, select the drop-down arrow next to Detached, and then select your cluster under Attach to.

![Detach is expanded in the notebook toolbar, and the cluster is highlighted under Attach to.](https://github.com/alagala/labs/raw/master/azure/databricks/media/databricks-attach-notebook.png "Attach notebook")

## Download the sample Parquet files (with bike trip data) to the cluster's local storage

In [4]:
%sh
wget -P /tmp https://raw.githubusercontent.com/alagala/labs/master/azure/cosmosdb/graph/data/stations.snappy.parquet
wget -P /tmp https://raw.githubusercontent.com/alagala/labs/master/azure/cosmosdb/graph/data/trips.snappy.parquet

## Connect to the Azure Data Lake Storage Gen2 (ADLS)

The code below connects to ADLS, uploads the Parquet files, and then lists all files available on the ADLS filesystem.

In production, you would probably use services like [Azure Data Factory](https://docs.microsoft.com/en-us/azure/data-factory/) to copy data from external data sources to the Azure Data Lake Storage.

> **NOTE**: For this hands-on lab, the ADLS account name and account key have already been added to the Azure Key Vault service, so you will retrieve the values from there using `dbutils.secrets.get()`.

In [6]:
# Variable declarations. These will be accessible by any calling notebook.
keyVaultScope = "key-vault-secrets"
adlsAccountName = dbutils.secrets.get(keyVaultScope, "ADLS-Gen2-Account-Name")
adlsAccountKey = dbutils.secrets.get(keyVaultScope, "ADLS-Gen2-Account-Key")

abfsUri = "abfss://biketrips@" + adlsAccountName + ".dfs.core.windows.net/"

spark.conf.set("fs.azure.account.key." + adlsAccountName + ".dfs.core.windows.net", adlsAccountKey)
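
The URI and the configuration key above follow the ABFS naming scheme, `abfss://<container>@<account>.dfs.core.windows.net/<path>`. As a rough sketch of how the two strings are assembled, the helpers below rebuild them in plain Python. The helper names and the account name `examplestorage` are made up for illustration and are not part of the lab.

```python
def abfs_uri(container, account, path=""):
    """Build an abfss:// URI for a container (filesystem) in an ADLS Gen2 account."""
    return "abfss://{0}@{1}.dfs.core.windows.net/{2}".format(
        container, account, path.lstrip("/"))


def account_key_setting(account):
    """Name of the Spark setting that holds the storage account key."""
    return "fs.azure.account.key.{0}.dfs.core.windows.net".format(account)


# "examplestorage" is a placeholder account name, not a real account.
example_uri = abfs_uri("biketrips", "examplestorage")
example_setting = account_key_setting("examplestorage")

print(example_uri)
print(example_setting)
```

Keeping the account name in one variable, as the notebook does with `adlsAccountName`, means the URI and the key setting cannot drift apart.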

In [7]:
# Since the hierarchical namespace is enabled for the storage account, we must initialize a filesystem before we can access it.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls(abfsUri)
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

dbutils.fs.cp("file:///tmp/stations.snappy.parquet", abfsUri)
dbutils.fs.cp("file:///tmp/trips.snappy.parquet", abfsUri)

dbutils.fs.ls(abfsUri)

## Load the bike trip data

In [9]:
spark.conf.set("spark.sql.shuffle.partitions", "16")

In [10]:
from pyspark.sql.functions import col

bikeStations = spark.read.parquet(abfsUri + "stations.snappy.parquet")
tripData = spark.read.parquet(abfsUri + "trips.snappy.parquet")

In [11]:
bikeStations.show(5)

In [12]:
tripData.show(5)

## Build the graph

The first step is to build the graph. To do that, we need to define the vertices (bike stations) and the edges (bike trips from one station to another). In our case we are building a _directed graph_: each edge points from a source vertex to a destination vertex. In the context of this bike trip data, an edge points from a trip's starting station to its ending station.

To define the graph, we use the column naming conventions of the GraphFrames library. In the vertices table, we name the identifier column `id`; in the edges table, we name each edge's source vertex ID `src` and its destination vertex ID `dst`.
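
To make the `id` / `src` / `dst` convention concrete, here is a minimal plain-Python sketch (no Spark required) of a tiny directed graph. The station IDs and trips are invented for illustration; they are not taken from the lab data.

```python
from collections import Counter

# Hypothetical vertices and edges, shaped like the GraphFrames tables:
# every vertex has an "id"; every edge has a "src" and a "dst".
vertices = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
edges = [
    {"src": "1", "dst": "2"},
    {"src": "1", "dst": "3"},
    {"src": "2", "dst": "1"},
]

# Because the graph is directed, trips leaving a station (out-degree)
# and trips arriving at it (in-degree) are counted separately.
out_degree = Counter(e["src"] for e in edges)
in_degree = Counter(e["dst"] for e in edges)

for v in vertices:
    print(v["id"], "out:", out_degree[v["id"]], "in:", in_degree[v["id"]])
```

A trip from station 1 to station 2 and a trip from station 2 to station 1 are two distinct edges, which is why the direction matters when counting departures and arrivals.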

In [14]:
from graphframes import GraphFrame

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType
from urllib.parse import quote

def urlencode(value):
  return quote(value, safe="")

udf_urlencode = udf(urlencode, StringType())

stationVertices = bikeStations \
  .withColumn("id", col("station_id").cast(StringType())) \
  .withColumn("station_id", udf_urlencode("name")) \
  .withColumn("type", lit("station")) \
  .distinct()

tripEdges = tripData \
  .withColumn("id", col("trip_id").cast(StringType())) \
  .withColumn("src", col("start_terminal").cast(StringType())) \
  .withColumn("dst", col("end_terminal").cast(StringType())) \
  .withColumn("relationship", lit("bike_to")) \
  .drop("trip_id", "start_terminal", "end_terminal")

stationGraph = GraphFrame(stationVertices, tripEdges)
stationGraph.cache()

# Display some basic statistics about the graph.
print("Total number of stations: " + str(stationGraph.vertices.count()))
print("Total number of trips in graph: " + str(stationGraph.edges.count()))
print("Total number of trips in original data: " + str(tripData.count()))

## Cosmos DB Graph backend format

A Cosmos DB graph collection stores data in JSON format in the backend. This representation of vertices and edges is described in detail [here](https://github.com/LuisBosquez/azure-cosmos-db-graph-working-guides/blob/master/graph-backend-json.md).

### Preparing Vertices

From the GraphFrame vertices, we’ll create a new DataFrame containing Cosmos DB vertex rows. Each such row contains the following columns:

- `id`: Unique ID of the vertex. **Note**: `id` in Cosmos DB is part of the resource URI and hence, if it is a string, it must be URL encoded;
- `label` (_optional_): The type of vertex entity (in our example we could say the type is `station`);
- Partition key: If the Cosmos DB graph is provisioned as a partitioned collection, there must be a column with that partition key name (in our example, we have indicated the partition key to be `station_id`);
- Property bag: Gremlin vertex properties are stored as Arrays of Struct, because multiple values are allowed in a single property (**Note**: partition key property is stored as a regular column and NOT a property bag).
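
The URL-encoding requirement is easy to check in isolation. The sketch below applies `urllib.parse.quote` with `safe=""` (the same call the notebook's `urlencode` helper uses) to a few made-up station names; the names are assumptions for illustration only.

```python
from urllib.parse import quote

# Hypothetical station names containing characters that are unsafe in a URI.
names = ["Civic Center", "2nd at Folsom", "Market/Main", "Park & Ride"]

# safe="" means no character is exempt from encoding, so "/" is escaped too.
encoded = [quote(name, safe="") for name in names]

for name, value in zip(names, encoded):
    print(name, "->", value)
```

Passing `safe=""` matters because the default (`safe="/"`) leaves slashes untouched, and a slash inside a value would otherwise be read as a path separator in the resource URI.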

In [17]:
def to_cosmosdb_vertices(dfVertices, labelColumn, partitionKey = ""):
  columns = ["id", labelColumn]
  
  if partitionKey:
    columns.append(partitionKey)
  
  columns.extend(['nvl2({x}, array(named_struct("id", uuid(), "_value", {x})), NULL) AS {x}'.format(x=x) \
                  for x in dfVertices.columns if x not in columns])
 
  return dfVertices.selectExpr(*columns).withColumnRenamed(labelColumn, "label")

cosmosDbVertices = to_cosmosdb_vertices(stationGraph.vertices, "type", "station_id")
display(cosmosDbVertices)
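
To visualize the shape of a single vertex row, the following plain-Python sketch mirrors what `to_cosmosdb_vertices` does for one record. It is an illustration only: the helper `to_vertex_document` and the sample row (including its field names and values) are hypothetical and not part of the lab.

```python
import json
import uuid


def to_vertex_document(row, label_column, partition_key=""):
    """Plain-Python analogue of one output row of to_cosmosdb_vertices."""
    doc = {"id": row["id"], "label": row[label_column]}
    if partition_key:
        # The partition key stays a regular (scalar) field.
        doc[partition_key] = row[partition_key]
    for name, value in row.items():
        if name in ("id", label_column, partition_key):
            continue
        # Non-null properties become a one-element list of {id, _value};
        # null properties stay null, mirroring the nvl2(...) expression.
        if value is None:
            doc[name] = None
        else:
            doc[name] = [{"id": str(uuid.uuid4()), "_value": value}]
    return doc


# Hypothetical station row; the field names and values are invented.
row = {"id": "2", "station_id": "Example%20Station", "type": "station",
       "name": "Example Station", "dockcount": 27, "installation": None}
vertex = to_vertex_document(row, "type", "station_id")
print(json.dumps(vertex, indent=2))
```

Each property value gets its own generated `id` because a vertex property can hold several values, and each value needs to be individually addressable.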

### Preparing Edges

Now it's time to transform the GraphFrame edges into a Cosmos DB DataFrame. Each row in that DataFrame will contain the following columns:

- `id`: Similar to the `id` column in a vertex row, a unique ID of the edge;
- `label`: The name of edge relationship;
- Gremlin edge properties are stored as regular columns, since multi-valued properties are not supported in Gremlin edges;
- `_isEdge`: Hardcoded boolean column with value `True`;
- `_vertexId`: ID of the source vertex;
- `_sink`: ID of the destination vertex.

Similar to vertices, if the Cosmos DB graph is provisioned as a partitioned collection, the following additional columns must also be provided:

- Partition key: Column with the source vertex partition key name;
- `_sinkPartition`: Value of partition key of destination vertex.

In [19]:
from pyspark.sql.functions import col, lit

def to_cosmosdb_edges(g, labelColumn, partitionKey = ""): 
  dfEdges = g.edges
  
  if partitionKey:
    dfEdges = dfEdges.alias("e") \
      .join(g.vertices.alias("sv"), col("e.src") == col("sv.id")) \
      .join(g.vertices.alias("dv"), col("e.dst") == col("dv.id")) \
      .selectExpr("e.*", "sv." + partitionKey, "dv." + partitionKey + " AS _sinkPartition")

  dfEdges = dfEdges \
    .withColumn("_isEdge", lit(True)) \
    .withColumn("_vertexId", col("src")) \
    .withColumn("_sink", col("dst")) \
    .withColumnRenamed(labelColumn, "label") \
    .drop("src", "dst")
  
  return dfEdges

cosmosDbEdges = to_cosmosdb_edges(stationGraph, "relationship", "station_id")
display(cosmosDbEdges)
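
The same transformation can be sketched for a single edge in plain Python. The helper `to_edge_document`, the lookup dictionary, and the sample trip below are hypothetical and exist only to show which fields end up in an edge row.

```python
def to_edge_document(edge, vertices_by_id, label_column, partition_key=""):
    """Plain-Python analogue of one output row of to_cosmosdb_edges."""
    doc = dict(edge)
    if partition_key:
        # Equivalent of the two joins: look up the partition key of both
        # endpoints. An inner join would silently drop an edge whose endpoint
        # is missing; this sketch raises KeyError instead.
        doc[partition_key] = vertices_by_id[edge["src"]][partition_key]
        doc["_sinkPartition"] = vertices_by_id[edge["dst"]][partition_key]
    doc["_isEdge"] = True
    doc["_vertexId"] = doc.pop("src")
    doc["_sink"] = doc.pop("dst")
    doc["label"] = doc.pop(label_column)
    return doc


# Hypothetical vertices and trip; all values are invented for illustration.
vertices_by_id = {
    "2": {"id": "2", "station_id": "Station%20A"},
    "5": {"id": "5", "station_id": "Station%20B"},
}
trip = {"id": "1001", "src": "2", "dst": "5",
        "relationship": "bike_to", "duration": 420}
edge_doc = to_edge_document(trip, vertices_by_id, "relationship", "station_id")
print(edge_doc)
```

Storing the partition keys of both endpoints on the edge lets a traversal route from the source vertex to the destination vertex without first searching every partition for it.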

## Store the graph in Cosmos DB

Using the Azure Cosmos DB Spark Connector, you will now persist the graph to Azure Cosmos DB, where users and applications can connect for interactive queries.

To write to Cosmos DB, you first need to create a configuration object. If you are curious, read the [configuration reference](https://github.com/Azure/azure-cosmosdb-spark/wiki/Configuration-references) for details on all of the options.

The core items you need to provide are:

  - **Endpoint**: Your Cosmos DB URL (e.g. https://youraccount.documents.azure.com:443/).
  - **Masterkey**: The primary or secondary key string for your Cosmos DB account.
  - **Database**: The name of the database.
  - **Collection**: The name of the collection that you wish to write to.

> **NOTE**: For this hands-on lab, the endpoint and master key have already been added to the Azure Key Vault service, so you will retrieve the values from there using `dbutils.secrets.get()`.
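
Before handing the configuration to the connector, it can help to confirm that the four core items are present. The check below is a small sketch of that idea; `missing_settings` is a hypothetical helper, not part of the connector, and the sample values are placeholders.

```python
REQUIRED_KEYS = ("Endpoint", "Masterkey", "Database", "Collection")


def missing_settings(config):
    """Return the required settings that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


# Placeholder values only; in the notebook the endpoint and key come from Key Vault.
example_config = {
    "Endpoint": "https://youraccount.documents.azure.com:443/",
    "Masterkey": "",
    "Database": "BikeTrips",
    "Collection": "trips",
}

print(missing_settings(example_config))
```

An empty list means all four items are set. Here the deliberately blank `Masterkey` is reported, which surfaces the problem before any write is attempted.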

In [21]:
cosmosDbConfig = {
    "Endpoint" : dbutils.secrets.get(keyVaultScope, "Cosmos-DB-URI"),
    "Masterkey" : dbutils.secrets.get(keyVaultScope, "Cosmos-DB-Key"),
    "Database" : "BikeTrips",
    "Collection" : "trips",
    "Upsert" : "true",
    "WritingBatchSize": "2000"
}

cosmosDbFormat = "com.microsoft.azure.cosmosdb.spark"

cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()
cosmosDbEdges.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()