Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
Expand Down Expand Up @@ -210,7 +210,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private def execute(): Unit = {
private def execute(): Unit = withSchedulerPool {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
Expand All @@ -225,10 +225,6 @@ private[hive] class SparkExecuteStatementOperation(
statementId,
parentSession.getUsername)
sqlContext.sparkContext.setJobGroup(statementId, statement)
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
// It may have unpredictable behavior since we use thread pools to execute queries and
// the 'spark.scheduler.pool' may not be 'default' when we did not set its value (SPARK-26914).
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
try {
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
Expand Down Expand Up @@ -291,6 +287,20 @@ private[hive] class SparkExecuteStatementOperation(
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}

/**
 * Runs `body` with this session's fair-scheduler pool applied to the current thread.
 *
 * Statements are executed on a shared thread pool, so the thread-local
 * `spark.scheduler.pool` property can linger from whatever ran previously on the
 * reused thread (SPARK-26914). We therefore set the property unconditionally:
 * `sessionToActivePool` only has an entry when the user explicitly set a pool for
 * this session, and passing `null` to `setLocalProperty` clears the property,
 * restoring the default scheduler pool.
 *
 * @param body the computation to run with the pool property in effect
 * @return the result of `body`
 */
private def withSchedulerPool[T](body: => T): T = {
  // Null when the session never set spark.sql.thriftserver.scheduler.pool;
  // setLocalProperty(key, null) removes the property, which is exactly what
  // we want for the default pool.
  val pool = sessionToActivePool.get(parentSession.getSessionHandle)
  sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
  try {
    body
  } finally {
    // Always clear so the pooled thread does not leak this session's pool
    // into unrelated statements that run on it next.
    sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
  }
}
}

object SparkExecuteStatementOperation {
Expand Down