From 6a2bca616a4590950235a4974afa16b13900ca49 Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 14 Dec 2022 18:04:09 +0800 Subject: [PATCH 01/13] Initialize --- .gitignore | 2 +- spark/pom.xml | 73 ++++++ .../com/alibaba/graphar/GeneralParams.java | 13 + .../scala/com/alibaba/graphar/EdgeInfo.scala | 244 ++++++++++++++++++ .../scala/com/alibaba/graphar/GraphInfo.scala | 141 ++++++++++ .../com/alibaba/graphar/TestGraphInfo.scala | 86 ++++++ .../com/alibaba/graphar/VertexInfo.scala | 139 ++++++++++ 7 files changed, 697 insertions(+), 1 deletion(-) create mode 100644 spark/pom.xml create mode 100644 spark/src/main/java/com/alibaba/graphar/GeneralParams.java create mode 100644 spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala create mode 100644 spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala create mode 100644 spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala create mode 100644 spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala diff --git a/.gitignore b/.gitignore index bc399f64b..0c4a1b713 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ /build/ -spark/generator/target/ +spark/target/ .vscode .idea .DS_store diff --git a/spark/pom.xml b/spark/pom.xml new file mode 100644 index 000000000..675fc1fcc --- /dev/null +++ b/spark/pom.xml @@ -0,0 +1,73 @@ + + + 4.0.0 + + com.laibaba + graphar + 0.1.0-SNAPSHOT + + + graphar + UTF-8 + UTF-8 + 2.12.10 + 2.12 + 512m + 1024m + 3.1.1 + 11 + 11 + 11 + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-mllib_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + provided + + + org.scalatest + scalatest_${scala.binary.version} + 3.1.1 + provided + + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.yaml + snakeyaml + 1.26 + + + jar + diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java new file mode 100644 index 000000000..bc67b54bd --- /dev/null +++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java @@ -0,0 +1,13 @@ +package org.alibaba.graphar; + +public class GeneralParams { + // column name + public static final String vertexIndexCol = "_graphArVertexIndex"; + public static final String srcIndexCol = "_graphArSrcIndex"; + public static final String dstIndexCol = "_graphArDstIndex"; + public static final String offsetCol = "_graphArOffset"; + public static final String primaryCol = "_graphArPrimary"; + public static final String vertexChunkIndexCol = "_graphArVertexChunkIndex"; + public static final String edgeIndexCol = "_graphArEdgeIndex"; +} + diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala new file mode 100644 index 000000000..f3a455633 --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala @@ -0,0 +1,244 @@ +package org.alibaba.graphar + +import java.io.{File, FileInputStream} +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import scala.beans.BeanProperty + +class EdgeInfo() { + @BeanProperty var src_label: String = "" + @BeanProperty var edge_label: String = "" + @BeanProperty var dst_label: String = "" + @BeanProperty var chunk_size: Long 
= 0 + @BeanProperty var src_chunk_size: Long = 0 + @BeanProperty var dst_chunk_size: Long = 0 + @BeanProperty var directed: Boolean = false + @BeanProperty var prefix: String = "" + @BeanProperty var adj_lists = new java.util.ArrayList[AdjList]() + @BeanProperty var version: String = "" + + val REGULAR_SEPERATOR = "_" + + def containAdjList(adj_list_type: AdjListType.Value): Boolean = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) + return true + } + return false + } + + def getAdjListPrefix(adj_list_type: AdjListType.Value): String = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) { + var str: String = adj_list.getPrefix + if (str == "") { + str = AdjListType.AdjListTypeToString(adj_list_type) + "/" + } + return str + } + } + throw new IllegalArgumentException + } + + def getAdjListFileType(adj_list_type: AdjListType.Value): FileType.Value = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) { + return adj_list.getFile_type_in_gar + } + } + throw new IllegalArgumentException + } + + def getPropertyGroups(adj_list_type: AdjListType.Value): java.util.ArrayList[PropertyGroup] = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) { + return adj_list.getProperty_groups + } + } + throw new IllegalArgumentException + } + + def containPropertyGroup(property_group: PropertyGroup, adj_list_type: AdjListType.Value): Boolean = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) { + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + if (pg == property_group) { + return true + } + } + } + } + return false + } + + def containProperty(property_name: String): Boolean = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return true + } + } + } + } + return false + } + + def getPropertyGroup(property_name: String, adj_list_type: AdjListType.Value): PropertyGroup = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + if (adj_list.getAdjList_type_in_gar == adj_list_type) { + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return pg + } + } + } + } + } + throw new IllegalArgumentException + } + + def getPropertyType(property_name: String): GarType.Value = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + val property_groups = adj_list.getProperty_groups + val len: Int = 
property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return properties.get(j).getData_type_in_gar + } + } + } + } + throw new IllegalArgumentException + } + + def isPrimaryKey(property_name: String): Boolean = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return properties.get(j).getIs_primary + } + } + } + } + throw new IllegalArgumentException + } + + def getPrimaryKey(): String = { + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getIs_primary) { + return properties.get(j).getName + } + } + } + } + return "" + } + + def isValidated(): Boolean = { + if (src_label == "" || edge_label == "" || dst_label == "") + return false + if (chunk_size <= 0 || src_chunk_size <= 0 || dst_chunk_size <= 0) + return false + val tot: Int = adj_lists.size + for ( k <- 0 to tot - 1 ) { + val adj_list = adj_lists.get(k) + val file_type = adj_list.getFile_type_in_gar + val property_groups = adj_list.getProperty_groups + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + if (num == 0) + return false + val pg_file_type = pg.getFile_type_in_gar + } + } + return true + } + + def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = { + if (containAdjList(adj_list_type) == false) + throw new IllegalArgumentException + val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/part" + + chunk_index.toString() + "/chunk0" + return str + } + + def getAdjListFilePath(vertex_chunk_index: Long, chunk_index: Long, adj_list_type: AdjListType.Value) : String = { + var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" + + vertex_chunk_index.toString() + "/chunk" + chunk_index.toString() + return str + } + + def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long, chunk_index: Long) : String = { + if (containPropertyGroup(property_group, adj_list_type) == false) + throw new IllegalArgumentException + var str: String = property_group.getPrefix + if (str == "") { + val properties = property_group.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (j > 0) + str += REGULAR_SEPERATOR + str += properties.get(j).getName; + } + str += "/" + } + str = prefix + getAdjListPrefix(adj_list_type) + str + "part" + + vertex_chunk_index.toString() + "/chunk" + chunk_index.toString() + return str + } +} \ No newline at end of file diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala new file mode 100644 index 
000000000..f6a035b4f --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala @@ -0,0 +1,141 @@ +package org.alibaba.graphar + +import java.io.{File, FileInputStream} +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import scala.beans.BeanProperty + +object GarType extends Enumeration{ + type GarType = Value + val BOOL = Value(0) + val INT32 = Value(2) + val INT64 = Value(3) + val FLOAT = Value(4) + val DOUBLE = Value(5) + val STRING = Value(6) + + def GarTypeToString(gar_type: GarType.Value): String = gar_type match { + case GarType.BOOL => "bool" + case GarType.INT32 => "int32" + case GarType.INT64 => "int64" + case GarType.FLOAT => "float" + case GarType.DOUBLE => "double" + case GarType.STRING => "string" + case _ => throw new IllegalArgumentException + } + + def StringToGarType(str: String): GarType.Value = str match { + case "bool" => GarType.BOOL + case "int32" => GarType.INT32 + case "int64" => GarType.INT64 + case "float" => GarType.FLOAT + case "double" => GarType.DOUBLE + case "string" => GarType.STRING + case _ => throw new IllegalArgumentException + } +} + +object FileType extends Enumeration { + type FileType = Value + val CSV = Value(0) + val PARQUET = Value(1) + val ORC = Value(2) + + def FileTypeToString(file_type: FileType.Value): String = file_type match { + case FileType.CSV => "csv" + case FileType.PARQUET => "parquet" + case FileType.ORC => "orc" + case _ => throw new IllegalArgumentException + } + + def StringToFileType(str: String): FileType.Value = str match { + case "csv" => FileType.CSV + case "parquet" => FileType.PARQUET + case "orc" => FileType.ORC + case _ => throw new IllegalArgumentException + } + +} + +object AdjListType extends Enumeration { + type AdjListType = Value + val unordered_by_source = Value(0) + val unordered_by_dest = Value(1) + val ordered_by_source = Value(2) + val ordered_by_dest = Value(3) + + def AdjListTypeToString(adjList_type: AdjListType.Value): String = adjList_type match { + case AdjListType.unordered_by_source => "unordered_by_source" + case AdjListType.unordered_by_dest => "unordered_by_dest" + case AdjListType.ordered_by_source => "ordered_by_source" + case AdjListType.ordered_by_dest => "ordered_by_dest" + case _ => throw new IllegalArgumentException + } + + def StringToAdjListType(str: String): AdjListType.Value = str match { + case "unordered_by_source" => AdjListType.unordered_by_source + case "unordered_by_dest" => AdjListType.unordered_by_dest + case "ordered_by_source" => AdjListType.ordered_by_source + case "ordered_by_dest" => AdjListType.ordered_by_dest + case _ => throw new IllegalArgumentException + } +} + +class Property () { + @BeanProperty var name: String = "" + @BeanProperty var data_type: String = "" + @BeanProperty var is_primary: Boolean = false + + def getData_type_in_gar: GarType.Value = { + GarType.StringToGarType(data_type) + } +} + +class PropertyGroup () { + @BeanProperty var prefix: String = "" + @BeanProperty var file_type: String = "" + @BeanProperty var properties = new java.util.ArrayList[Property]() + + def getFile_type_in_gar: FileType.Value = { + FileType.StringToFileType(file_type) + } +} + +class AdjList () { + @BeanProperty var ordered: Boolean = false + @BeanProperty var aligned_by: String = "src" + @BeanProperty var prefix: String = "" + @BeanProperty var file_type: String = "" + @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() + + def getFile_type_in_gar: FileType.Value = { + 
FileType.StringToFileType(file_type) + } + + def getAdjList_type: String = { + var str: String = "" + if (ordered) { + str = "ordered_by_" + } else { + str = "unordered_by_" + } + if (aligned_by == "src") { + str += "source" + } else { + str += "dest" + } + return str + } + + def getAdjList_type_in_gar: AdjListType.Value = { + AdjListType.StringToAdjListType(getAdjList_type) + } +} + +class GraphInfo() { + @BeanProperty var name: String = "" + @BeanProperty var prefix: String = "" + @BeanProperty var vertices = new java.util.ArrayList[String]() + @BeanProperty var edges = new java.util.ArrayList[String]() + @BeanProperty var version: String = "" +} diff --git a/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala new file mode 100644 index 000000000..7fc0d6239 --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -0,0 +1,86 @@ +package org.alibaba.graphar + +import java.io.{File, FileInputStream} +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import scala.beans.BeanProperty + +object TestGraphInfo { + def testGraph(): Unit = { + println("----Test GraphInfo----") + + val filename = "/Users/lixue/Downloads/GraphInfo.yaml" + val input = new FileInputStream(new File(filename)) + val yaml = new Yaml(new Constructor(classOf[GraphInfo])) + val graph_info = yaml.load(input).asInstanceOf[GraphInfo] + + println(graph_info.getVertices) + println(graph_info.getEdges) + } + + def testVertex(): Unit = { + println("----Test VertexInfo----") + + val filename = "/Users/lixue/Downloads/VertexInfo.yaml" + val input = new FileInputStream(new File(filename)) + val yaml = new Yaml(new Constructor(classOf[VertexInfo])) + val vertex_info = yaml.load(input).asInstanceOf[VertexInfo] + + println(vertex_info.getLabel) + println(vertex_info.isValidated) + println(vertex_info.getVerticesNumFilePath()) + println(vertex_info.getPrimaryKey()) + + for ( i <- 0 to (vertex_info.getProperty_groups.size - 1) ) { + val property_group = vertex_info.getProperty_groups.get(i) + println(vertex_info.containPropertyGroup(property_group), property_group.getFile_type_in_gar) + println(vertex_info.getFilePath(property_group, 0)) + for (j <- 0 to (property_group.getProperties.size - 1) ) { + val property = property_group.getProperties.get(j) + println(property.getName, property.getData_type_in_gar, property.getIs_primary) + } + } + + println(vertex_info.getPropertyType("id"), vertex_info.isPrimaryKey("id"), vertex_info.getPropertyGroup("id").getPrefix) + println(vertex_info.getPropertyType("firstName"), vertex_info.isPrimaryKey("firstName"), vertex_info.getPropertyGroup("firstName").getPrefix) + } + + def testEdge(): Unit = { + println("----Test EdgeInfo----") + + val filename = "/Users/lixue/Downloads/EdgeInfo.yaml" + val input = new FileInputStream(new File(filename)) + val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) + val edge_info = yaml.load(input).asInstanceOf[EdgeInfo] + + println(edge_info.getChunk_size) + println(edge_info.isValidated) + println(edge_info.getPrimaryKey) + + for ( i <- 0 to (edge_info.getAdj_lists.size - 1) ) { + val adj_list = edge_info.getAdj_lists.get(i) + println(adj_list.getAdjList_type_in_gar, adj_list.getProperty_groups.size) + for (j <- 0 to (adj_list.getProperty_groups.size - 1) ) { + val property_group = adj_list.getProperty_groups.get(j) + println(edge_info.containPropertyGroup(property_group, adj_list.getAdjList_type_in_gar), 
edge_info.getPropertyFilePath(property_group, adj_list.getAdjList_type_in_gar, 0, 100)) + } + } + + println(edge_info.containProperty("creationDate"), edge_info.containProperty("id")) + println(edge_info.getPropertyType("creationDate"), edge_info.isPrimaryKey("creationDate")) + println(edge_info.containAdjList(AdjListType.ordered_by_source), edge_info.containAdjList(AdjListType.unordered_by_dest)) + println(edge_info.getAdjListPrefix(AdjListType.ordered_by_source), edge_info.getAdjListFileType(AdjListType.ordered_by_source)) + println(edge_info.getPropertyGroups(AdjListType.unordered_by_source).size) + println(edge_info.getPropertyGroup("creationDate", AdjListType.unordered_by_source).getPrefix) + println(edge_info.getAdjListOffsetFilePath(0, AdjListType.unordered_by_source)) + println(edge_info.getAdjListFilePath(0, 100, AdjListType.ordered_by_source)) + } + + def main(args: Array[String]): Unit = { + testGraph() + testVertex() + testEdge() + } +} + + diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala new file mode 100644 index 000000000..a5a406cce --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala @@ -0,0 +1,139 @@ +package org.alibaba.graphar + +import java.io.{File, FileInputStream} +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import scala.beans.BeanProperty + +class VertexInfo() { + @BeanProperty var label: String = "" + @BeanProperty var chunk_size: Long = 0 + @BeanProperty var prefix: String = "" + @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() + @BeanProperty var version: String = "" + + val REGULAR_SEPERATOR = "_" + + def containPropertyGroup(property_group: PropertyGroup): Boolean = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + if (pg == property_group) { + return true + } + } + return false + } + + def containProperty(property_name: String): Boolean = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return true + } + } + } + return false + } + + def getPropertyGroup(property_name: String): PropertyGroup = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return pg + } + } + } + throw new IllegalArgumentException + } + + def getPropertyType(property_name: String): GarType.Value = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return properties.get(j).getData_type_in_gar + } + } + } + throw new IllegalArgumentException + } + + def isPrimaryKey(property_name: String): Boolean = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getName == property_name) { + return properties.get(j).getIs_primary + } + } + } + 
throw new IllegalArgumentException + } + + def getPrimaryKey(): String = { + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (properties.get(j).getIs_primary) { + return properties.get(j).getName + } + } + } + return "" + } + + def isValidated(): Boolean = { + if (label == "" || chunk_size <= 0) + return false + val len: Int = property_groups.size + for ( i <- 0 to len - 1 ) { + val pg: PropertyGroup = property_groups.get(i) + val properties = pg.getProperties + val num = properties.size + if (num == 0) + return false + val file_type = pg.getFile_type_in_gar + } + return true + } + + def getVerticesNumFilePath(): String = { + return prefix + "vertex_count" + } + + def getFilePath(property_group: PropertyGroup, chunk_index: Long): String = { + if (containPropertyGroup(property_group) == false) + throw new IllegalArgumentException + var str: String = "" + if (property_group.getPrefix == "") { + val properties = property_group.getProperties + val num = properties.size + for ( j <- 0 to num - 1 ) { + if (j > 0) + str += REGULAR_SEPERATOR + str += properties.get(j).getName; + } + } else { + str = property_group.getPrefix + } + return prefix + str + "part" + chunk_index.toString() + "/chunk0" + } +} \ No newline at end of file From 9d2443da29eaf3a2b766be229ccb13cb5721a963 Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 14 Dec 2022 21:51:43 +0800 Subject: [PATCH 02/13] Initialize the spark tool of GraphAr --- .github/workflows/spark.yaml | 34 ++++++++ spark/pom.xml | 58 ++++++++++++- .../com/alibaba/graphar/TestGraphInfo.scala | 86 ------------------- .../src/test/resources/ldbc_sample.graph.yml | 6 ++ spark/src/test/resources/person.vertex.yml | 23 +++++ .../resources/person_knows_person.edge.yml | 32 +++++++ .../com/alibaba/graphar/TestGraphInfo.scala | 34 ++++++++ 7 files changed, 184 insertions(+), 89 deletions(-) create mode 100644 .github/workflows/spark.yaml delete mode 100644 spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala create mode 100644 spark/src/test/resources/ldbc_sample.graph.yml create mode 100644 spark/src/test/resources/person.vertex.yml create mode 100644 spark/src/test/resources/person_knows_person.edge.yml create mode 100644 spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala diff --git a/.github/workflows/spark.yaml b/.github/workflows/spark.yaml new file mode 100644 index 000000000..66c3dceff --- /dev/null +++ b/.github/workflows/spark.yaml @@ -0,0 +1,34 @@ +name: GraphAr Spark CI + +on: + # Trigger the workflow on push or pull request, + # but only for the main branch + push: + branches: + - main + paths: + - 'spark/**' + pull_request: + branches: + - main + paths: + - 'spark/**' + +concurrency: + group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +jobs: + GraphAr-spark: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + with: + submodules: true + + - name: Build GraphAr Spark + run: | + export JAVA_HOME=${JAVA_HOME_11_X64 + pushd spark + mvn clean package + popd diff --git a/spark/pom.xml b/spark/pom.xml index 675fc1fcc..fdecc2e33 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -16,9 +16,9 @@ 512m 1024m 3.1.1 - 11 - 11 - 11 + 8 + 1.8 + 1.8 @@ -69,5 +69,57 @@ 1.26 + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + ${scala.version} + + -target:jvm-1.8 + + + -Xss4096K + + + +
+ scala-compile + + compile + + + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + scala-test-compile + + testCompile + + + + + + org.scalatest + scalatest-maven-plugin + 2.0.0 + + + test + + test + + + + + + jar diff --git a/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala deleted file mode 100644 index 7fc0d6239..000000000 --- a/spark/src/main/scala/com/alibaba/graphar/TestGraphInfo.scala +++ /dev/null @@ -1,86 +0,0 @@ -package org.alibaba.graphar - -import java.io.{File, FileInputStream} -import org.yaml.snakeyaml.Yaml -import org.yaml.snakeyaml.constructor.Constructor -import scala.beans.BeanProperty - -object TestGraphInfo { - def testGraph(): Unit = { - println("----Test GraphInfo----") - - val filename = "/Users/lixue/Downloads/GraphInfo.yaml" - val input = new FileInputStream(new File(filename)) - val yaml = new Yaml(new Constructor(classOf[GraphInfo])) - val graph_info = yaml.load(input).asInstanceOf[GraphInfo] - - println(graph_info.getVertices) - println(graph_info.getEdges) - } - - def testVertex(): Unit = { - println("----Test VertexInfo----") - - val filename = "/Users/lixue/Downloads/VertexInfo.yaml" - val input = new FileInputStream(new File(filename)) - val yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = yaml.load(input).asInstanceOf[VertexInfo] - - println(vertex_info.getLabel) - println(vertex_info.isValidated) - println(vertex_info.getVerticesNumFilePath()) - println(vertex_info.getPrimaryKey()) - - for ( i <- 0 to (vertex_info.getProperty_groups.size - 1) ) { - val property_group = vertex_info.getProperty_groups.get(i) - println(vertex_info.containPropertyGroup(property_group), property_group.getFile_type_in_gar) - println(vertex_info.getFilePath(property_group, 0)) - for (j <- 0 to (property_group.getProperties.size - 1) ) { - val property = property_group.getProperties.get(j) - println(property.getName, property.getData_type_in_gar, property.getIs_primary) - } - } - - println(vertex_info.getPropertyType("id"), vertex_info.isPrimaryKey("id"), vertex_info.getPropertyGroup("id").getPrefix) - println(vertex_info.getPropertyType("firstName"), vertex_info.isPrimaryKey("firstName"), vertex_info.getPropertyGroup("firstName").getPrefix) - } - - def testEdge(): Unit = { - println("----Test EdgeInfo----") - - val filename = "/Users/lixue/Downloads/EdgeInfo.yaml" - val input = new FileInputStream(new File(filename)) - val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = yaml.load(input).asInstanceOf[EdgeInfo] - - println(edge_info.getChunk_size) - println(edge_info.isValidated) - println(edge_info.getPrimaryKey) - - for ( i <- 0 to (edge_info.getAdj_lists.size - 1) ) { - val adj_list = edge_info.getAdj_lists.get(i) - println(adj_list.getAdjList_type_in_gar, adj_list.getProperty_groups.size) - for (j <- 0 to (adj_list.getProperty_groups.size - 1) ) { - val property_group = adj_list.getProperty_groups.get(j) - println(edge_info.containPropertyGroup(property_group, adj_list.getAdjList_type_in_gar), edge_info.getPropertyFilePath(property_group, adj_list.getAdjList_type_in_gar, 0, 100)) - } - } - - println(edge_info.containProperty("creationDate"), edge_info.containProperty("id")) - println(edge_info.getPropertyType("creationDate"), edge_info.isPrimaryKey("creationDate")) - println(edge_info.containAdjList(AdjListType.ordered_by_source), edge_info.containAdjList(AdjListType.unordered_by_dest)) - 
println(edge_info.getAdjListPrefix(AdjListType.ordered_by_source), edge_info.getAdjListFileType(AdjListType.ordered_by_source)) - println(edge_info.getPropertyGroups(AdjListType.unordered_by_source).size) - println(edge_info.getPropertyGroup("creationDate", AdjListType.unordered_by_source).getPrefix) - println(edge_info.getAdjListOffsetFilePath(0, AdjListType.unordered_by_source)) - println(edge_info.getAdjListFilePath(0, 100, AdjListType.ordered_by_source)) - } - - def main(args: Array[String]): Unit = { - testGraph() - testVertex() - testEdge() - } -} - - diff --git a/spark/src/test/resources/ldbc_sample.graph.yml b/spark/src/test/resources/ldbc_sample.graph.yml new file mode 100644 index 000000000..aeb8691f6 --- /dev/null +++ b/spark/src/test/resources/ldbc_sample.graph.yml @@ -0,0 +1,6 @@ +name: ldbc_sample +vertices: + - person.vertex.yml +edges: + - person_knows_person.edge.yml +version: gar/v1 diff --git a/spark/src/test/resources/person.vertex.yml b/spark/src/test/resources/person.vertex.yml new file mode 100644 index 000000000..95201775e --- /dev/null +++ b/spark/src/test/resources/person.vertex.yml @@ -0,0 +1,23 @@ +label: person +chunk_size: 100 +prefix: vertex/person/ +property_groups: + - properties: + - name: id + data_type: int64 + is_primary: true + prefix: id/ + file_type: csv + - properties: + - name: firstName + data_type: string + is_primary: false + - name: lastName + data_type: string + is_primary: false + - name: gender + data_type: string + is_primary: false + prefix: firstName_lastName_gender/ + file_type: csv +version: gar/v1 diff --git a/spark/src/test/resources/person_knows_person.edge.yml b/spark/src/test/resources/person_knows_person.edge.yml new file mode 100644 index 000000000..d582c58f8 --- /dev/null +++ b/spark/src/test/resources/person_knows_person.edge.yml @@ -0,0 +1,32 @@ +src_label: person +edge_label: knows +dst_label: person +chunk_size: 1024 +src_chunk_size: 100 +dst_chunk_size: 100 +directed: false +prefix: edge/person_knows_person/ +adj_lists: + - ordered: true + aligned_by: src + prefix: ordered_by_source/ + file_type: csv + property_groups: + - prefix: creationDate/ + file_type: csv + properties: + - name: creationDate + data_type: string + is_primary: false + - ordered: true + aligned_by: dst + prefix: ordered_by_dest/ + file_type: csv + property_groups: + - prefix: creationDate/ + file_type: csv + properties: + - name: creationDate + data_type: string + is_primary: false +version: gar/v1 diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala new file mode 100644 index 000000000..4fdf5a28d --- /dev/null +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -0,0 +1,34 @@ +package org.alibaba.graphar + +import java.io.{File, FileInputStream} +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import scala.beans.BeanProperty +import org.scalatest.funsuite.AnyFunSuite + +class GraphInfoSuite extends AnyFunSuite { + + test("load graph info") { + val input = getClass.getClassLoader.getResourceAsStream("ldbc_sample.graph.yml") + val yaml = new Yaml(new Constructor(classOf[GraphInfo])) + val graph_info = yaml.load(input).asInstanceOf[GraphInfo] + + assert(graph_info.getName == "ldbc_sample") + assert(graph_info.getPrefix == "" ) + assert(graph_info.getVertices.size() == 1) + assert(graph_info.getEdges.size() == 1) + } + + test("load vertex info") { + val input = 
getClass.getClassLoader.getResourceAsStream("person.vertex.yml") + val yaml = new Yaml(new Constructor(classOf[VertexInfo])) + val vertex_info = yaml.load(input).asInstanceOf[VertexInfo] + + assert(vertex_info.getLabel == "person") + assert(vertex_info.isValidated) + assert(vertex_info.getVerticesNumFilePath() == "vertex/person/vertex_count") + assert(vertex_info.getPrimaryKey() == "id") + assert(vertex_info.getProperty_groups.size() == 2) + } + +} From e886fba72ac9822f7478fe911398cf03b3ed231a Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 14 Dec 2022 22:08:11 +0800 Subject: [PATCH 03/13] Fix --- .github/workflows/spark.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/spark.yaml b/.github/workflows/spark.yaml index 66c3dceff..882700219 100644 --- a/.github/workflows/spark.yaml +++ b/.github/workflows/spark.yaml @@ -28,7 +28,7 @@ jobs: - name: Build GraphAr Spark run: | - export JAVA_HOME=${JAVA_HOME_11_X64 + export JAVA_HOME=${JAVA_HOME_11_X64} pushd spark mvn clean package popd From 44d9d19164ca1aa71cf9fb9358ca924f6959e88b Mon Sep 17 00:00:00 2001 From: acezen Date: Wed, 14 Dec 2022 22:44:20 +0800 Subject: [PATCH 04/13] Update --- .github/workflows/spark.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/spark.yaml b/.github/workflows/spark.yaml index 882700219..1feed20f8 100644 --- a/.github/workflows/spark.yaml +++ b/.github/workflows/spark.yaml @@ -32,3 +32,10 @@ jobs: pushd spark mvn clean package popd + + - name: Run test + run: | + export JAVA_HOME=${JAVA_HOME_11_X64} + pushd spark + mvn test + popd From ce1ef3ad0edd96d5dc5c9393025b2068157315a1 Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 15 Dec 2022 10:21:38 +0800 Subject: [PATCH 05/13] Update test for vertex info --- .../src/test/resources/ldbc_sample.graph.yml | 1 + .../com/alibaba/graphar/TestGraphInfo.scala | 43 ++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/spark/src/test/resources/ldbc_sample.graph.yml b/spark/src/test/resources/ldbc_sample.graph.yml index aeb8691f6..6ace3d9a9 100644 --- a/spark/src/test/resources/ldbc_sample.graph.yml +++ b/spark/src/test/resources/ldbc_sample.graph.yml @@ -1,4 +1,5 @@ name: ldbc_sample +prefix: ./ vertices: - person.vertex.yml edges: diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala index 4fdf5a28d..f57e8a71e 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -14,9 +14,16 @@ class GraphInfoSuite extends AnyFunSuite { val graph_info = yaml.load(input).asInstanceOf[GraphInfo] assert(graph_info.getName == "ldbc_sample") - assert(graph_info.getPrefix == "" ) + assert(graph_info.getPrefix == "./" ) assert(graph_info.getVertices.size() == 1) + val vertices = new java.util.ArrayList[String] + vertices.add("person.vertex.yml") + assert(graph_info.getVertices == vertices) assert(graph_info.getEdges.size() == 1) + val edges = new java.util.ArrayList[String] + edges.add("person_knows_person.edge.yml") + assert(graph_info.getEdges == edges) + assert(graph_info.getVersion == "gar/v1") } test("load vertex info") { @@ -29,6 +36,40 @@ class GraphInfoSuite extends AnyFunSuite { assert(vertex_info.getVerticesNumFilePath() == "vertex/person/vertex_count") assert(vertex_info.getPrimaryKey() == "id") assert(vertex_info.getProperty_groups.size() == 2) + assert(vertex_info.getVersion == "gar/v1") + 
assert(vertex_info.getChunk_size == 100) + + for ( i <- 0 to (vertex_info.getProperty_groups.size - 1) ) { + val property_group = vertex_info.getProperty_groups.get(i) + assert(vertex_info.containPropertyGroup(property_group)) + } + assert(vertex_info.containProperty("id")) + val property_group = vertex_info.getPropertyGroup("id") + assert(property_group.getProperties.size == 1) + assert(property_group.getFile_type == "csv") + assert(property_group.getFile_type_in_gar == FileType.CSV) + assert(vertex_info.containPropertyGroup(property_group)) + assert(vertex_info.getPropertyType("id") == GarType.INT64) + assert(vertex_info.isPrimaryKey("id")) + assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0") + assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0") + + assert(vertex_info.containProperty("firstName")) + val property_group_2 = vertex_info.getPropertyGroup("firstName") + assert(property_group_2.getProperties.size == 3) + assert(property_group_2.getFile_type == "csv") + assert(property_group_2.getFile_type_in_gar == FileType.CSV) + assert(vertex_info.containPropertyGroup(property_group_2)) + assert(vertex_info.getPropertyType("firstName") == GarType.STRING) + assert(vertex_info.isPrimaryKey("firstName") == false) + assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0") + assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0") + + assert(vertex_info.containProperty("not_exist") == false) + assertThrows[IllegalArgumentException](vertex_info.getPropertyGroup("not_exist")) + assertThrows[IllegalArgumentException](vertex_info.getPropertyType("not_exist")) + assertThrows[IllegalArgumentException](vertex_info.isPrimaryKey("not_exist")) } + } From 651df78eee3754b29a51a4ca1f805caa4b431e38 Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 15 Dec 2022 12:29:27 +0800 Subject: [PATCH 06/13] Add unit tests for edge info --- .../com/alibaba/graphar/TestGraphInfo.scala | 73 ++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala index f57e8a71e..8d14d3315 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -71,5 +71,74 @@ class GraphInfoSuite extends AnyFunSuite { assertThrows[IllegalArgumentException](vertex_info.isPrimaryKey("not_exist")) } - -} + test("load edge info") { + val input = getClass.getClassLoader.getResourceAsStream("person_knows_person.edge.yml") + val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) + val edge_info = yaml.load(input).asInstanceOf[EdgeInfo] + + assert(edge_info.getSrc_label == "person") + assert(edge_info.getDst_label == "person") + assert(edge_info.getEdge_label == "knows") + assert(edge_info.getChunk_size == 1024) + assert(edge_info.getSrc_chunk_size == 100) + assert(edge_info.getDst_chunk_size == 100) + assert(edge_info.getDirected == false) + assert(edge_info.getPrefix == "edge/person_knows_person/") + assert(edge_info.getAdj_lists.size == 2) + assert(edge_info.getVersion == "gar/v1") + assert(edge_info.isValidated) + assert(edge_info.getPrimaryKey == "") + + assert(edge_info.containAdjList(AdjListType.ordered_by_source)) + assert(edge_info.getAdjListPrefix(AdjListType.ordered_by_source) == "ordered_by_source/") +
assert(edge_info.getAdjListFileType(AdjListType.ordered_by_source) == FileType.CSV) + assert(edge_info.getPropertyGroups(AdjListType.ordered_by_source).size == 1) + assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0") + assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2") + assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0") + assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0") + val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0) + assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source)) + val property = property_group.getProperties.get(0) + val property_name = property.getName + assert(edge_info.containProperty(property_name)) + assert(edge_info.getPropertyGroup(property_name, AdjListType.ordered_by_source) == property_group) + assert(edge_info.getPropertyType(property_name) == property.getData_type_in_gar) + assert(edge_info.isPrimaryKey(property_name) == property.getIs_primary) + assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 0, 0) == "edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0") + assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 1, 2) == "edge/person_knows_person/ordered_by_source/creationDate/part1/chunk2") + + assert(edge_info.containAdjList(AdjListType.ordered_by_dest)) + assert(edge_info.getAdjListPrefix(AdjListType.ordered_by_dest) == "ordered_by_dest/") + assert(edge_info.getAdjListFileType(AdjListType.ordered_by_dest) == FileType.CSV) + assert(edge_info.getPropertyGroups(AdjListType.ordered_by_dest).size == 1) + assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/chunk0") + assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2") + assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0") + assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0") + val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0) + assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest)) + val property_2 = property_group_2.getProperties.get(0) + val property_name_2 = property_2.getName + assert(edge_info.containProperty(property_name_2)) + assert(edge_info.getPropertyGroup(property_name_2, AdjListType.ordered_by_dest) == property_group_2) + assert(edge_info.getPropertyType(property_name_2) == property_2.getData_type_in_gar) + assert(edge_info.isPrimaryKey(property_name_2) == property_2.getIs_primary) + assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 0, 0) == "edge/person_knows_person/ordered_by_dest/creationDate/part0/chunk0") + assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 1, 2) == "edge/person_knows_person/ordered_by_dest/creationDate/part1/chunk2") + + assert(edge_info.containAdjList(AdjListType.unordered_by_source) == false) + 
assertThrows[IllegalArgumentException](edge_info.getAdjListPrefix(AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getAdjListFileType(AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyGroups(AdjListType.unordered_by_source)) + assert(edge_info.containPropertyGroup(property_group, AdjListType.unordered_by_source) == false) + assertThrows[IllegalArgumentException](edge_info.getAdjListOffsetFilePath(0, AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getAdjListFilePath(0, 0, AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyFilePath(property_group, AdjListType.unordered_by_source, 0, 0)) + assert(edge_info.containProperty("not_exist") == false) + assertThrows[IllegalArgumentException](edge_info.getPropertyGroup("not_exist", AdjListType.ordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyType("not_exist")) + assertThrows[IllegalArgumentException](edge_info.isPrimaryKey("not_exist")) + } + + } From 88edece9c18460baeadf3e83073c884056c3239a Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 15 Dec 2022 12:36:30 +0800 Subject: [PATCH 07/13] Update --- spark/src/main/java/com/alibaba/graphar/GeneralParams.java | 1 + spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala | 6 ++---- spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala | 6 ++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java index bc67b54bd..64d7f085d 100644 --- a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java +++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java @@ -9,5 +9,6 @@ public class GeneralParams { public static final String primaryCol = "_graphArPrimary"; public static final String vertexChunkIndexCol = "_graphArVertexChunkIndex"; public static final String edgeIndexCol = "_graphArEdgeIndex"; + public static final String regularSeperator = "_"; } diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala index f3a455633..77ac836dd 100644 --- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala @@ -17,8 +17,6 @@ class EdgeInfo() { @BeanProperty var adj_lists = new java.util.ArrayList[AdjList]() @BeanProperty var version: String = "" - val REGULAR_SEPERATOR = "_" - def containAdjList(adj_list_type: AdjListType.Value): Boolean = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -232,7 +230,7 @@ class EdgeInfo() { val num = properties.size for ( j <- 0 to num - 1 ) { if (j > 0) - str += REGULAR_SEPERATOR + str += GeneralParams.regularSeperator str += properties.get(j).getName; } str += "/" @@ -241,4 +239,4 @@ class EdgeInfo() { vertex_chunk_index.toString() + "/chunk" + chunk_index.toString() return str } -} \ No newline at end of file +} diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala index a5a406cce..2f0d894f0 100644 --- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala @@ -12,8 +12,6 @@ class VertexInfo() { @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() @BeanProperty var version: String = "" - val REGULAR_SEPERATOR = "_" - def 
containPropertyGroup(property_group: PropertyGroup): Boolean = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -128,7 +126,7 @@ class VertexInfo() { val num = properties.size for ( j <- 0 to num - 1 ) { if (j > 0) - str += REGULAR_SEPERATOR + str += GeneralParams.regularSeperator str += properties.get(j).getName; } } else { @@ -136,4 +134,4 @@ class VertexInfo() { } return prefix + str + "part" + chunk_index.toString() + "/chunk0" } -} \ No newline at end of file +} From 4ddd1df7d0b30550ccf386cd6b3b0c32a9c908b2 Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 15 Dec 2022 15:59:49 +0800 Subject: [PATCH 08/13] Fix package name --- spark/pom.xml | 2 +- spark/src/main/java/com/alibaba/graphar/GeneralParams.java | 2 +- spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala | 2 +- spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala | 2 +- spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala | 2 +- spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/pom.xml b/spark/pom.xml index fdecc2e33..375d4250a 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -3,7 +3,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.laibaba + com.alibaba graphar 0.1.0-SNAPSHOT diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java index 64d7f085d..8cfb5069b 100644 --- a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java +++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java @@ -1,4 +1,4 @@ -package org.alibaba.graphar; +package com.alibaba.graphar; public class GeneralParams { // column name diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala index 77ac836dd..6f8cbc541 100644 --- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala @@ -1,4 +1,4 @@ -package org.alibaba.graphar +package com.alibaba.graphar import java.io.{File, FileInputStream} import org.yaml.snakeyaml.Yaml diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala index f6a035b4f..a60813080 100644 --- a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala @@ -1,4 +1,4 @@ -package org.alibaba.graphar +package com.alibaba.graphar import java.io.{File, FileInputStream} import org.yaml.snakeyaml.Yaml diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala index 2f0d894f0..21e4a08cf 100644 --- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala @@ -1,4 +1,4 @@ -package org.alibaba.graphar +package com.alibaba.graphar import java.io.{File, FileInputStream} import org.yaml.snakeyaml.Yaml diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala index 8d14d3315..942d64404 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -1,4 +1,4 @@ -package org.alibaba.graphar +package com.alibaba.graphar import java.io.{File, FileInputStream} import org.yaml.snakeyaml.Yaml From 5b903745354432b37f7e6f8c4360336b2a17e5bc 
Mon Sep 17 00:00:00 2001 From: acezen Date: Thu, 15 Dec 2022 18:13:55 +0800 Subject: [PATCH 09/13] Update --- spark/src/test/resources/gar-test | 1 + .../src/test/resources/ldbc_sample.graph.yml | 7 ---- spark/src/test/resources/person.vertex.yml | 23 ------------- .../resources/person_knows_person.edge.yml | 32 ------------------- .../com/alibaba/graphar/TestGraphInfo.scala | 8 ++--- 5 files changed, 5 insertions(+), 66 deletions(-) create mode 120000 spark/src/test/resources/gar-test delete mode 100644 spark/src/test/resources/ldbc_sample.graph.yml delete mode 100644 spark/src/test/resources/person.vertex.yml delete mode 100644 spark/src/test/resources/person_knows_person.edge.yml diff --git a/spark/src/test/resources/gar-test b/spark/src/test/resources/gar-test new file mode 120000 index 000000000..952da9d1f --- /dev/null +++ b/spark/src/test/resources/gar-test @@ -0,0 +1 @@ +../../../../test/gar-test \ No newline at end of file diff --git a/spark/src/test/resources/ldbc_sample.graph.yml b/spark/src/test/resources/ldbc_sample.graph.yml deleted file mode 100644 index 6ace3d9a9..000000000 --- a/spark/src/test/resources/ldbc_sample.graph.yml +++ /dev/null @@ -1,7 +0,0 @@ -name: ldbc_sample -prefix: ./ -vertices: - - person.vertex.yml -edges: - - person_knows_person.edge.yml -version: gar/v1 diff --git a/spark/src/test/resources/person.vertex.yml b/spark/src/test/resources/person.vertex.yml deleted file mode 100644 index 95201775e..000000000 --- a/spark/src/test/resources/person.vertex.yml +++ /dev/null @@ -1,23 +0,0 @@ -label: person -chunk_size: 100 -prefix: vertex/person/ -property_groups: - - properties: - - name: id - data_type: int64 - is_primary: true - prefix: id/ - file_type: csv - - properties: - - name: firstName - data_type: string - is_primary: false - - name: lastName - data_type: string - is_primary: false - - name: gender - data_type: string - is_primary: false - prefix: firstName_lastName_gender/ - file_type: csv -version: gar/v1 diff --git a/spark/src/test/resources/person_knows_person.edge.yml b/spark/src/test/resources/person_knows_person.edge.yml deleted file mode 100644 index d582c58f8..000000000 --- a/spark/src/test/resources/person_knows_person.edge.yml +++ /dev/null @@ -1,32 +0,0 @@ -src_label: person -edge_label: knows -dst_label: person -chunk_size: 1024 -src_chunk_size: 100 -dst_chunk_size: 100 -directed: false -prefix: edge/person_knows_person/ -adj_lists: - - ordered: true - aligned_by: src - prefix: ordered_by_source/ - file_type: csv - property_groups: - - prefix: creationDate/ - file_type: csv - properties: - - name: creationDate - data_type: string - is_primary: false - - ordered: true - aligned_by: dst - prefix: ordered_by_dest/ - file_type: csv - property_groups: - - prefix: creationDate/ - file_type: csv - properties: - - name: creationDate - data_type: string - is_primary: false -version: gar/v1 diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala index 942d64404..3744d8b2e 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -9,12 +9,12 @@ import org.scalatest.funsuite.AnyFunSuite class GraphInfoSuite extends AnyFunSuite { test("load graph info") { - val input = getClass.getClassLoader.getResourceAsStream("ldbc_sample.graph.yml") + val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml") val yaml = new Yaml(new 
Constructor(classOf[GraphInfo])) val graph_info = yaml.load(input).asInstanceOf[GraphInfo] assert(graph_info.getName == "ldbc_sample") - assert(graph_info.getPrefix == "./" ) + assert(graph_info.getPrefix == "" ) assert(graph_info.getVertices.size() == 1) val vertices = new java.util.ArrayList[String] vertices.add("person.vertex.yml") @@ -27,7 +27,7 @@ class GraphInfoSuite extends AnyFunSuite { } test("load vertex info") { - val input = getClass.getClassLoader.getResourceAsStream("person.vertex.yml") + val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml") val yaml = new Yaml(new Constructor(classOf[VertexInfo])) val vertex_info = yaml.load(input).asInstanceOf[VertexInfo] @@ -72,7 +72,7 @@ class GraphInfoSuite extends AnyFunSuite { } test("load edge info") { - val input = getClass.getClassLoader.getResourceAsStream("person_knows_person.edge.yml") + val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml") val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) val edge_info = yaml.load(input).asInstanceOf[EdgeInfo] From 0ff743da2707cab2875c503e10ab226c1c239df5 Mon Sep 17 00:00:00 2001 From: acezen Date: Fri, 16 Dec 2022 08:58:21 +0800 Subject: [PATCH 10/13] Add IndexGenerator --- .../com/alibaba/graphar/IndexGenerator.scala | 119 ++++++++++++++++++ .../alibaba/graphar/TestIndexGenerator.scala | 40 ++++++ 2 files changed, 159 insertions(+) create mode 100644 spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala create mode 100644 spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala diff --git a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala new file mode 100644 index 000000000..15bd02d3e --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala @@ -0,0 +1,119 @@ +package com.alibaba.graphar + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.rdd.RDD + +import scala.collection.SortedMap +import scala.collection.mutable.ArrayBuffer + +object IndexGenerator { + + // index helper for the vertex DataFrame + + // return a DataFrame contains two columns: vertex index & primary key + def constructVertexIndexMapping(vertexDf: DataFrame, primaryKey: String): DataFrame = { + val spark = vertexDf.sparkSession + val schema = vertexDf.schema + val id_index = schema.fieldIndex(primaryKey) + val mapping_schema = StructType(Seq(StructField(GeneralParams.vertexIndexCol, LongType, false), schema.apply(id_index))) + val rdd = vertexDf.rdd + val counts = rdd + .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true) + .collectAsMap() + val aggregatedCounts = SortedMap(counts.toSeq: _*) + .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) => + (total + c, map + (i -> total)) + } + ._2 + val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts) + val mapping = rdd.mapPartitionsWithIndex((i, ps) => { + val start = broadcastedCounts.value(i) + for { (p, j) <- ps.zipWithIndex } yield Row(start + j, p(id_index)) + }) + spark.createDataFrame(mapping, mapping_schema).withColumnRenamed(primaryKey, GeneralParams.primaryCol) + } + + //add a column contains vertex index + def generateVertexIndexColumn(vertexDf: DataFrame): DataFrame = { + val spark = 
vertexDf.sparkSession + val schema = vertexDf.schema + val schema_with_index = StructType(StructType(Seq(StructField(GeneralParams.vertexIndexCol, LongType, true)))++schema) + val rdd = vertexDf.rdd + val counts = rdd + .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true) + .collectAsMap() + val aggregatedCounts = SortedMap(counts.toSeq: _*) + .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) => + (total + c, map + (i -> total)) + } + ._2 + val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts) + val rdd_with_index = rdd.mapPartitionsWithIndex((i, ps) => { + val start = broadcastedCounts.value(i) + for { (p, j) <- ps.zipWithIndex } yield Row.fromSeq(Seq(start + j) ++ p.toSeq) + }) + spark.createDataFrame(rdd_with_index, schema_with_index) + } + + //index helper for the Edge DataFrame + + // join the edge table with the vertex index mapping for source column + def generateSrcIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, srcIndexMapping: DataFrame): DataFrame = { + val spark = edgeDf.sparkSession + srcIndexMapping.createOrReplaceTempView("src_vertex") + edgeDf.createOrReplaceTempView("edge") + val srcCol = GeneralParams.srcIndexCol; + val indexCol = GeneralParams.vertexIndexCol; + val srcPrimaryKey = GeneralParams.primaryCol; + val trans_df = spark.sql(f"select src_vertex.$indexCol%s as $srcCol%s, edge.* from edge inner join src_vertex on src_vertex.$srcPrimaryKey%s=edge.$srcColumnName%s") + // drop the old src id col + trans_df.drop(srcColumnName) + } + + // join the edge table with the vertex index mapping for destination column + def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = { + val spark = edgeDf.sparkSession + dstIndexMapping.createOrReplaceTempView("dst_vertex") + edgeDf.createOrReplaceTempView("edge") + val dstCol = GeneralParams.dstIndexCol; + val indexCol = GeneralParams.vertexIndexCol; + val dstPrimaryKey = GeneralParams.primaryCol; + val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s") + // drop the old dst id col + trans_df.drop(dstColumnName) + } + + // join the edge table with the vertex index mapping for source & destination columns + def generateSrcAndDstIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, dstColumnName: String, srcIndexMapping: DataFrame, dstIndexMapping: DataFrame): DataFrame = { + val df_with_src_index = generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping) + generateDstIndexForEdgesFromMapping(df_with_src_index, dstColumnName, dstIndexMapping) + } + + // construct vertex index for source column + def generateSrcIndexForEdges(edgeDf: DataFrame, srcColumnName: String): DataFrame = { + val srcDf = edgeDf.select(srcColumnName).distinct() + val srcIndexMapping = constructVertexIndexMapping(srcDf, srcColumnName) + generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping) + } + + // construct vertex index for destination column + def generateDstIndexForEdges(edgeDf: DataFrame, dstColumnName: String): DataFrame = { + val dstDf = edgeDf.select(dstColumnName).distinct() + val dstIndexMapping = constructVertexIndexMapping(dstDf, dstColumnName) + generateDstIndexForEdgesFromMapping(edgeDf, dstColumnName, dstIndexMapping) + } + + // union and construct vertex index for source & destination columns + def 
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
new file mode 100644
index 000000000..a2dcea274
--- /dev/null
+++ b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
@@ -0,0 +1,40 @@
+package com.alibaba.graphar
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.funsuite.AnyFunSuite
+
+class IndexGeneratorSuite extends AnyFunSuite {
+  val spark = SparkSession.builder()
+    .enableHiveSupport()
+    .master("local[*]")
+    .getOrCreate()
+
+  test("generate vertex index") {
+    val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+    val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+    assertThrows[IllegalArgumentException](vertex_df.schema.fieldIndex(GeneralParams.vertexIndexCol))
+    val df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
+    val field_index = df_with_index.schema(GeneralParams.vertexIndexCol)
+    val desc = df_with_index.describe(GeneralParams.vertexIndexCol)
+  }
+
+  test("generate edge index") {
+    val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+    val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+    val df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
+    df_with_index.show()
+  }
+
+  test("generate edge index with vertex") {
+    val vertex_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+    val edge_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+    val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_path)
+    val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(edge_path)
+    val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, "id")
+    val edge_df_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
+    edge_df_src_index.show()
+    val edge_df_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_src_index, "dst", vertex_mapping)
+    edge_df_src_dst_index.show()
+  }
+
+}
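End to end, a writer would typically construct one mapping per vertex label and reuse it for every edge table referencing that label, so both endpoint columns agree on the same dense index. A hedged sketch on an in-memory toy graph (the data and column names are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val persons = Seq(("p1", "Alice"), ("p2", "Bob")).toDF("id", "name")
    val knows = Seq(("p1", "p2"), ("p2", "p1")).toDF("src", "dst")

    // One shared primary-key-to-index mapping ...
    val mapping = IndexGenerator.constructVertexIndexMapping(persons, "id")
    // ... reused for both endpoint columns of the edge table.
    val indexed = IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(knows, "src", "dst", mapping, mapping)
    indexed.show()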
From 39bc4e7c4f91953485c16757aa8d2e60d812ec13 Mon Sep 17 00:00:00 2001
From: acezen
Date: Fri, 16 Dec 2022 10:15:19 +0800
Subject: [PATCH 11/13] Add Dir related member functions

---
 .../scala/com/alibaba/graphar/EdgeInfo.scala  | 28 +++++++++++++++++++
 .../com/alibaba/graphar/VertexInfo.scala      | 20 ++++++++++++++
 .../com/alibaba/graphar/TestGraphInfo.scala   |  8 ++++++
 3 files changed, 56 insertions(+)

diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
index 6f8cbc541..fef409e8e 100644
--- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -215,12 +215,22 @@ class EdgeInfo() {
     return str
   }

+  def getAdjListOffsetDirPath(adj_list_type: AdjListType.Value) : String = {
+    if (containAdjList(adj_list_type) == false)
+      throw new IllegalArgumentException
+    return prefix + getAdjListPrefix(adj_list_type) + "offset/"
+  }
+
   def getAdjListFilePath(vertex_chunk_index: Long, chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
     var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" +
       vertex_chunk_index.toString() + "/chunk" + chunk_index.toString()
     return str
   }

+  def getAdjListDirPath(adj_list_type: AdjListType.Value) : String = {
+    return prefix + getAdjListPrefix(adj_list_type) + "adj_list/"
+  }
+
   def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long, chunk_index: Long) : String = {
     if (containPropertyGroup(property_group, adj_list_type) == false)
       throw new IllegalArgumentException
@@ -239,4 +249,22 @@ class EdgeInfo() {
       vertex_chunk_index.toString() + "/chunk" + chunk_index.toString()
     return str
   }
+
+  def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
+    if (containPropertyGroup(property_group, adj_list_type) == false)
+      throw new IllegalArgumentException
+    var str: String = property_group.getPrefix
+    if (str == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    }
+    str = prefix + getAdjListPrefix(adj_list_type) + str
+    return str
+  }
 }
diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
index 21e4a08cf..5cab17601 100644
--- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -129,9 +129,29 @@ class VertexInfo() {
           str += GeneralParams.regularSeperator
         str += properties.get(j).getName;
       }
+      str += "/"
     } else {
       str = property_group.getPrefix
     }
     return prefix + str + "part" + chunk_index.toString() + "/chunk0"
   }
+
+  def getDirPath(property_group: PropertyGroup): String = {
+    if (containPropertyGroup(property_group) == false)
+      throw new IllegalArgumentException
+    var str: String = ""
+    if (property_group.getPrefix == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    } else {
+      str = property_group.getPrefix
+    }
+    return prefix + str;
+  }
 }
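One rule worth spelling out from the two dir-path helpers above: a property group without an explicit prefix is addressed by a directory name obtained by joining its property names with GeneralParams.regularSeperator. A tiny illustrative rendering of that rule, assuming the separator is "_" (which is what the "firstName_lastName_gender" paths asserted in the tests below imply):

    // Hypothetical stand-alone rendering of the naming rule; "_" as the value
    // of GeneralParams.regularSeperator is an assumption based on the paths
    // asserted in TestGraphInfo.
    val propertyNames = Seq("firstName", "lastName", "gender")
    val groupDirName = propertyNames.mkString("_") + "/"
    // groupDirName == "firstName_lastName_gender/"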
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
index 3744d8b2e..03cdb9796 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -53,6 +53,7 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(vertex_info.isPrimaryKey("id"))
     assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
     assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+    assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")

     assert(vertex_info.containProperty("firstName"))
     val property_group_2 = vertex_info.getPropertyGroup("firstName")
@@ -64,6 +65,7 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(vertex_info.isPrimaryKey("firstName") == false)
     assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
     assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+    assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")

     assert(vertex_info.containProperty("not_exist") == false)
     assertThrows[IllegalArgumentException](vertex_info.getPropertyGroup("not_exist"))
@@ -95,8 +97,10 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.getPropertyGroups(AdjListType.ordered_by_source).size == 1)
     assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0")
     assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
+    assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
     assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
     assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
     val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
     assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
     val property = property_group.getProperties.get(0)
@@ -107,6 +111,7 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.isPrimaryKey(property_name) == property.getIs_primary)
     assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 0, 0) == "edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0")
     assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 1, 2) == "edge/person_knows_person/ordered_by_source/creationDate/part1/chunk2")
+    assert(edge_info.getPropertyDirPath(property_group, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/creationDate/")

     assert(edge_info.containAdjList(AdjListType.ordered_by_dest))
     assert(edge_info.getAdjListPrefix(AdjListType.ordered_by_dest) == "ordered_by_dest/")
@@ -114,8 +119,10 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.getPropertyGroups(AdjListType.ordered_by_dest).size == 1)
     assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/chunk0")
     assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
+    assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/")
     assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0")
     assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/")
     val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0)
     assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest))
     val property_2 = property_group_2.getProperties.get(0)
@@ -126,6 +133,7 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.isPrimaryKey(property_name_2) == property_2.getIs_primary)
     assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 0, 0) == "edge/person_knows_person/ordered_by_dest/creationDate/part0/chunk0")
     assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 1, 2) == "edge/person_knows_person/ordered_by_dest/creationDate/part1/chunk2")
+    assert(edge_info.getPropertyDirPath(property_group_2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/creationDate/")

     assert(edge_info.containAdjList(AdjListType.unordered_by_source) == false)
     assertThrows[IllegalArgumentException](edge_info.getAdjListPrefix(AdjListType.unordered_by_source))
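The reason these helpers return directories and not just individual chunk files is that Spark can ingest a whole chunked dataset in a single call by pointing a reader at the directory. A hedged sketch against a CSV-formatted store; storeRoot and the reader options are illustrative, and vertex_info/property_group are as in the suite above:

    // Read every chunk of a property group in one call by loading its directory.
    // storeRoot is a hypothetical filesystem root; the relative directory comes
    // from the helper added in this patch.
    val storeRoot = "/tmp/ldbc_sample/csv/"
    val groupDir = storeRoot + vertex_info.getDirPath(property_group)
    val groupDf = spark.read
      .option("recursiveFileLookup", "true") // descend into the part*/ subdirectories
      .option("header", "true")
      .csv(groupDir)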
From a7ffd65e8548ba4b392c1d2c020991447d9fd5c7 Mon Sep 17 00:00:00 2001
From: acezen
Date: Fri, 16 Dec 2022 10:24:24 +0800
Subject: [PATCH 12/13] Minor update

---
 .github/workflows/ci.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index aaa178904..8cbe50841 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -12,6 +12,7 @@ on:
       - '**.rst'
       - 'docs/**'
       - 'misc/**'
+      - 'spark/**'
   pull_request:
     branches:
       - main
@@ -21,6 +22,7 @@ on:
       - '**.rst'
       - 'docs/**'
       - 'misc/**'
+      - 'spark/**'

 concurrency:
   group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }}

From 31fc3ab530ae44115390cb1f1d6518d6bd74d5ab Mon Sep 17 00:00:00 2001
From: acezen
Date: Fri, 16 Dec 2022 12:58:19 +0800
Subject: [PATCH 13/13] Update

---
 spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala    | 6 ++++++
 .../src/test/scala/com/alibaba/graphar/TestGraphInfo.scala | 4 ++++
 2 files changed, 10 insertions(+)

diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
index fef409e8e..9f408b3e6 100644
--- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -227,6 +227,12 @@ class EdgeInfo() {
     return str
   }

+  def getAdjListFilePath(vertex_chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
+    var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" +
+      vertex_chunk_index.toString() + "/"
+    return str
+  }
+
   def getAdjListDirPath(adj_list_type: AdjListType.Value) : String = {
     return prefix + getAdjListPrefix(adj_list_type) + "adj_list/"
   }
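Unlike the three-argument form, the new two-argument getAdjListFilePath returns the part directory of one vertex chunk rather than a single chunk file, so every adjacency chunk of that vertex chunk can be consumed in one read. A hedged usage sketch (edge_info as in the tests; storeRoot is again illustrative):

    // All adjacency-list chunks that belong to vertex chunk 1, in one read.
    val partDir = storeRoot + edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source)
    // partDir ends with ".../adj_list/part1/", so the reader picks up every chunk file in it.
    val adjListDf = spark.read.option("header", "true").csv(partDir)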
"edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2") + assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/") assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/") assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0") assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0") @@ -118,7 +120,9 @@ class GraphInfoSuite extends AnyFunSuite { assert(edge_info.getAdjListFileType(AdjListType.ordered_by_dest) == FileType.CSV) assert(edge_info.getPropertyGroups(AdjListType.ordered_by_dest).size == 1) assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/chunk0") + assert(edge_info.getAdjListFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/") assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2") + assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/") assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/") assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0") assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")