[Improvement][Spark] Improve the writer efficiency with parallel process (#329)
acezen authored Jan 19, 2024
1 parent 9f94f34 commit 10bdb68
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
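
Editor's note: the change replaces the sequential `Seq[DataFrame]` of offset chunks with a `ParSeq[(Int, DataFrame)]` from Scala's parallel collections, so per-vertex-chunk work can run on multiple threads. A minimal sketch of that mechanism, assuming Scala 2.12 (where parallel collections ship with the standard library; on 2.13+ they live in the separate scala-parallel-collections module) — `ParDemo` and `expensiveCount` are illustrative names, not the project's code:

```scala
import scala.collection.parallel.immutable.ParSeq

object ParDemo {
  // Stand-in for per-chunk work such as building an offset DataFrame.
  private def expensiveCount(i: Int): Long = (0L until 1000000L).sum + i

  def main(args: Array[String]): Unit = {
    // .par hands the range to a fork/join pool; map runs elementwise in
    // parallel, but the resulting ParSeq preserves element order.
    val out: ParSeq[(Int, Long)] =
      (0 until 4).par.map(i => (i, expensiveCount(i)))
    // .seq converts back to an ordinary sequential collection.
    out.seq.foreach { case (i, n) => println(s"chunk $i: $n") }
  }
}
```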
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{
}
import org.apache.spark.sql.functions._

import scala.collection.parallel.immutable.ParSeq
import scala.collection.SortedMap
import scala.collection.mutable.ArrayBuffer

@@ -47,7 +48,7 @@ object EdgeWriter {
edgeInfo: EdgeInfo,
adjListType: AdjListType.Value,
vertexNumOfPrimaryVertexLabel: Long
): (DataFrame, Seq[DataFrame], Array[Long], Map[Long, Int]) = {
): (DataFrame, ParSeq[(Int, DataFrame)], Array[Long], Map[Long, Int]) = {
val edgeSchema = edgeDf.schema
val colName = if (
adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source
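
The signature change in this hunk is the heart of the commit: each offset chunk now travels with its vertex-chunk index. That matters because `foreach` on a parallel collection executes its tasks in a non-deterministic order, so downstream writers can no longer infer the chunk number from iteration position. A hedged illustration (toy values, not the project's code):

```scala
import scala.collection.parallel.immutable.ParSeq

val chunks: ParSeq[(Int, String)] = (0 until 8).par.map(i => (i, s"df-$i"))
chunks.foreach { case (i, df) =>
  // A shared mutable counter here would race and mislabel chunks;
  // the index carried in the tuple is always correct.
  println(s"chunk $i -> $df")
}
```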
@@ -150,34 +151,35 @@ object EdgeWriter
val offsetDfSchema = StructType(
Seq(StructField(GeneralParams.offsetCol, IntegerType))
)
val offsetDfArray: Seq[DataFrame] = (0 until vertexChunkNum).map { i =>
{
val filterRDD = edgeCountsByPrimaryKey
.filter(v => v._1 / vertexChunkSize == i)
.map { case (k, v) => (k - i * vertexChunkSize + 1, v) }
val initRDD = spark.sparkContext.parallelize(
(0L to vertexChunkSize).map(key => (key, 0))
)
val unionRDD = spark.sparkContext
.union(filterRDD, initRDD)
.reduceByKey(_ + _)
.sortByKey(numPartitions = 1)
val offsetRDD = unionRDD
.mapPartitionsWithIndex((i, ps) => {
var sum = 0
var preSum = 0
for ((k, count) <- ps) yield {
preSum = sum
sum = sum + count
(k, count + preSum)
}
})
.map { case (k, v) => Row(v) }
val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
offsetChunk.persist(GeneralParams.defaultStorageLevel)
offsetChunk
val offsetDfArray: ParSeq[(Int, DataFrame)] =
(0 until vertexChunkNum).par.map { i =>
{
val filterRDD = edgeCountsByPrimaryKey
.filter(v => v._1 / vertexChunkSize == i)
.map { case (k, v) => (k - i * vertexChunkSize + 1, v) }
val initRDD = spark.sparkContext.parallelize(
(0L to vertexChunkSize).map(key => (key, 0))
)
val unionRDD = spark.sparkContext
.union(filterRDD, initRDD)
.reduceByKey(_ + _)
.sortByKey(numPartitions = 1)
val offsetRDD = unionRDD
.mapPartitionsWithIndex((i, ps) => {
var sum = 0
var preSum = 0
for ((k, count) <- ps) yield {
preSum = sum
sum = sum + count
(k, count + preSum)
}
})
.map { case (k, v) => Row(v) }
val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
offsetChunk.persist(GeneralParams.defaultStorageLevel)
(i, offsetChunk)
}
}
}
edgeCountsByPrimaryKey.unpersist() // unpersist the edgeCountsByPrimaryKey
return (
partitionEdgeDf,
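
The rewritten block above computes one offset chunk per vertex chunk, now inside `.par.map`. The per-chunk pipeline is unchanged: seed every in-chunk key with a zero count, merge in the real edge counts (keyed at vertex id + 1), sort into a single partition, and take an inclusive running sum, so row k ends up holding the total edges of all vertices before vertex k, i.e. vertex k's first-edge offset. A standalone sketch of that pipeline with assumed toy data (`OffsetSketch` and its values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object OffsetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("offset-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val vertexChunkSize = 4L
    // Edge counts keyed by (vertex id within the chunk) + 1, as in the
    // diff: vertex 1 has 2 edges, vertex 3 has 5.
    val counts = sc.parallelize(Seq((2L, 2), (4L, 5)))
    // Seed keys 0..chunkSize with zeros so vertices without edges
    // still get an offset row.
    val seeded = sc.parallelize((0L to vertexChunkSize).map(k => (k, 0)))

    val offsets = sc
      .union(counts, seeded)
      .reduceByKey(_ + _)
      .sortByKey(numPartitions = 1)
      .mapPartitionsWithIndex { (_, iter) =>
        // Inclusive running sum: row k = edges of all vertices < k.
        var sum = 0
        iter.map { case (k, count) => sum += count; (k, sum) }
      }

    // Prints (0,0), (1,0), (2,2), (3,2), (4,7).
    offsets.collect().foreach(println)
    spark.stop()
  }
}
```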
@@ -186,7 +188,7 @@
edgeNumMutableMap.toMap
)
}
val offsetDfArray = Seq.empty[DataFrame]
val offsetDfArray = ParSeq.empty[(Int, DataFrame)]
return (
partitionEdgeDf,
offsetDfArray,
@@ -258,7 +260,7 @@ class EdgeWriter(
}

private val edgeDfAndOffsetDf
: (DataFrame, Seq[DataFrame], Array[Long], Map[Long, Int]) =
: (DataFrame, ParSeq[(Int, DataFrame)], Array[Long], Map[Long, Int]) =
EdgeWriter.repartitionAndSort(
spark,
edgeDf,
@@ -275,9 +277,10 @@
else edgeInfo.getDst_chunk_size()
val vertexChunkNum: Int =
((vertexNum + vertexChunkSize - 1) / vertexChunkSize).toInt
for (i <- 0 until vertexChunkNum) {
val edgeNumber = edgeDfAndOffsetDf._4(i)
val outputPath = prefix + edgeInfo.getEdgesNumFilePath(i, adjListType)
val parallelEdgeNums = edgeDfAndOffsetDf._4.par
parallelEdgeNums.foreach { case (chunkIndex, edgeNumber) =>
val outputPath =
prefix + edgeInfo.getEdgesNumFilePath(chunkIndex, adjListType)
FileSystem.writeValue(
edgeNumber,
outputPath,
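
In the hunk above, the per-chunk edge-count map is likewise converted with `.par`, so each chunk's count file is written from its own task; that is safe here because every task writes a distinct file. A rough standalone equivalent, with `java.nio` standing in for GraphAr's `FileSystem.writeValue` helper (paths and values are illustrative):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

val edgeNums = Map(0L -> 123L, 1L -> 456L) // chunk index -> edge count
edgeNums.par.foreach { case (chunkIndex, edgeNumber) =>
  // Each task writes its own file, so no cross-task coordination is needed.
  val path = Paths.get(s"/tmp/edge_count$chunkIndex")
  Files.write(path, edgeNumber.toString.getBytes(StandardCharsets.UTF_8))
}
```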
@@ -291,16 +294,17 @@
var chunkIndex: Int = 0
val fileType = edgeInfo.getAdjListFileType(adjListType)
val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType)
for (offsetChunk <- edgeDfAndOffsetDf._2) {
// TODO(@acezen): Support parallel write with GarDataSource
val offsetChunks = edgeDfAndOffsetDf._2.seq
offsetChunks.foreach { case (i, offsetChunk) =>
FileSystem.writeDataFrame(
offsetChunk,
FileType.FileTypeToString(fileType),
outputPrefix,
Some(chunkIndex),
Some(i),
None
)
offsetChunk.unpersist()
chunkIndex = chunkIndex + 1
}
}
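
Note the asymmetry in the offset path above: the chunks were computed in parallel, but `.seq` restores a sequential view before writing because, per the TODO, GarDataSource cannot yet write in parallel, and `Some(i)` replaces the hand-maintained `chunkIndex` counter. Where the parallel paths do run, the default fork/join pool sizes itself to the CPU count; if each task launches its own Spark job, the fan-out could be bounded with a custom task support — a possible tweak, not something this commit configures:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val chunks = (0 until 16).par
// Cap the parallel collection at 4 concurrent tasks.
chunks.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
chunks.foreach(i => println(s"processing chunk $i"))
```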

@@ -346,11 +350,11 @@ class EdgeWriter(
val property = pIter.next()
propertyList += "`" + property.getName() + "`"
}
val propetyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val propertyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val outputPrefix =
prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
FileSystem.writeDataFrame(
propetyGroupDf,
propertyGroupDf,
propertyGroup.getFile_type(),
outputPrefix,
None,
