Skip to content
This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Commit

Permalink
handle task cancellation before druid query is issued
Browse files Browse the repository at this point in the history
  • Loading branch information
Harish Butani committed Sep 9, 2016
1 parent ed5b133 commit 9f9ae5b
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/main/scala/org/sparklinedata/druid/DruidRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,24 @@ class DruidRDD(sqlContext: SQLContext,
* the ''Spark Executor'', so it is safe to return an empty iterator.
* 4. Always clear this Druid Query from the TaskCancelHandler
*/
var cancelCallback : TaskCancelHandler.TaskCancelHolder = null
var dr: CloseableIterator[ResultRow] = null
var client : DruidQueryServerClient = null
val qryId = mQry.context.map(_.queryId).getOrElse(s"q-${System.nanoTime()}")
val cancelCallback = TaskCancelHandler.registerQueryId(qryId, context)
val client = p.queryClient(useSmile, httpMaxPerRoute, httpMaxTotal)
client.setCancellableHolder(cancelCallback)

val qrySTime = System.currentTimeMillis()
val qrySTimeStr = s"${new java.util.Date()}"
var dr : CloseableIterator[ResultRow] = null
var qrySTime = System.currentTimeMillis()
var qrySTimeStr = s"${new java.util.Date()}"
try {
cancelCallback = TaskCancelHandler.registerQueryId(qryId, context)
client = p.queryClient(useSmile, httpMaxPerRoute, httpMaxTotal)
client.setCancellableHolder(cancelCallback)
qrySTime = System.currentTimeMillis()
qrySTimeStr = s"${new java.util.Date()}"
dr = mQry.executeQuery(client)
} catch {
case _ if cancelCallback.wasCancelTriggered => { dr = new DummyResultIterator() }
case e : Throwable => throw e
case _ if cancelCallback.wasCancelTriggered && client != null => {
dr = new DummyResultIterator()
}
case e: Throwable => throw e
}
finally {
TaskCancelHandler.clearQueryId(qryId)
Expand Down

0 comments on commit 9f9ae5b

Please sign in to comment.