forked from vesoft-inc/nebula-exchange
-
Notifications
You must be signed in to change notification settings - Fork 0
/
VerticesProcessor.scala
293 lines (264 loc) · 10.9 KB
/
VerticesProcessor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
package com.vesoft.nebula.exchange.processor
import java.nio.{ByteBuffer, ByteOrder}
import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
import com.vesoft.exchange.common.config.{
Configs,
FileBaseSinkConfigEntry,
SinkCategory,
StreamingDataSourceConfigEntry,
TagConfigEntry
}
import com.vesoft.exchange.common.processor.Processor
import com.vesoft.exchange.common.utils.NebulaUtils
import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.TagItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
import org.apache.spark.TaskContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.apache.spark.util.LongAccumulator
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
 * Processor that imports vertex data for a single tag into NebulaGraph,
 * either through the graph client or by generating SST files.
 *
 * @param data         source DataFrame holding the raw vertex rows
 * @param tagConfig    configuration of the tag being imported (vid field, policy, batch size, sink)
 * @param fieldKeys    field names as they appear in the data source
 * @param nebulaKeys   corresponding property names in the Nebula tag schema
 * @param config       global exchange configuration (database, connection, error handling)
 * @param batchSuccess accumulator counting successfully written batches
 * @param batchFailure accumulator counting failed batches
 */
