In [1]:
#python notebook 

from pyspark.sql.functions import lit

v = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"]) \
.withColumn("entity", lit("person"))


In [2]:
# COMMAND ----------

e = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])



In [3]:
# COMMAND ----------

from graphframes import GraphFrame
g = GraphFrame(v, e)

# COMMAND ----------

display(g.vertices)

# COMMAND ----------

display(g.edges)

# COMMAND ----------

# MAGIC %md ## Convert Vertices and Edges to Cosmos DB internal format
# MAGIC Cosmos DB Gremlin API internally keeps a JSON document representation of Edges and Vertices [as explained here](https://github.com/LuisBosquez/azure-cosmos-db-graph-working-guides/blob/master/graph-backend-json.md). Also `id` in Cosmos DB is [part of the resource URI](https://github.com/Azure/azure-cosmosdb-dotnet/issues/35#issuecomment-121009258) and hence must be URL encoded.



src,dst,relationship
a,b,friend
b,c,follow
c,b,follow
f,c,follow
e,f,follow
e,d,friend
d,a,friend
a,e,friend


In [4]:
# COMMAND ----------

from pyspark.sql.types import StringType
from urllib.parse import quote

def urlencode(value):
  return quote(value, safe="")

udf_urlencode = udf(urlencode, StringType())

# COMMAND ----------

def to_cosmosdb_vertices(dfVertices, labelColumn, partitionKey = ""):
  dfVertices = dfVertices.withColumn("id", udf_urlencode("id"))
  
  columns = ["id", labelColumn]
  
  if partitionKey:
    columns.append(partitionKey)
  
  columns.extend(['nvl2({x}, array(named_struct("id", uuid(), "_value", {x})), NULL) AS {x}'.format(x=x) \
                for x in dfVertices.columns if x not in columns])
 
  return dfVertices.selectExpr(*columns).withColumnRenamed(labelColumn, "label")

# COMMAND ----------

cosmosDbVertices = to_cosmosdb_vertices(g.vertices, "entity")
display(cosmosDbVertices)



id,label,name,age
a,person,"List(List(0df5419b-9241-48fa-8a2e-f58163f357a7, Alice))","List(List(7027fb58-21f2-400f-976e-7ebd22643950, 34))"
b,person,"List(List(20a33687-a84e-4fa5-8195-fb09fb00647a, Bob))","List(List(46a6bead-fc1a-4471-bc23-57a25cd38b4d, 36))"
c,person,"List(List(6c3c7c7c-0407-414c-aed2-e7d20e99129e, Charlie))","List(List(72192e4b-27e3-4c00-b99c-8c0393927938, 30))"
d,person,"List(List(fbd555a2-df94-43d4-949f-f06cf940638d, David))","List(List(00558941-1052-404d-a028-d31e32710170, 29))"
e,person,"List(List(8fd4539b-e6ee-4eeb-81db-1e7422d4cb53, Esther))","List(List(f50cfbff-6525-4c1b-bc38-b17f96a2630d, 32))"
f,person,"List(List(01cc852e-417c-41fb-bef7-e7d1591bb9e9, Fanny))","List(List(8dc1f40c-447b-4757-afe3-08e893f6dbcd, 36))"
g,person,"List(List(0847f5bf-49aa-4fc8-b368-a45c7e38a7a0, Gabby))","List(List(2c336d42-b892-4023-9998-bab6ed4c9e52, 60))"


In [5]:
# COMMAND ----------

from pyspark.sql.functions import concat_ws, col

def to_cosmosdb_edges(g, labelColumn, partitionKey = ""): 
  dfEdges = g.edges
  
  if partitionKey:
    dfEdges = dfEdges.alias("e") \
      .join(g.vertices.alias("sv"), col("e.src") == col("sv.id")) \
      .join(g.vertices.alias("dv"), col("e.dst") == col("dv.id")) \
      .selectExpr("e.*", "sv." + partitionKey, "dv." + partitionKey + " AS _sinkPartition")

  dfEdges = dfEdges \
    .withColumn("id", udf_urlencode(concat_ws("_", col("src"), col(labelColumn), col("dst")))) \
    .withColumn("_isEdge", lit(True)) \
    .withColumn("_vertexId", udf_urlencode("src")) \
    .withColumn("_sink", udf_urlencode("dst")) \
    .withColumnRenamed(labelColumn, "label") \
    .drop("src", "dst")
  
  return dfEdges



In [6]:
# COMMAND ----------

cosmosDbEdges = to_cosmosdb_edges(g, "relationship")
display(cosmosDbEdges)

# COMMAND ----------

cosmosDbConfig = {
  "Endpoint" : "https://<accnt>.documents.azure.com:443/",
  "Masterkey" : "your key",
  "Database" : "dbricks-powered-graphdb",
  "Collection" : "dbricks-powered-graph",
  "Upsert" : "true"
}

cosmosDbFormat = "com.microsoft.azure.cosmosdb.spark"

cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()
cosmosDbEdges.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()

# COMMAND ----------

label,id,_isEdge,_vertexId,_sink
friend,a_friend_b,True,a,b
follow,b_follow_c,True,b,c
follow,c_follow_b,True,c,b
follow,f_follow_c,True,f,c
follow,e_follow_f,True,e,f
friend,e_friend_d,True,e,d
friend,d_friend_a,True,d,a
friend,a_friend_e,True,a,e
