/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.{Arrays, Map => JMap, UUID}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveString, TimeFormatters}
import org.apache.spark.sql.execution.command.SetCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.{Utils => SparkUtils}
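
/**
 * Executes a single SQL statement on behalf of a Thrift Server session, either on the
 * caller's thread or on the session's background thread pool, and serves result rows
 * back to the client through getNextRowSet().
 */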
private[hive] class SparkExecuteStatementOperation(
val sqlContext: SQLContext,
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with SparkOperation
with Logging {
private var result: DataFrame = _
// We cache the returned rows so that iterators over them can be recreated when the client
// requests FETCH_FIRST. The cache is only populated when
// `spark.sql.thriftServer.incrementalCollect` is `false`; when it is `true`, this stays
// `None` and FETCH_FIRST triggers re-execution.
private var resultList: Option[Array[SparkRow]] = _
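// Row offsets (from the start of the result) spanned by the previous fetch; used to
// position FETCH_NEXT and to rewind for FETCH_PRIOR.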
private var previousFetchEndOffset: Long = 0
private var previousFetchStartOffset: Long = 0
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
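
// Thrift schema of the result; statements that produce no output schema are reported as a
// single string column named "Result".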
private lazy val resultSchema: TableSchema = {
if (result == null || result.schema.isEmpty) {
new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
} else {
logInfo(s"Result Schema: ${result.schema}")
SparkExecuteStatementOperation.getTableSchema(result.schema)
}
}
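
// Appends the non-null value at `ordinal` in `from` to the row buffer `to`, choosing the
// accessor by the column's Catalyst DataType. Date/timestamp, interval, and complex types
// are rendered as Hive-style strings rather than native objects.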
def addNonNullColumnValue(
from: SparkRow,
to: ArrayBuffer[Any],
ordinal: Int,
timeFormatters: TimeFormatters): Unit = {
dataTypes(ordinal) match {
case StringType =>
to += from.getString(ordinal)
case IntegerType =>
to += from.getInt(ordinal)
case BooleanType =>
to += from.getBoolean(ordinal)
case DoubleType =>
to += from.getDouble(ordinal)
case FloatType =>
to += from.getFloat(ordinal)
case DecimalType() =>
to += from.getDecimal(ordinal)
case LongType =>
to += from.getLong(ordinal)
case ByteType =>
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
case BinaryType =>
to += from.getAs[Array[Byte]](ordinal)
// SPARK-31859, SPARK-31861: Date and Timestamp need to be converted to String here to:
// - respect spark.sql.session.timeZone
// - work with spark.sql.datetime.java8API.enabled
// These types have always been sent over the wire as String and converted later.
case _: DateType | _: TimestampType =>
to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters)
case CalendarIntervalType =>
to += toHiveString(
(from.getAs[CalendarInterval](ordinal), CalendarIntervalType),
false,
timeFormatters)
case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] =>
to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters)
}
}
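
// Serves the next batch of up to `maxRowsL` rows. FETCH_NEXT resumes from the previous
// fetch's end offset; FETCH_FIRST and FETCH_PRIOR rewind by rebuilding the iterator
// (re-collecting or re-executing the query), with FETCH_PRIOR then skipping forward to
// the target offset.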
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
s"with ${statementId}")
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val resultRowSet: RowSet =
ThriftserverShimUtils.resultRowSet(getResultSetSchema, getProtocolVersion)
// Reset iter when FETCH_FIRST or FETCH_PRIOR
if ((order.equals(FetchOrientation.FETCH_FIRST) ||
order.equals(FetchOrientation.FETCH_PRIOR)) && previousFetchEndOffset != 0) {
// Reset the iterator to the beginning of the query.
iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
resultList = None
result.toLocalIterator.asScala
} else {
if (resultList.isEmpty) {
resultList = Some(result.collect())
}
resultList.get.iterator
}
}
var resultOffset = {
if (order.equals(FetchOrientation.FETCH_FIRST)) {
logInfo(s"FETCH_FIRST request with $statementId. Resetting to resultOffset=0")
0
} else if (order.equals(FetchOrientation.FETCH_PRIOR)) {
// TODO: FETCH_PRIOR should be handled more efficiently than rewinding to beginning and
// reiterating.
val targetOffset = math.max(previousFetchStartOffset - maxRowsL, 0)
logInfo(s"FETCH_PRIOR request with $statementId. Resetting to resultOffset=$targetOffset")
var off = 0
while (off < targetOffset && iter.hasNext) {
iter.next()
off += 1
}
off
} else { // FETCH_NEXT
previousFetchEndOffset
}
}
resultRowSet.setStartOffset(resultOffset)
previousFetchStartOffset = resultOffset
if (!iter.hasNext) {
resultRowSet
} else {
val timeFormatters = getTimeFormatters
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = ArrayBuffer[Any]()
var curCol = 0
while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
row += null
} else {
addNonNullColumnValue(sparkRow, row, curCol, timeFormatters)
}
curCol += 1
}
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
resultOffset += 1
}
previousFetchEndOffset = resultOffset
log.info(s"Returning result set with ${curRow} rows from offsets " +
s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
resultRowSet
}
}
def getResultSetSchema: TableSchema = resultSchema
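
// Entry point invoked by the Hive operation framework. Runs the statement inline when
// `runInBackground` is false; otherwise wraps execute() in a doAs action and submits it
// to the session's background operation pool.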
override def runInternal(): Unit = {
setState(OperationState.PENDING)
logInfo(s"Submitting query '$statement' with $statementId")
HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
statement,
statementId,
parentSession.getUsername)
setHasResultSet(true) // avoid no resultset for async run
if (!runInBackground) {
execute()
} else {
val sparkServiceUGI = Utils.getUGI()
// Runnable implementation that invokes execute() asynchronously on a background thread,
// running as the service UGI.
val backgroundOperation = new Runnable() {
override def run(): Unit = {
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
registerCurrentOperationLog()
try {
withLocalProperties {
execute()
}
} catch {
case e: HiveSQLException =>
setOperationException(e)
log.error("Error running hive query: ", e)
}
}
}
try {
sparkServiceUGI.doAs(doAsAction)
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " +
sparkServiceUGI.getShortUserName(), e)
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
logError("Error submitting query in background, query rejected", rejected)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e)
}
}
}
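
// Runs the statement via sqlContext.sql() under a job group keyed by statementId, then
// materializes the result either incrementally (toLocalIterator) or eagerly (collect()),
// depending on spark.sql.thriftServer.incrementalCollect.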
private def execute(): Unit = {
try {
synchronized {
if (getStatus.getState.isTerminal) {
logInfo(s"Query with $statementId in terminal state before it started running")
return
} else {
logInfo(s"Running query with $statementId")
setState(OperationState.RUNNING)
}
}
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
// Always set the session state classloader to `executionHiveClassLoader` even for sync mode
if (!runInBackground) {
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
}
sqlContext.sparkContext.setJobGroup(statementId, statement)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
HiveThriftServer2.eventManager.onStatementParsed(statementId,
result.queryExecution.toString())
iter = {
if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
resultList = None
result.toLocalIterator.asScala
} else {
resultList = Some(result.collect())
resultList.get.iterator
}
}
dataTypes = result.schema.fields.map(_.dataType)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
// If cancel() or close() is called very quickly after the query starts, both may invoke
// cleanup() before any Spark jobs are submitted. The background task may still have
// started some Spark jobs before it was interrupted, so cancel the job group again here
// to make sure those jobs are cancelled once the background thread is interrupted.
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
val currentState = getStatus().getState()
if (currentState.isTerminal) {
// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error running query: " + root.toString, root)
}
}
} finally {
synchronized {
if (!getStatus.getState.isTerminal) {
setState(OperationState.FINISHED)
HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
sqlContext.sparkContext.clearJobGroup()
}
}
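
// Cancels a non-terminal operation: interrupts the background task and Spark jobs via
// cleanup(), then reports CANCELED to the event manager.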
override def cancel(): Unit = {
synchronized {
if (!getStatus.getState.isTerminal) {
logInfo(s"Cancel query with $statementId")
cleanup()
setState(OperationState.CANCELED)
HiveThriftServer2.eventManager.onStatementCanceled(statementId)
}
}
}
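
// Interrupts the background task, if any, and cancels the Spark job group for this
// statement.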
override protected def cleanup(): Unit = {
if (runInBackground) {
val backgroundHandle = getBackgroundHandle()
if (backgroundHandle != null) {
backgroundHandle.cancel(true)
}
}
// RDDs will be cleaned automatically upon garbage collection.
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}
}
object SparkExecuteStatementOperation {
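// Maps a Catalyst StructType to Hive's TableSchema. NullType has no Hive counterpart and
// is reported as "void"; CalendarIntervalType is reported as "string" because intervals
// are sent to the client in string form. For example, a field of IntegerType surfaces as
// FieldSchema(name, "int", comment), since catalogString renders Hive-style type names.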
def getTableSchema(structType: StructType): TableSchema = {
val schema = structType.map { field =>
val attrTypeString = field.dataType match {
case NullType => "void"
case CalendarIntervalType => StringType.catalogString
case other => other.catalogString
}
new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse(""))
}
new TableSchema(schema.asJava)
}
}