class VerticesProcessor(spark: SparkSession,
                        data: DataFrame,
                        tagConfig: TagConfigEntry,
                        fieldKeys: List[String],
                        nebulaKeys: List[String],
                        config: Configs,
                        batchSuccess: LongAccumulator,
                        batchFailure: LongAccumulator)
    extends Processor {

  @transient
  private[this] lazy val LOG = Logger.getLogger(this.getClass)

  /**
   * Write all vertices of one Spark partition to NebulaGraph through the
   * graph client. Failed batch statements are buffered and persisted to the
   * configured error path; exceeding `errorMaxSize` aborts the partition.
   */
  private def processEachPartition(iterator: Iterator[Vertex]): Unit = {
    val graphProvider =
      new GraphProvider(config.databaseConfig.getGraphAddress,
                        config.connectionConfig.timeout,
                        config.sslConfig)
    val writer = new NebulaGraphClientWriter(config.databaseConfig,
                                             config.userConfig,
                                             config.rateConfig,
                                             tagConfig,
                                             graphProvider)
    val errorBuffer = ArrayBuffer[String]()

    writer.prepare()
    val startTime = System.currentTimeMillis
    try {
      // batch write tags
      iterator.grouped(tagConfig.batch).foreach { vertex =>
        val vertices      = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy)
        val failStatement = writer.writeVertices(vertices)
        if (failStatement == null) {
          batchSuccess.add(1)
        } else {
          errorBuffer.append(failStatement)
          batchFailure.add(1)
          if (batchFailure.value >= config.errorConfig.errorMaxSize) {
            throw TooManyErrorsException(
              s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
                s"your config max error size: ${config.errorConfig.errorMaxSize}")
          }
        }
      }
    } finally {
      // Persist any failed statements and release client resources even when
      // TooManyErrorsException aborts the partition; previously both the
      // error records and the open connections were leaked on that path.
      if (errorBuffer.nonEmpty) {
        ErrorHandler.save(
          errorBuffer,
          s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
        errorBuffer.clear()
      }
      LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
        .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
      writer.close()
      graphProvider.close()
    }
  }

  /**
   * Entry point. Depending on the configured sink, either encodes the rows
   * into sorted (key, value) pairs and writes SST files, or converts them to
   * [[Vertex]] instances and writes through the graph client (batch or
   * streaming).
   */
  override def process(): Unit = {
    val address = config.databaseConfig.getMetaAddress
    val space   = config.databaseConfig.space
    val timeout = config.connectionConfig.timeout
    val retry   = config.connectionConfig.retry

    val metaProvider    = new MetaProvider(address, timeout, retry, config.sslConfig)
    val fieldTypeMap    = NebulaUtils.getDataSourceFieldType(tagConfig, space, metaProvider)
    val isVidStringType = metaProvider.getVidType(space) == VidType.STRING
    val partitionNum    = metaProvider.getPartNumber(space)

    if (tagConfig.dataSinkConfigEntry.category == SinkCategory.SST) {
      val fileBaseConfig = tagConfig.dataSinkConfigEntry.asInstanceOf[FileBaseSinkConfigEntry]
      val namenode       = fileBaseConfig.fsName.orNull
      val tagName        = tagConfig.name
      val vidType        = metaProvider.getVidType(space)
      val spaceVidLen    = metaProvider.getSpaceVidLen(space)
      val tagItem        = metaProvider.getTagItem(space, tagName)
      val emptyValue     = ByteBuffer.allocate(0).array()

      // Encode each deduplicated row into (orphanVertexKey, vertexKey, value),
      // then flatten to the (key, value) pairs the SST writer expects; the
      // orphan vertex key carries an empty value.
      var sstKeyValueData = data
        .dropDuplicates(tagConfig.vertexField)
        .mapPartitions { iter =>
          iter.map { row =>
            encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
          }
        }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
        .flatMap(line => {
          List((line._1, emptyValue), (line._2, line._3))
        })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))

      // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
      if (tagConfig.repartitionWithNebula) {
        sstKeyValueData = customRepartition(spark, sstKeyValueData, partitionNum)
      }

      sstKeyValueData
        .toDF("key", "value")
        .sortWithinPartitions("key")
        .foreachPartition { iterator: Iterator[Row] =>
          val generateSstFile = new GenerateSstFile
          generateSstFile.writeSstFiles(iterator,
                                        fileBaseConfig,
                                        partitionNum,
                                        namenode,
                                        batchFailure)
        }
    } else {
      val streamFlag = data.isStreaming
      val vertices = data
        .filter { row =>
          isVertexValid(row, tagConfig, streamFlag, isVidStringType)
        }
        .map { row =>
          convertToVertex(row, tagConfig, isVidStringType, fieldKeys, fieldTypeMap)
        }(Encoders.kryo[Vertex])

      // streaming write
      if (streamFlag) {
        val streamingDataSourceConfig =
          tagConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
        val wStream = tagConfig.checkPointPath match {
          case Some(path) => vertices.writeStream.option("checkpointLocation", path)
          case None       => vertices.writeStream
        }
        wStream
          .foreachBatch((vertexSet: Dataset[Vertex], batchId: Long) => {
            LOG.info(s"${tagConfig.name} tag start batch ${batchId}.")
            vertexSet.foreachPartition(processEachPartition _)
          })
          .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds"))
          .start()
          .awaitTermination()
      } else
        vertices.foreachPartition(processEachPartition _)
    }
  }

  /**
   * filter and check row data for vertex, if streaming only print log
   * for not streaming datasource, if the vertex data is invalid, throw AssertException.
   */
  def isVertexValid(row: Row,
                    tagConfig: TagConfigEntry,
                    streamFlag: Boolean,
                    isVidStringType: Boolean): Boolean = {
    val index = row.schema.fieldIndex(tagConfig.vertexField)
    if (index < 0 || row.isNullAt(index)) {
      printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
      return false
    }

    val vertexId = row.get(index).toString
    // process int type vid: a numeric id is required when no policy is set
    if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
      printChoice(
        streamFlag,
        s"space vidType is int, but your vertex id $vertexId is not numeric.your row data is $row")
      return false
    }
    // process string type vid: hash/uuid policies only apply to int vid spaces
    if (tagConfig.vertexPolicy.isDefined && isVidStringType) {
      printChoice(
        streamFlag,
        s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
      return false
    }
    true
  }

  /**
   * Convert row data to {@link Vertex}.
   *
   * String-typed vids are escaped and wrapped in double quotes so they can be
   * embedded directly in an nGQL INSERT statement.
   */
  def convertToVertex(row: Row,
                      tagConfig: TagConfigEntry,
                      isVidStringType: Boolean,
                      fieldKeys: List[String],
                      fieldTypeMap: Map[String, Int]): Vertex = {
    val index    = row.schema.fieldIndex(tagConfig.vertexField)
    var vertexId = row.get(index).toString
    if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
      vertexId = ""
    }

    if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
      vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
    }

    val values = for {
      property <- fieldKeys if property.trim.length != 0
    } yield extraValueForClient(row, property, fieldTypeMap)
    Vertex(vertexId, values)
  }

  /**
   * Encode one row into the raw key/value byte arrays used for SST import.
   *
   * @return a tuple of (orphanVertexKey, vertexKey, encoded tag value)
   */
  def encodeVertex(row: Row,
                   partitionNum: Int,
                   vidType: VidType.Value,
                   spaceVidLen: Int,
                   tagItem: TagItem,
                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
    // check if vertex id is valid, if not, throw AssertException
    isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

    val index: Int       = row.schema.fieldIndex(tagConfig.vertexField)
    var vertexId: String = row.get(index).toString
    if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
      vertexId = ""
    }
    if (tagConfig.vertexPolicy.isDefined) {
      tagConfig.vertexPolicy.get match {
        case KeyPolicy.HASH =>
          vertexId = MurmurHash2
            .hash64(vertexId.getBytes(), vertexId.getBytes().length, 0xc70f6907)
            .toString
        case KeyPolicy.UUID =>
          throw new UnsupportedOperationException("do not support uuid yet")
        case _ =>
          throw new IllegalArgumentException(s"policy ${tagConfig.vertexPolicy.get} is invalidate")
      }
    }

    val partitionId = NebulaUtils.getPartitionId(vertexId, partitionNum, vidType)
    // int vids are stored as 8 native-order bytes; string vids as raw bytes
    val vidBytes = if (vidType == VidType.INT) {
      ByteBuffer
        .allocate(8)
        .order(ByteOrder.nativeOrder)
        .putLong(vertexId.toLong)
        .array
    } else {
      vertexId.getBytes()
    }
    val codec           = new NebulaCodecImpl()
    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
    val values = for {
      property <- fieldKeys if property.trim.length != 0
    } yield
      extraValueForSST(row, property, fieldTypeMap)
        .asInstanceOf[AnyRef]
    val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
    (orphanVertexKey, vertexKey, vertexValue)
  }
}