
Commit

Revise
acezen committed Jul 17, 2023
1 parent 84a2144 commit 33d375d
Showing 10 changed files with 201 additions and 75 deletions.
185 changes: 139 additions & 46 deletions docs/applications/spark.rst
@@ -47,57 +47,150 @@ with `Neo4j2GraphAr.scala`_ providing a complete example.

.. code:: scala

   def main(args: Array[String]): Unit = {
     // connect to the Neo4j instance
     val spark = SparkSession.builder()
       .appName("Neo4j to GraphAr for Movie Graph")
       .config("neo4j.url", "bolt://localhost:7687")
       .config("neo4j.authentication.type", "basic")
       .config("neo4j.authentication.basic.username", sys.env.get("Neo4j_USR").get)
       .config("neo4j.authentication.basic.password", sys.env.get("Neo4j_PWD").get)
       .config("spark.master", "local")
       .getOrCreate()

     // initialize a graph writer
     val writer: GraphWriter = new GraphWriter()

     // put movie graph data into writer
     readAndPutDataIntoWriter(writer, spark)

     // write in graphar format
     val outputPath: String = args(0)
     val vertexChunkSize: Long = args(1).toLong
     val edgeChunkSize: Long = args(2).toLong
     val fileType: String = args(3)

     writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
   }

The `readAndPutDataIntoWriter` method reads the vertices and edges from Neo4j into Spark DataFrames
(refer to the `Neo4j docs <https://neo4j.com/docs/spark/current/reading/>`_ for more details),
and then puts them into a GraphAr `GraphWriter`.

.. code:: scala

   def readAndPutDataIntoWriter(writer: GraphWriter, spark: SparkSession): Unit = {
     // read vertices with label "Person" from Neo4j as a DataFrame
     val person_df = spark.read.format("org.neo4j.spark.DataSource")
       .option("query", "MATCH (n:Person) RETURN n.name AS name, n.born as born")
       .load()
     // put into writer, vertex label is "Person"
     writer.PutVertexData("Person", person_df)

     // read vertices with label "Movie" from Neo4j as a DataFrame
     val movie_df = spark.read.format("org.neo4j.spark.DataSource")
       .option("query", "MATCH (n:Movie) RETURN n.title AS title, n.tagline as tagline")
       .load()
     // put into writer, vertex label is "Movie"
     writer.PutVertexData("Movie", movie_df)

     // read edges with type "Person"->"PRODUCED"->"Movie" from Neo4j as a DataFrame
     val produced_edge_df = spark.read.format("org.neo4j.spark.DataSource")
       .option("query", "MATCH (a:Person)-[r:PRODUCED]->(b:Movie) return a.name as src, b.title as dst")
       .load()
     // put into writer, source vertex label is "Person", edge label is "PRODUCED"
     // target vertex label is "Movie"
     writer.PutEdgeData(("Person", "PRODUCED", "Movie"), produced_edge_df)

     // the other edge types (ACTED_IN, DIRECTED, FOLLOWS, REVIEWED, WROTE) are
     // read and put in the same way; see Neo4j2GraphAr.scala for the complete example
   }

Finally, the `write` method writes the graph data in GraphAr format to the specified path.
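
For illustration, a concrete call might look like the following sketch; the output path and chunk sizes here are invented values, not ones taken from the example:

.. code:: scala

   // Illustrative values only: hypothetical output path and chunk sizes.
   // Writes the movie graph as parquet-formatted GraphAr chunks.
   writer.write("/tmp/graphar/movie_graph", spark, "MovieGraph", 262144, 4194304, "parquet")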

Additionally, to import data from GraphAr back into Neo4j to create or update instances, refer to the following code:

.. code:: scala

   def main(args: Array[String]): Unit = {
     // connect to the Neo4j instance
     val spark = SparkSession.builder()
       .appName("GraphAr to Neo4j for Movie Graph")
       .config("neo4j.url", "bolt://localhost:7687")
       .config("neo4j.authentication.type", "basic")
       .config("neo4j.authentication.basic.username", sys.env.get("Neo4j_USR").get)
       .config("neo4j.authentication.basic.password", sys.env.get("Neo4j_PWD").get)
       .config("spark.master", "local")
       .getOrCreate()

     // path to the graph information file
     val graphInfoPath: String = args(0)
     val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)

     val graphData = GraphReader.read(graphInfoPath, spark)
     val vertexData = graphData._1
     val edgeData = graphData._2

     putVertexDataIntoNeo4j(graphInfo, vertexData, spark)
     putEdgeDataIntoNeo4j(graphInfo, vertexData, edgeData, spark)
   }

Pass the graph information file path to the `loadGraphInfo` method to obtain the graph information.
Then read the graph data from the GraphAr files with `GraphReader`, which returns a pair of
DataFrame maps: `_1` holds the vertex data and `_2` the edge data.
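
To make the shape of that pair concrete, here is a small sketch (relying only on the types visible in the method signatures below) that lists what was read:

.. code:: scala

   // vertexData: Map[vertex label -> DataFrame]
   vertexData.foreach { case (label, df) =>
     println(s"vertex label: $label, columns: ${df.columns.mkString(", ")}")
   }
   // edgeData: Map[(src label, edge label, dst label) -> Map[adj list type -> DataFrame]]
   edgeData.foreach { case ((src, rel, dst), dfs) =>
     println(s"edge: $src-[$rel]->$dst, adj list types: ${dfs.keys.mkString(", ")}")
   }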

The `putVertexDataIntoNeo4j` and `putEdgeDataIntoNeo4j` methods create or update the vertices and edges in Neo4j from these DataFrames.

.. code:: scala

   def putVertexDataIntoNeo4j(graphInfo: GraphInfo,
                              vertexData: Map[String, DataFrame],
                              spark: SparkSession): Unit = {
     // write each vertex type to Neo4j
     vertexData.foreach { case (key, df) => {
       val primaryKey = graphInfo.getVertexInfo(key).getPrimaryKey()
       // the vertex index column is not needed in Neo4j
       // write to Neo4j, refer to https://neo4j.com/docs/spark/current/writing/
       df.drop(GeneralParams.vertexIndexCol).write.format("org.neo4j.spark.DataSource")
         .mode(SaveMode.Overwrite)
         .option("labels", ":" + key)
         .option("node.keys", primaryKey)
         .save()
     }}
   }

   def putEdgeDataIntoNeo4j(graphInfo: GraphInfo,
                            vertexData: Map[String, DataFrame],
                            edgeData: Map[(String, String, String), Map[String, DataFrame]],
                            spark: SparkSession): Unit = {
     // write each edge type to Neo4j
     edgeData.foreach { case (key, value) => {
       // key is (source vertex label, edge label, target vertex label)
       val sourceLabel = key._1
       val edgeLabel = key._2
       val targetLabel = key._3
       val sourcePrimaryKey = graphInfo.getVertexInfo(sourceLabel).getPrimaryKey()
       val targetPrimaryKey = graphInfo.getVertexInfo(targetLabel).getPrimaryKey()
       val sourceDf = vertexData(sourceLabel)
       val targetDf = vertexData(targetLabel)
       // convert the source and target index column to the primary key column;
       // use the first DataFrame of the (adj_list_type_str -> DataFrame) map
       val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey)

       val properties = if (edgeLabel == "REVIEWED") "rating,summary" else ""
       // write to Neo4j, refer to https://neo4j.com/docs/spark/current/writing/
       df.write.format("org.neo4j.spark.DataSource")
         .mode(SaveMode.Overwrite)
         .option("relationship", edgeLabel)
         .option("relationship.save.strategy", "keys")
         .option("relationship.source.labels", ":" + sourceLabel)
         .option("relationship.source.save.mode", "match")
         .option("relationship.source.node.keys", "src:" + sourcePrimaryKey)
         .option("relationship.target.labels", ":" + targetLabel)
         .option("relationship.target.save.mode", "match")
         .option("relationship.target.node.keys", "dst:" + targetPrimaryKey)
         .option("relationship.properties", properties)
         .save()
     }}
   }

Finally, you will see the graph in Neo4j Browser after running the above code.
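
As an optional sanity check, you can read a count back through the same connector; this sketch reuses the options shown above with an illustrative Cypher query:

.. code:: scala

   // Optional check: count the imported Movie nodes via the Neo4j Spark connector.
   val movieCount = spark.read.format("org.neo4j.spark.DataSource")
     .option("query", "MATCH (n:Movie) RETURN count(n) AS cnt")
     .load()
   movieCount.show()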

See `GraphAr2Neo4j.scala`_ for the complete example.

4 changes: 4 additions & 0 deletions spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -28,5 +28,9 @@ public class GeneralParams {
public static final String regularSeperator = "_";
public static final String offsetStartChunkIndexKey = "_graphar_offset_start_chunk_index";
public static final String aggNumListOfEdgeChunkKey = "_graphar_agg_num_list_of_edge_chunk";
public static final Long defaultVertexChunkSize = 262144L; // 2^18
public static final Long defaultEdgeChunkSize = 4194304L; // 2^22
public static final String defaultFileType = "parquet";
public static final String defaultVersion = "v1";
}
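
These defaults could, for example, back optional command-line arguments in the example jobs; a minimal sketch under that assumption (the helper below is hypothetical, not part of this commit):

.. code:: scala

   // Hypothetical sketch: fall back to the new GeneralParams defaults
   // when an optional CLI argument is absent. Not part of this commit.
   def longArgOrDefault(args: Array[String], idx: Int, default: Long): Long =
     if (args.length > idx) args(idx).toLong else default

   val vertexChunkSize = longArgOrDefault(args, 1, GeneralParams.defaultVertexChunkSize)
   val edgeChunkSize = longArgOrDefault(args, 2, GeneralParams.defaultEdgeChunkSize)
   val fileType = if (args.length > 3) args(3) else GeneralParams.defaultFileType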

2 changes: 1 addition & 1 deletion spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -481,7 +481,7 @@ class EdgeInfo() {
return getSrc_label + GeneralParams.regularSeperator + getEdge_label + GeneralParams.regularSeperator + getDst_label
}

/** Dump to Yaml string. */
def dump(): String = {
val data = new java.util.HashMap[String, Object]()
data.put("src_label", src_label)
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
@@ -276,7 +276,7 @@ class GraphInfo() {
return edgeInfos
}

/** Dump to Yaml string. */
def dump(): String = {
val data = new java.util.HashMap[String, Object]()
data.put("name", name)
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -224,7 +224,7 @@ class VertexInfo() {
return prefix + str
}

/** Dump to Yaml string. */
def dump(): String = {
val data = new java.util.HashMap[String, Object]()
data.put("label", label)
GraphAr2Neo4j.scala
@@ -41,10 +41,13 @@ object GraphAr2Neo4j {
.config("spark.master", "local")
.getOrCreate()

// path to the graph information file
val graphInfoPath: String = args(0)
val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)

// The edge data needs its src and dst columns converted to vertex ids, so we
// read the vertex data with the index column.
val graphData = GraphReader.read(graphInfoPath, spark, true)
val vertexData = graphData._1
val edgeData = graphData._2

@@ -53,9 +56,11 @@
}

def putVertexDataIntoNeo4j(graphInfo: GraphInfo, vertexData: Map[String, DataFrame], spark: SparkSession): Unit = {
// write each vertex type to Neo4j
vertexData.foreach { case (key, df) => {
val primaryKey = graphInfo.getVertexInfo(key).getPrimaryKey()
// the vertex index column is not needed in Neo4j
// write to Neo4j, refer to https://neo4j.com/docs/spark/current/writing/
df.drop(GeneralParams.vertexIndexCol).write.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", ":" + key)
@@ -65,15 +70,16 @@
}

def putEdgeDataIntoNeo4j(graphInfo: GraphInfo, vertexData: Map[String, DataFrame], edgeData: Map[(String, String, String), Map[String, DataFrame]], spark: SparkSession): Unit = {
// write each edge type to Neo4j
edgeData.foreach { case (key, value) => {
val sourceLabel = key._1
val edgeLabel = key._2
val targetLabel = key._3
val sourcePrimaryKey = graphInfo.getVertexInfo(sourceLabel).getPrimaryKey()
val targetPrimaryKey = graphInfo.getVertexInfo(targetLabel).getPrimaryKey()
val sourceDf = vertexData(sourceLabel)
val targetDf = vertexData(targetLabel)
// convert the source and target index column to the primary key column
val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey) // use the first dataframe of (adj_list_type_str, dataframe) map

// FIXME: use properties message in edge info
Neo4j2GraphAr.scala
@@ -39,12 +39,16 @@ object Neo4j2GraphAr {
// put movie graph data into writer
readAndPutDataIntoWriter(writer, spark)

// output directory
val outputPath: String = args(0)
// vertex chunk size
val vertexChunkSize: Long = args(1).toLong
// edge chunk size
val edgeChunkSize: Long = args(2).toLong
// file type
val fileType: String = args(3)

// write in graphar format
writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
}

@@ -54,46 +58,62 @@ object Neo4j2GraphAr {
val person_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (n:Person) RETURN n.name AS name, n.born as born")
.load()
// put into writer, vertex label is "Person"
writer.PutVertexData("Person", person_df)

// read vertices with label "Person" from Neo4j as a DataFrame
// read vertices with label "Movie" from Neo4j as a DataFrame
val movie_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (n:Movie) RETURN n.title AS title, n.tagline as tagline")
.load()
// put into writer, vertex label is "Movie"
writer.PutVertexData("Movie", movie_df)

// read edges with type "Person"->"PRODUCED"->"Movie" from Neo4j as a DataFrame
val produced_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:PRODUCED]->(b:Movie) return a.name as src, b.title as dst")
.load()
// put into writer, source vertex label is "Person", edge label is "PRODUCED"
// target vertex label is "Movie"
writer.PutEdgeData(("Person", "PRODUCED", "Movie"), produced_edge_df)

// read edges with type "Person"->"ACTED_IN"->"Movie" from Neo4j as a DataFrame
val acted_in_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:ACTED_IN]->(b:Movie) return a.name as src, b.title as dst")
.load()
// put into writer, source vertex label is "Person", edge label is "ACTED_IN"
// target vertex label is "Movie"
writer.PutEdgeData(("Person", "ACTED_IN", "Movie"), acted_in_edge_df)

// read edges with type "Person"->"DIRECTED"->"Movie" from Neo4j as a DataFrame
val directed_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:DIRECTED]->(b:Movie) return a.name as src, b.title as dst")
.load()
// put into writer, source vertex label is "Person", edge label is "DIRECTED"
// target vertex label is "Movie"
writer.PutEdgeData(("Person", "DIRECTED", "Movie"), directed_edge_df)

// read edges with type "Person"->"FOLLOWS"->"Person" from Neo4j as a DataFrame
val follows_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:FOLLOWS]->(b:Person) return a.name as src, b.name as dst")
.load()
// put into writer, source vertex label is "Person", edge label is "FOLLOWS"
// target vertex label is "Person"
writer.PutEdgeData(("Person", "FOLLOWS", "Person"), follows_edge_df)

// read edges with type "Person"->"REVIEWED"->"Movie" from Neo4j as a DataFrame
val reviewed_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:REVIEWED]->(b:Movie) return a.name as src, b.title as dst, r.rating as rating, r.summary as summary")
.load()
// put into writer, source vertex label is "Person", edge label is "REVIEWED"
// target vertex label is "Movie"
writer.PutEdgeData(("Person", "REVIEWED", "Movie"), reviewed_edge_df)

// read edges with type "Person"->"WROTE"->"Movie" from Neo4j as a DataFrame
val wrote_edge_df = spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (a:Person)-[r:WROTE]->(b:Movie) return a.name as src, b.title as dst")
.load()
// put into writer, source vertex label is "Person", edge label is "WROTE"
// target vertex label is "Movie"
writer.PutEdgeData(("Person", "WROTE", "Movie"), wrote_edge_df)
}
}
