Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Refine the GraphWriter to automatically generate graph info and improve the Neo4j case #196

Merged
merged 27 commits into from
Jul 18, 2023

Conversation

acezen
Copy link
Contributor

@acezen acezen commented Jul 7, 2023

… schema of dataset

Proposed changes

This change focus on Spark's graph level API's refine and Neo4j case's refactor:

  • graph level API refine
    • Make GraphWriter implement as class and add PutVertexData and PutEdgeData method to put vertex/edge data frame to writer. User no need to construct Mapping for data any more.
    • Support generate graph info with DataFrame schema and add dump method to GraphInfo, VertexInfo and EdgeInfo to support dumps the info to JSON string.
  • Neo4j case
    • Refactor the case with graph level API and cover the whole movie graph, not just Person Produced Movie.
    • Add some scripts to help user to run the case easy.( For get started)
  • Some bugfix and document update too.
  • Use markdown format in cpp/spark's README since they're not include in the website.

There still something we can optimize:

  • use .option(xxxx) style to configure parameter of write method like vertex_chunk_size, edge_chunk_size etc.
  • Or user provide a configuration file to describe the parameters.

Types of changes

What types of changes does your code introduce to GraphAr?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation Update (if none of the other choices apply)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

  • I have read the CONTRIBUTING doc
  • I have signed the CLA
  • Lint and unit tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

Fixes #200
Fixes #201

@acezen acezen requested a review from lixueclaire July 10, 2023 06:09
@acezen
Copy link
Contributor Author

acezen commented Jul 10, 2023

hi, @lixueclaire , please help me review the change and give your opinion.

@lixueclaire
Copy link
Contributor

hi, @lixueclaire , please help me review the change and give your opinion.

Overall, it looks good for me. Have you ever tested the modified examples and checked the results? If you decide to change the example of Neo4j2GraphAr, there may be some extra things that should be updated:

  1. the data generated by this example in gar-test
  2. the related documentation

Besides, since the data generated from reading Neo4j changed, the example of GraphAr2Neo4j may not work now.

@acezen
Copy link
Contributor Author

acezen commented Jul 10, 2023

hi, @lixueclaire , please help me review the change and give your opinion.

Overall, it looks good for me. Have you ever tested the modified examples and checked the results? If you decide to change the example of Neo4j2GraphAr, there may be some extra things that should be updated:

  1. the data generated by this example in gar-test
  2. the related documentation

Besides, since the data generated from reading Neo4j changed, the example of GraphAr2Neo4j may not work now.

Yes, I will update the document and Neo4j case if we all good about the API change.

@acezen acezen changed the title [WIP][Spark] Refine the GraphWriter to automatically generate graph info base the… [Spark] Refine the GraphWriter to automatically generate graph info base the… Jul 11, 2023
@@ -94,7 +94,11 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readVertexPropertyGroup(pg0, false)
if (len == 1) {
return df0
if (addIndex) {
return IndexGenerator.generateVertexIndexColumn(df0)
Copy link
Contributor Author

@acezen acezen Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: this is fixing the bug that no index column generated in the result when property group only contain one property.

@@ -86,6 +86,19 @@ object IndexGenerator {
spark.createDataFrame(rdd_with_index, schema_with_index)
}

def generateVertexIndexColumnAndIndexMapping(vertexDf: DataFrame, primaryKey: String = ""): (DataFrame, DataFrame) = {
Copy link
Contributor Author

@acezen acezen Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: The method is added for generating index column and index mapping in one-shot, avoid to generate index mapping again when process edges.

val df = DataFrameConcat.concat(adjList_df, properties_df)
val property_groups = edgeInfo.getPropertyGroups(adjListType)
val df = if (property_groups.size == 0) {
adjList_df
Copy link
Contributor Author

@acezen acezen Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: this is fixing the bug that when there are only adj list and no property group, the concatenated DataFrame in line 339 partition num is not same between adjList_df and properties_df(the properties_df is a empty DataFrame and its partition num is 0).

@acezen acezen force-pushed the spark-graph-writer-refine branch from 7f70ab2 to 06a8bc7 Compare July 14, 2023 09:43
.save()
})

def main(args: Array[String]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix format


- <id> the internal Neo4j ID
- <labels> a list of labels for that node
def main(args: Array[String]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix format


putVertexDataIntoNeo4j(graphInfo, vertexData, spark)
putEdgeDataIntoNeo4j(graphInfo, vertexData, edgeData, spark)
}

See `GraphAr2Neo4j.scala`_ for the complete example.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some descriptions about how to write in different modes (i.e., how to modify the example).

@@ -480,6 +480,29 @@ class EdgeInfo() {
def getConcatKey(): String = {
return getSrc_label + GeneralParams.regularSeperator + getEdge_label + GeneralParams.regularSeperator + getDst_label
}

/** Dump to Json string. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yaml?

@@ -223,6 +223,25 @@ class VertexInfo() {
}
return prefix + str
}

/** Dump to Json string. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yaml

@@ -34,20 +35,20 @@ object GraphReader {
private def readAllVertices(prefix: String, vertexInfos: Map[String, VertexInfo], spark: SparkSession): Map[String, DataFrame] = {
val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map { case (label, vertexInfo) => {
val reader = new VertexReader(prefix, vertexInfo, spark)
(label, reader.readAllVertexPropertyGroups(false))
(label, reader.readAllVertexPropertyGroups(true))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be adding a parameter for adding index is required for this function.

def write(path: String,
spark: SparkSession,
name: String = "graph",
vertex_chunk_size: Long = 262144, // 2^18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the default value into general parameters?


object Neo4j2GraphAr {

def main(args: Array[String]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add some comments for args.

val edgeChunkSize: Long = args(2).toLong
val fileType: String = args(3)

writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help to update the test data?

Copy link
Contributor Author

@acezen acezen Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI and case is no need to read from gar-test repo anymore. But if you think we still need to add a copy to test data, I will update it.

Copy link
Contributor

@lixueclaire lixueclaire Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI and case is no need to read from gar-test repo anymore.

We did not need read from gar-test previously, either. The test data in gar-test is used to give a showcase for the data generated by you example. Currently, the data is inconsistent with the new examples, thus it would be a little confusing.


object GraphAr2Neo4j {

def main(args: Array[String]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments for args.

@acezen acezen force-pushed the spark-graph-writer-refine branch from 697a657 to 33d375d Compare July 17, 2023 11:48
@acezen
Copy link
Contributor Author

acezen commented Jul 17, 2023

Thanks a lot for the review @lixueclaire . I agree with all your comments and made the necessary changes.

Copy link
Contributor

@lixueclaire lixueclaire left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM~ I believe it's a prime example of GraphAr's capabilities. Thank you for making this change!

@acezen acezen changed the title [Spark] Refine the GraphWriter to automatically generate graph info base the… [Spark] Refine the GraphWriter to automatically generate graph info and improve the Neo4j case Jul 18, 2023
@acezen acezen merged commit 11ebf37 into apache:main Jul 18, 2023
5 checks passed
@acezen acezen deleted the spark-graph-writer-refine branch July 18, 2023 02:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants