diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index aaa178904..8cbe50841 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -12,6 +12,7 @@ on:
       - '**.rst'
       - 'docs/**'
       - 'misc/**'
+      - 'spark/**'
   pull_request:
     branches:
       - main
@@ -21,6 +22,7 @@ on:
       - '**.rst'
       - 'docs/**'
       - 'misc/**'
+      - 'spark/**'
 
 concurrency:
   group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }}
diff --git a/.github/workflows/spark.yaml b/.github/workflows/spark.yaml
new file mode 100644
index 000000000..1feed20f8
--- /dev/null
+++ b/.github/workflows/spark.yaml
@@ -0,0 +1,41 @@
+name: GraphAr Spark CI
+
+on:
+  # Trigger the workflow on push or pull request,
+  # but only for the main branch
+  push:
+    branches:
+      - main
+    paths:
+      - 'spark/**'
+  pull_request:
+    branches:
+      - main
+    paths:
+      - 'spark/**'
+
+concurrency:
+  group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }}
+  cancel-in-progress: true
+
+jobs:
+  GraphAr-spark:
+    runs-on: ubuntu-20.04
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        submodules: true
+
+    - name: Build GraphAr Spark
+      run: |
+        export JAVA_HOME=${JAVA_HOME_11_X64}
+        pushd spark
+        mvn clean package
+        popd
+
+    - name: Run test
+      run: |
+        export JAVA_HOME=${JAVA_HOME_11_X64}
+        pushd spark
+        mvn test
+        popd
diff --git a/.gitignore b/.gitignore
index bc399f64b..0c4a1b713 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,5 @@
 /build/
-spark/generator/target/
+spark/target/
 .vscode
 .idea
 .DS_store
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 000000000..375d4250a
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.alibaba</groupId>
+  <artifactId>graphar</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+
+  <properties>
+    <sbt.project.name>graphar</sbt.project.name>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <scala.version>2.12.10</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
+    <PermGen>512m</PermGen>
+    <MaxPermGen>1024m</MaxPermGen>
+    <spark.version>3.1.1</spark.version>
+    <maven.compiler.release>8</maven.compiler.release>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>3.1.1</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+      <version>1.26</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <configuration>
+          <scalaVersion>${scala.version}</scalaVersion>
+          <args>
+            <arg>-target:jvm-1.8</arg>
+          </args>
+          <jvmArgs>
+            <jvmArg>-Xss4096K</jvmArg>
+          </jvmArgs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>scala-compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>2.0.0</version>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <packaging>jar</packaging>
+</project>
diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
new file mode 100644
index 000000000..8cfb5069b
--- /dev/null
+++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -0,0 +1,14 @@
+package com.alibaba.graphar;
+
+public class GeneralParams {
+    // column name
+    public static final String vertexIndexCol = "_graphArVertexIndex";
+    public static final String srcIndexCol = "_graphArSrcIndex";
+    public static final String dstIndexCol = "_graphArDstIndex";
+    public static final String offsetCol = "_graphArOffset";
+    public static final String primaryCol = "_graphArPrimary";
+    public static final String vertexChunkIndexCol = "_graphArVertexChunkIndex";
+    public static final String edgeIndexCol = "_graphArEdgeIndex";
+    public static final String regularSeperator = "_";
+}
+
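The constants above are the reserved column names that the indexing helpers in this patch attach to user DataFrames. A minimal sketch of how they are referenced from Spark code (the schema below is an illustration, not part of the patch):

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import com.alibaba.graphar.GeneralParams

    // After indexing, a vertex DataFrame carries the generated index column
    // next to the renamed primary-key column.
    val indexedSchema = StructType(Seq(
      StructField(GeneralParams.vertexIndexCol, LongType, false), // "_graphArVertexIndex"
      StructField(GeneralParams.primaryCol, StringType, true)     // "_graphArPrimary"
    ))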
diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
new file mode 100644
index 000000000..9f408b3e6
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -0,0 +1,276 @@
+package com.alibaba.graphar
+
+import java.io.{File, FileInputStream}
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import scala.beans.BeanProperty
+
+class EdgeInfo() {
+  @BeanProperty var src_label: String = ""
+  @BeanProperty var edge_label: String = ""
+  @BeanProperty var dst_label: String = ""
+  @BeanProperty var chunk_size: Long = 0
+  @BeanProperty var src_chunk_size: Long = 0
+  @BeanProperty var dst_chunk_size: Long = 0
+  @BeanProperty var directed: Boolean = false
+  @BeanProperty var prefix: String = ""
+  @BeanProperty var adj_lists = new java.util.ArrayList[AdjList]()
+  @BeanProperty var version: String = ""
+
+  def containAdjList(adj_list_type: AdjListType.Value): Boolean = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type)
+        return true
+    }
+    return false
+  }
+
+  def getAdjListPrefix(adj_list_type: AdjListType.Value): String = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type) {
+        var str: String = adj_list.getPrefix
+        if (str == "") {
+          str = AdjListType.AdjListTypeToString(adj_list_type) + "/"
+        }
+        return str
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getAdjListFileType(adj_list_type: AdjListType.Value): FileType.Value = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type) {
+        return adj_list.getFile_type_in_gar
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getPropertyGroups(adj_list_type: AdjListType.Value): java.util.ArrayList[PropertyGroup] = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type) {
+        return adj_list.getProperty_groups
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def containPropertyGroup(property_group: PropertyGroup, adj_list_type: AdjListType.Value): Boolean = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type) {
+        val property_groups = adj_list.getProperty_groups
+        val len: Int = property_groups.size
+        for ( i <- 0 to len - 1 ) {
+          val pg: PropertyGroup = property_groups.get(i)
+          if (pg == property_group) {
+            return true
+          }
+        }
+      }
+    }
+    return false
+  }
+
+  def containProperty(property_name: String): Boolean = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      val property_groups = adj_list.getProperty_groups
+      val len: Int = property_groups.size
+      for ( i <- 0 to len - 1 ) {
+        val pg: PropertyGroup = property_groups.get(i)
+        val properties = pg.getProperties
+        val num = properties.size
+        for ( j <- 0 to num - 1 ) {
+          if (properties.get(j).getName == property_name) {
+            return true
+          }
+        }
+      }
+    }
+    return false
+  }
+
+  def getPropertyGroup(property_name: String, adj_list_type: AdjListType.Value): PropertyGroup = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      if (adj_list.getAdjList_type_in_gar == adj_list_type) {
+        val property_groups = adj_list.getProperty_groups
+        val len: Int = property_groups.size
+        for ( i <- 0 to len - 1 ) {
+          val pg: PropertyGroup = property_groups.get(i)
+          val properties = pg.getProperties
+          val num = properties.size
+          for ( j <- 0 to num - 1 ) {
+            if (properties.get(j).getName == property_name) {
+              return pg
+            }
+          }
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getPropertyType(property_name: String): GarType.Value = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      val property_groups = adj_list.getProperty_groups
+      val len: Int = property_groups.size
+      for ( i <- 0 to len - 1 ) {
+        val pg: PropertyGroup = property_groups.get(i)
+        val properties = pg.getProperties
+        val num = properties.size
+        for ( j <- 0 to num - 1 ) {
+          if (properties.get(j).getName == property_name) {
+            return properties.get(j).getData_type_in_gar
+          }
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def isPrimaryKey(property_name: String): Boolean = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      val property_groups = adj_list.getProperty_groups
+      val len: Int = property_groups.size
+      for ( i <- 0 to len - 1 ) {
+        val pg: PropertyGroup = property_groups.get(i)
+        val properties = pg.getProperties
+        val num = properties.size
+        for ( j <- 0 to num - 1 ) {
+          if (properties.get(j).getName == property_name) {
+            return properties.get(j).getIs_primary
+          }
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getPrimaryKey(): String = {
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      val property_groups = adj_list.getProperty_groups
+      val len: Int = property_groups.size
+      for ( i <- 0 to len - 1 ) {
+        val pg: PropertyGroup = property_groups.get(i)
+        val properties = pg.getProperties
+        val num = properties.size
+        for ( j <- 0 to num - 1 ) {
+          if (properties.get(j).getIs_primary) {
+            return properties.get(j).getName
+          }
+        }
+      }
+    }
+    return ""
+  }
+
+  def isValidated(): Boolean = {
+    if (src_label == "" || edge_label == "" || dst_label == "")
+      return false
+    if (chunk_size <= 0 || src_chunk_size <= 0 || dst_chunk_size <= 0)
+      return false
+    val tot: Int = adj_lists.size
+    for ( k <- 0 to tot - 1 ) {
+      val adj_list = adj_lists.get(k)
+      val file_type = adj_list.getFile_type_in_gar
+      val property_groups = adj_list.getProperty_groups
+      val len: Int = property_groups.size
+      for ( i <- 0 to len - 1 ) {
+        val pg: PropertyGroup = property_groups.get(i)
+        val properties = pg.getProperties
+        val num = properties.size
+        if (num == 0)
+          return false
+        val pg_file_type = pg.getFile_type_in_gar
+      }
+    }
+    return true
+  }
+
+  def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
+    if (containAdjList(adj_list_type) == false)
+      throw new IllegalArgumentException
+    val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/part" +
+      chunk_index.toString() + "/chunk0"
+    return str
+  }
+
+  def getAdjListOffsetDirPath(adj_list_type: AdjListType.Value) : String = {
+    if (containAdjList(adj_list_type) == false)
+      throw new IllegalArgumentException
+    return prefix + getAdjListPrefix(adj_list_type) + "offset/"
+  }
+
+  def getAdjListFilePath(vertex_chunk_index: Long, chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
+    var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" +
+      vertex_chunk_index.toString() + "/chunk" + chunk_index.toString()
+    return str
+  }
+
+  def getAdjListFilePath(vertex_chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
+    var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" +
+      vertex_chunk_index.toString() + "/"
+    return str
+  }
+
+  def getAdjListDirPath(adj_list_type: AdjListType.Value) : String = {
+    return prefix + getAdjListPrefix(adj_list_type) + "adj_list/"
+  }
+
+  def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long, chunk_index: Long) : String = {
+    if (containPropertyGroup(property_group, adj_list_type) == false)
+      throw new IllegalArgumentException
+    var str: String = property_group.getPrefix
+    if (str == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    }
+    str = prefix + getAdjListPrefix(adj_list_type) + str + "part" +
+      vertex_chunk_index.toString() + "/chunk" + chunk_index.toString()
+    return str
+  }
+
+  def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
+    if (containPropertyGroup(property_group, adj_list_type) == false)
+      throw new IllegalArgumentException
+    var str: String = property_group.getPrefix
+    if (str == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    }
+    str = prefix + getAdjListPrefix(adj_list_type) + str
+    return str
+  }
+}
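An EdgeInfo is populated from its YAML descriptor through the SnakeYAML JavaBean constructor and then used to resolve chunk paths, as the test suite later in this patch exercises. A minimal sketch, assuming person_knows_person.edge.yml is reachable on the classpath:

    import org.yaml.snakeyaml.Yaml
    import org.yaml.snakeyaml.constructor.Constructor
    import com.alibaba.graphar.{AdjListType, EdgeInfo}

    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
    val edgeInfo = new Yaml(new Constructor(classOf[EdgeInfo])).load(input).asInstanceOf[EdgeInfo]

    // Locate chunk 2 within vertex chunk 1 of the ordered_by_source layout:
    // "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2"
    val path = edgeInfo.getAdjListFilePath(1, 2, AdjListType.ordered_by_source)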
"parquet" => FileType.PARQUET + case "orc" => FileType.ORC + case _ => throw new IllegalArgumentException + } + +} + +object AdjListType extends Enumeration { + type AdjListType = Value + val unordered_by_source = Value(0) + val unordered_by_dest = Value(1) + val ordered_by_source = Value(2) + val ordered_by_dest = Value(3) + + def AdjListTypeToString(adjList_type: AdjListType.Value): String = adjList_type match { + case AdjListType.unordered_by_source => "unordered_by_source" + case AdjListType.unordered_by_dest => "unordered_by_dest" + case AdjListType.ordered_by_source => "ordered_by_source" + case AdjListType.ordered_by_dest => "ordered_by_dest" + case _ => throw new IllegalArgumentException + } + + def StringToAdjListType(str: String): AdjListType.Value = str match { + case "unordered_by_source" => AdjListType.unordered_by_source + case "unordered_by_dest" => AdjListType.unordered_by_dest + case "ordered_by_source" => AdjListType.ordered_by_source + case "ordered_by_dest" => AdjListType.ordered_by_dest + case _ => throw new IllegalArgumentException + } +} + +class Property () { + @BeanProperty var name: String = "" + @BeanProperty var data_type: String = "" + @BeanProperty var is_primary: Boolean = false + + def getData_type_in_gar: GarType.Value = { + GarType.StringToGarType(data_type) + } +} + +class PropertyGroup () { + @BeanProperty var prefix: String = "" + @BeanProperty var file_type: String = "" + @BeanProperty var properties = new java.util.ArrayList[Property]() + + def getFile_type_in_gar: FileType.Value = { + FileType.StringToFileType(file_type) + } +} + +class AdjList () { + @BeanProperty var ordered: Boolean = false + @BeanProperty var aligned_by: String = "src" + @BeanProperty var prefix: String = "" + @BeanProperty var file_type: String = "" + @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() + + def getFile_type_in_gar: FileType.Value = { + FileType.StringToFileType(file_type) + } + + def getAdjList_type: String = { + var str: String = "" + if (ordered) { + str = "ordered_by_" + } else { + str = "unordered_by_" + } + if (aligned_by == "src") { + str += "source" + } else { + str += "dest" + } + return str + } + + def getAdjList_type_in_gar: AdjListType.Value = { + AdjListType.StringToAdjListType(getAdjList_type) + } +} + +class GraphInfo() { + @BeanProperty var name: String = "" + @BeanProperty var prefix: String = "" + @BeanProperty var vertices = new java.util.ArrayList[String]() + @BeanProperty var edges = new java.util.ArrayList[String]() + @BeanProperty var version: String = "" +} diff --git a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala new file mode 100644 index 000000000..15bd02d3e --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala @@ -0,0 +1,119 @@ +package com.alibaba.graphar + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.rdd.RDD + +import scala.collection.SortedMap +import scala.collection.mutable.ArrayBuffer + +object IndexGenerator { + + // index helper for the vertex DataFrame + + // return a DataFrame contains two columns: vertex index & primary key + def constructVertexIndexMapping(vertexDf: DataFrame, primaryKey: String): DataFrame = { + val spark = vertexDf.sparkSession + val schema = vertexDf.schema + val id_index = schema.fieldIndex(primaryKey) + 
diff --git a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
new file mode 100644
index 000000000..15bd02d3e
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
@@ -0,0 +1,119 @@
+package com.alibaba.graphar
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.rdd.RDD
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ArrayBuffer
+
+object IndexGenerator {
+
+  // index helper for the vertex DataFrame
+
+  // return a DataFrame that contains two columns: vertex index & primary key
+  def constructVertexIndexMapping(vertexDf: DataFrame, primaryKey: String): DataFrame = {
+    val spark = vertexDf.sparkSession
+    val schema = vertexDf.schema
+    val id_index = schema.fieldIndex(primaryKey)
+    val mapping_schema = StructType(Seq(StructField(GeneralParams.vertexIndexCol, LongType, false), schema.apply(id_index)))
+    val rdd = vertexDf.rdd
+    val counts = rdd
+      .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+      .collectAsMap()
+    val aggregatedCounts = SortedMap(counts.toSeq: _*)
+      .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+        (total + c, map + (i -> total))
+      }
+      ._2
+    val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts)
+    val mapping = rdd.mapPartitionsWithIndex((i, ps) => {
+      val start = broadcastedCounts.value(i)
+      for { (p, j) <- ps.zipWithIndex } yield Row(start + j, p(id_index))
+    })
+    spark.createDataFrame(mapping, mapping_schema).withColumnRenamed(primaryKey, GeneralParams.primaryCol)
+  }
+
+  // add a column that contains the vertex index
+  def generateVertexIndexColumn(vertexDf: DataFrame): DataFrame = {
+    val spark = vertexDf.sparkSession
+    val schema = vertexDf.schema
+    val schema_with_index = StructType(StructType(Seq(StructField(GeneralParams.vertexIndexCol, LongType, true))) ++ schema)
+    val rdd = vertexDf.rdd
+    val counts = rdd
+      .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+      .collectAsMap()
+    val aggregatedCounts = SortedMap(counts.toSeq: _*)
+      .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+        (total + c, map + (i -> total))
+      }
+      ._2
+    val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts)
+    val rdd_with_index = rdd.mapPartitionsWithIndex((i, ps) => {
+      val start = broadcastedCounts.value(i)
+      for { (p, j) <- ps.zipWithIndex } yield Row.fromSeq(Seq(start + j) ++ p.toSeq)
+    })
+    spark.createDataFrame(rdd_with_index, schema_with_index)
+  }
+
+  // index helper for the edge DataFrame
+
+  // join the edge table with the vertex index mapping for the source column
+  def generateSrcIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, srcIndexMapping: DataFrame): DataFrame = {
+    val spark = edgeDf.sparkSession
+    srcIndexMapping.createOrReplaceTempView("src_vertex")
+    edgeDf.createOrReplaceTempView("edge")
+    val srcCol = GeneralParams.srcIndexCol;
+    val indexCol = GeneralParams.vertexIndexCol;
+    val srcPrimaryKey = GeneralParams.primaryCol;
+    val trans_df = spark.sql(f"select src_vertex.$indexCol%s as $srcCol%s, edge.* from edge inner join src_vertex on src_vertex.$srcPrimaryKey%s=edge.$srcColumnName%s")
+    // drop the old src id col
+    trans_df.drop(srcColumnName)
+  }
+
+  // join the edge table with the vertex index mapping for the destination column
+  def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
+    val spark = edgeDf.sparkSession
+    dstIndexMapping.createOrReplaceTempView("dst_vertex")
+    edgeDf.createOrReplaceTempView("edge")
+    val dstCol = GeneralParams.dstIndexCol;
+    val indexCol = GeneralParams.vertexIndexCol;
+    val dstPrimaryKey = GeneralParams.primaryCol;
+    val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
+    // drop the old dst id col
+    trans_df.drop(dstColumnName)
+  }
+
+  // join the edge table with the vertex index mapping for source & destination columns
+  def generateSrcAndDstIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, dstColumnName: String, srcIndexMapping: DataFrame, dstIndexMapping: DataFrame): DataFrame = {
+    val df_with_src_index = generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping)
+    generateDstIndexForEdgesFromMapping(df_with_src_index, dstColumnName, dstIndexMapping)
+  }
+
+  // construct the vertex index for the source column
+  def generateSrcIndexForEdges(edgeDf: DataFrame, srcColumnName: String): DataFrame = {
+    val srcDf = edgeDf.select(srcColumnName).distinct()
+    val srcIndexMapping = constructVertexIndexMapping(srcDf, srcColumnName)
+    generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping)
+  }
+
+  // construct the vertex index for the destination column
+  def generateDstIndexForEdges(edgeDf: DataFrame, dstColumnName: String): DataFrame = {
+    val dstDf = edgeDf.select(dstColumnName).distinct()
+    val dstIndexMapping = constructVertexIndexMapping(dstDf, dstColumnName)
+    generateDstIndexForEdgesFromMapping(edgeDf, dstColumnName, dstIndexMapping)
+  }
+
+  // union and construct the vertex index for source & destination columns
+  def generateSrcAndDstIndexUnitedlyForEdges(edgeDf: DataFrame, srcColumnName: String, dstColumnName: String): DataFrame = {
+    val srcDf = edgeDf.select(srcColumnName)
+    val dstDf = edgeDf.select(dstColumnName)
+    val primaryKey = GeneralParams.primaryCol;
+    val vertexDf = srcDf.withColumnRenamed(srcColumnName, primaryKey).union(dstDf.withColumnRenamed(dstColumnName, primaryKey)).distinct()
+    val vertexIndexMapping = constructVertexIndexMapping(vertexDf, primaryKey)
+    generateSrcAndDstIndexForEdgesFromMapping(edgeDf, srcColumnName, dstColumnName, vertexIndexMapping, vertexIndexMapping)
+  }
+}
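Putting the helpers together: build one primary-key-to-index mapping from the vertex table, then join it onto both endpoint columns of the edge table. A minimal sketch mirroring the tests below (the CSV paths and the local session are illustrative):

    import org.apache.spark.sql.SparkSession
    import com.alibaba.graphar.IndexGenerator

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val opts = Map("delimiter" -> "|", "header" -> "true")
    val vertices = spark.read.options(opts).csv("person_0_0.csv")
    val edges = spark.read.options(opts).csv("person_knows_person_0_0.csv")

    // One mapping over the vertex primary key, reused for both src and dst.
    val mapping = IndexGenerator.constructVertexIndexMapping(vertices, "id")
    val indexed = IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(edges, "src", "dst", mapping, mapping)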
diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
new file mode 100644
index 000000000..5cab17601
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -0,0 +1,156 @@
+package com.alibaba.graphar
+
+import java.io.{File, FileInputStream}
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import scala.beans.BeanProperty
+
+class VertexInfo() {
+  @BeanProperty var label: String = ""
+  @BeanProperty var chunk_size: Long = 0
+  @BeanProperty var prefix: String = ""
+  @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]()
+  @BeanProperty var version: String = ""
+
+  def containPropertyGroup(property_group: PropertyGroup): Boolean = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      if (pg == property_group) {
+        return true
+      }
+    }
+    return false
+  }
+
+  def containProperty(property_name: String): Boolean = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (properties.get(j).getName == property_name) {
+          return true
+        }
+      }
+    }
+    return false
+  }
+
+  def getPropertyGroup(property_name: String): PropertyGroup = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (properties.get(j).getName == property_name) {
+          return pg
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getPropertyType(property_name: String): GarType.Value = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (properties.get(j).getName == property_name) {
+          return properties.get(j).getData_type_in_gar
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def isPrimaryKey(property_name: String): Boolean = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (properties.get(j).getName == property_name) {
+          return properties.get(j).getIs_primary
+        }
+      }
+    }
+    throw new IllegalArgumentException
+  }
+
+  def getPrimaryKey(): String = {
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (properties.get(j).getIs_primary) {
+          return properties.get(j).getName
+        }
+      }
+    }
+    return ""
+  }
+
+  def isValidated(): Boolean = {
+    if (label == "" || chunk_size <= 0)
+      return false
+    val len: Int = property_groups.size
+    for ( i <- 0 to len - 1 ) {
+      val pg: PropertyGroup = property_groups.get(i)
+      val properties = pg.getProperties
+      val num = properties.size
+      if (num == 0)
+        return false
+      val file_type = pg.getFile_type_in_gar
+    }
+    return true
+  }
+
+  def getVerticesNumFilePath(): String = {
+    return prefix + "vertex_count"
+  }
+
+  def getFilePath(property_group: PropertyGroup, chunk_index: Long): String = {
+    if (containPropertyGroup(property_group) == false)
+      throw new IllegalArgumentException
+    var str: String = ""
+    if (property_group.getPrefix == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    } else {
+      str = property_group.getPrefix
+    }
+    return prefix + str + "part" + chunk_index.toString() + "/chunk0"
+  }
+
+  def getDirPath(property_group: PropertyGroup): String = {
+    if (containPropertyGroup(property_group) == false)
+      throw new IllegalArgumentException
+    var str: String = ""
+    if (property_group.getPrefix == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    } else {
+      str = property_group.getPrefix
+    }
+    return prefix + str;
+  }
+}
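One naming rule shared by VertexInfo and EdgeInfo is worth calling out: when a property group declares no prefix, its directory name is synthesized by joining the member property names with GeneralParams.regularSeperator. A sketch against the same person.vertex.yml the tests use:

    import org.yaml.snakeyaml.Yaml
    import org.yaml.snakeyaml.constructor.Constructor
    import com.alibaba.graphar.VertexInfo

    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
    val vertexInfo = new Yaml(new Constructor(classOf[VertexInfo])).load(input).asInstanceOf[VertexInfo]

    // The group holding firstName/lastName/gender has no explicit prefix,
    // so its directory is the property names joined with "_".
    val group = vertexInfo.getPropertyGroup("firstName")
    assert(vertexInfo.getDirPath(group) == "vertex/person/firstName_lastName_gender/")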
diff --git a/spark/src/test/resources/gar-test b/spark/src/test/resources/gar-test
new file mode 120000
index 000000000..952da9d1f
--- /dev/null
+++ b/spark/src/test/resources/gar-test
@@ -0,0 +1 @@
+../../../../test/gar-test
\ No newline at end of file
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
new file mode 100644
index 000000000..d1e4db582
--- /dev/null
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -0,0 +1,156 @@
+package com.alibaba.graphar
+
+import java.io.{File, FileInputStream}
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import scala.beans.BeanProperty
+import org.scalatest.funsuite.AnyFunSuite
+
+class GraphInfoSuite extends AnyFunSuite {
+
+  test("load graph info") {
+    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
+    val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
+    val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
+
+    assert(graph_info.getName == "ldbc_sample")
+    assert(graph_info.getPrefix == "")
+    assert(graph_info.getVertices.size() == 1)
+    val vertices = new java.util.ArrayList[String]
+    vertices.add("person.vertex.yml")
+    assert(graph_info.getVertices == vertices)
+    assert(graph_info.getEdges.size() == 1)
+    val edges = new java.util.ArrayList[String]
+    edges.add("person_knows_person.edge.yml")
+    assert(graph_info.getEdges == edges)
+    assert(graph_info.getVersion == "gar/v1")
+  }
+
+  test("load vertex info") {
+    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
+    val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+    val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]
+
+    assert(vertex_info.getLabel == "person")
+    assert(vertex_info.isValidated)
+    assert(vertex_info.getVerticesNumFilePath() == "vertex/person/vertex_count")
+    assert(vertex_info.getPrimaryKey() == "id")
+    assert(vertex_info.getProperty_groups.size() == 2)
+    assert(vertex_info.getVersion == "gar/v1")
+    assert(vertex_info.getChunk_size == 100)
+
+    for ( i <- 0 to (vertex_info.getProperty_groups.size - 1) ) {
+      val property_group = vertex_info.getProperty_groups.get(i)
+      assert(vertex_info.containPropertyGroup(property_group))
+    }
+    assert(vertex_info.containProperty("id"))
+    val property_group = vertex_info.getPropertyGroup("id")
+    assert(property_group.getProperties.size == 1)
+    assert(property_group.getFile_type == "csv")
+    assert(property_group.getFile_type_in_gar == FileType.CSV)
+    assert(vertex_info.containPropertyGroup(property_group))
+    assert(vertex_info.getPropertyType("id") == GarType.INT64)
+    assert(vertex_info.isPrimaryKey("id"))
+    assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
+    assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+    assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")
+
+    assert(vertex_info.containProperty("firstName"))
+    val property_group_2 = vertex_info.getPropertyGroup("firstName")
+    assert(property_group_2.getProperties.size == 3)
+    assert(property_group_2.getFile_type == "csv")
+    assert(property_group_2.getFile_type_in_gar == FileType.CSV)
+    assert(vertex_info.containPropertyGroup(property_group_2))
+    assert(vertex_info.getPropertyType("firstName") == GarType.STRING)
+    assert(vertex_info.isPrimaryKey("firstName") == false)
+    assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
+    assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+    assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")
+
+    assert(vertex_info.containProperty("not_exist") == false)
+    assertThrows[IllegalArgumentException](vertex_info.getPropertyGroup("not_exist"))
+    assertThrows[IllegalArgumentException](vertex_info.getPropertyType("not_exist"))
+    assertThrows[IllegalArgumentException](vertex_info.isPrimaryKey("not_exist"))
+  }
+
+  test("load edge info") {
+    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
+    val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+    val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]
+
+    assert(edge_info.getSrc_label == "person")
+    assert(edge_info.getDst_label == "person")
+    assert(edge_info.getEdge_label == "knows")
+    assert(edge_info.getChunk_size == 1024)
+    assert(edge_info.getSrc_chunk_size == 100)
+    assert(edge_info.getDst_chunk_size == 100)
+    assert(edge_info.getDirected == false)
+    assert(edge_info.getPrefix == "edge/person_knows_person/")
+    assert(edge_info.getAdj_lists.size == 2)
+    assert(edge_info.getVersion == "gar/v1")
+    assert(edge_info.isValidated)
+    assert(edge_info.getPrimaryKey == "")
+
+    assert(edge_info.containAdjList(AdjListType.ordered_by_source))
+    assert(edge_info.getAdjListPrefix(AdjListType.ordered_by_source) == "ordered_by_source/")
+    assert(edge_info.getAdjListFileType(AdjListType.ordered_by_source) == FileType.CSV)
+    assert(edge_info.getPropertyGroups(AdjListType.ordered_by_source).size == 1)
+    assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0")
+    assert(edge_info.getAdjListFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part0/")
+    assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
+    assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/")
+    assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
+    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
+    val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
+    assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
+    val property = property_group.getProperties.get(0)
+    val property_name = property.getName
+    assert(edge_info.containProperty(property_name))
+    assert(edge_info.getPropertyGroup(property_name, AdjListType.ordered_by_source) == property_group)
+    assert(edge_info.getPropertyType(property_name) == property.getData_type_in_gar)
+    assert(edge_info.isPrimaryKey(property_name) == property.getIs_primary)
+    assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 0, 0) == "edge/person_knows_person/ordered_by_source/creationDate/part0/chunk0")
+    assert(edge_info.getPropertyFilePath(property_group, AdjListType.ordered_by_source, 1, 2) == "edge/person_knows_person/ordered_by_source/creationDate/part1/chunk2")
+    assert(edge_info.getPropertyDirPath(property_group, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/creationDate/")
+
+    assert(edge_info.containAdjList(AdjListType.ordered_by_dest))
+    assert(edge_info.getAdjListPrefix(AdjListType.ordered_by_dest) == "ordered_by_dest/")
+    assert(edge_info.getAdjListFileType(AdjListType.ordered_by_dest) == FileType.CSV)
+    assert(edge_info.getPropertyGroups(AdjListType.ordered_by_dest).size == 1)
+    assert(edge_info.getAdjListFilePath(0, 0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/chunk0")
+    assert(edge_info.getAdjListFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part0/")
+    assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
"edge/person_knows_person/ordered_by_dest/adj_list/part1/") + assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/") + assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0") + assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0") + assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/") + val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0) + assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest)) + val property_2 = property_group_2.getProperties.get(0) + val property_name_2 = property_2.getName + assert(edge_info.containProperty(property_name_2)) + assert(edge_info.getPropertyGroup(property_name_2, AdjListType.ordered_by_dest) == property_group_2) + assert(edge_info.getPropertyType(property_name_2) == property_2.getData_type_in_gar) + assert(edge_info.isPrimaryKey(property_name_2) == property_2.getIs_primary) + assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 0, 0) == "edge/person_knows_person/ordered_by_dest/creationDate/part0/chunk0") + assert(edge_info.getPropertyFilePath(property_group_2, AdjListType.ordered_by_dest, 1, 2) == "edge/person_knows_person/ordered_by_dest/creationDate/part1/chunk2") + assert(edge_info.getPropertyDirPath(property_group_2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/creationDate/") + + assert(edge_info.containAdjList(AdjListType.unordered_by_source) == false) + assertThrows[IllegalArgumentException](edge_info.getAdjListPrefix(AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getAdjListFileType(AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyGroups(AdjListType.unordered_by_source)) + assert(edge_info.containPropertyGroup(property_group, AdjListType.unordered_by_source) == false) + assertThrows[IllegalArgumentException](edge_info.getAdjListOffsetFilePath(0, AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getAdjListFilePath(0, 0, AdjListType.unordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyFilePath(property_group, AdjListType.unordered_by_source, 0, 0)) + assert(edge_info.containProperty("not_exist") == false) + assertThrows[IllegalArgumentException](edge_info.getPropertyGroup("not_exist", AdjListType.ordered_by_source)) + assertThrows[IllegalArgumentException](edge_info.getPropertyType("not_exist")) + assertThrows[IllegalArgumentException](edge_info.isPrimaryKey("not_exist")) + } + + } diff --git a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala new file mode 100644 index 000000000..a2dcea274 --- /dev/null +++ b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala @@ -0,0 +1,40 @@ +package com.alibaba.graphar + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.scalatest.funsuite.AnyFunSuite + +class IndexGeneratorSuite extends AnyFunSuite { + val spark = SparkSession.builder() + .enableHiveSupport() + .master("local[*]") + .getOrCreate() + + test("generate vertex index") { + val file_path = 
getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath + val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path) + assertThrows[IllegalArgumentException](vertex_df.schema.fieldIndex(GeneralParams.vertexIndexCol)) + val df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df) + val field_index = df_with_index.schema(GeneralParams.vertexIndexCol) + val desc = df_with_index.describe(GeneralParams.vertexIndexCol) + } + + test("generate edge index") { + val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath + val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path) + val df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst") + df_with_index.show() + } + + test("generate edge index with vertex") { + val vertex_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath + val edge_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath + val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_path) + val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(edge_path) + val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, "id") + val edge_df_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping) + edge_df_src_index.show() + val edge_df_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_src_index, "dst", vertex_mapping) + edge_df_src_dst_index.show() + } + + }