Skip to content
Permalink
Browse files
[Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when S…
…park Doris Connector in aync deserialize mode (#5336)

Resolve deserialize exception when Spark Doris Connector in aync deserialize mode 
Co-authored-by: lanhuajian <lanhuajian@sankuai.com>
  • Loading branch information
924060929 committed Mar 4, 2021
1 parent c1585ee commit 293bd8f2f270842a2014c857634038581fedc8b9
Showing 1 changed file with 34 additions and 5 deletions.
@@ -19,6 +19,7 @@ package org.apache.doris.spark.rdd

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent._
import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}

import scala.collection.JavaConversions._
import scala.util.Try
@@ -46,11 +47,14 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])

protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
protected val clientLock =
if (deserializeArrowToRowBatchAsync) new ReentrantLock()
else new NoOpLock
protected var offset = 0
protected var eos: AtomicBoolean = new AtomicBoolean(false)
protected var rowBatch: RowBatch = _
// flag indicate if support deserialize Arrow to RowBatch asynchronously
protected var deserializeArrowToRowBatchAsync: Boolean = Try {
protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
@@ -123,7 +127,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
params
}

protected val openResult: TScanOpenResult = client.openScanner(openParams)
protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
protected val contextId: String = openResult.getContext_id
protected val schema: Schema =
SchemaUtils.convertToSchema(openResult.getSelected_columns)
@@ -134,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
nextBatchParams.setContext_id(contextId)
while (!eos.get) {
nextBatchParams.setOffset(offset)
val nextResult = client.getNext(nextBatchParams)
val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
val rowBatch = new RowBatch(nextResult, schema)
@@ -192,7 +196,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val nextBatchParams = new TScanNextBatchParams
nextBatchParams.setContext_id(contextId)
nextBatchParams.setOffset(offset)
val nextResult = client.getNext(nextBatchParams)
val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
rowBatch = new RowBatch(nextResult, schema)
@@ -218,6 +222,31 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
def close(): Unit = {
val closeParams = new TScanCloseParams
closeParams.context_id = contextId
client.closeScanner(closeParams)
lockClient(_.closeScanner(closeParams))
}

private def lockClient[T](action: BackendClient => T): T = {
clientLock.lock()
try {
action(client)
} finally {
clientLock.unlock()
}
}

private class NoOpLock extends Lock {
override def lock(): Unit = {}

override def lockInterruptibly(): Unit = {}

override def tryLock(): Boolean = true

override def tryLock(time: Long, unit: TimeUnit): Boolean = true

override def unlock(): Unit = {}

override def newCondition(): Condition = {
throw new UnsupportedOperationException("NoOpLock can't provide a condition")
}
}
}

0 comments on commit 293bd8f

Please sign in to comment.