WriteToDataSourceV2Exec.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}

/**
* Deprecated logical plan for writing data into data source v2. This is being replaced by more
* specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
*/
@deprecated("Use specific logical plans like AppendData instead", "2.4.0")
case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
}
/**
* Physical plan node for v2 create table as select when the catalog does not support staging
* the table creation.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
 * If either the table creation or the append fails, the table will be deleted. This
 * implementation is not atomic; for an atomic variant for catalogs that support the appropriate
 * features, see [[AtomicCreateTableAsSelectExec]].
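 *
 * A minimal sketch of how a query can reach this node, assuming a configured v2 catalog named
 * `cat` that does not implement [[StagingTableCatalog]] (the catalog and table names are
 * illustrative, not part of this file):
 * {{{
 *   // CREATE TABLE cat.db.target AS SELECT id FROM range(10)
 *   spark.range(10)
 *     .writeTo("cat.db.target")   // DataFrameWriterV2
 *     .create()                   // planned as CreateTableAsSelectExec for non-staging catalogs
 * }}}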
*/
case class CreateTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
if (ifNotExists) {
return Nil
}
throw new TableAlreadyExistsException(ident)
}
val table = catalog.createTable(ident, query.schema.asNullable,
partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident)
}
}
/**
* Physical plan node for v2 create table as select, when the catalog is determined to support
* staging table creation.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
 * The CTAS operation is atomic: the table creation is staged, and the commit of the write
 * publishes the table metadata and the table contents as a single unit. If the write fails,
 * the table is instructed to roll back all staged changes.
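 *
 * A sketch of the staged lifecycle this node drives, using the [[StagingTableCatalog]] and
 * [[StagedTable]] methods called below (error handling omitted):
 * {{{
 *   val staged = catalog.stageCreate(ident, schema, partitioning, properties)
 *   // ... write rows through the staged table's WriteBuilder ...
 *   staged.commitStagedChanges()   // publishes metadata and data as one unit
 *   // on failure: staged.abortStagedChanges()
 * }}}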
*/
case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
if (ifNotExists) {
return Nil
}
throw new TableAlreadyExistsException(ident)
}
val stagedTable = catalog.stageCreate(
ident, query.schema.asNullable, partitioning.toArray, properties.asJava)
writeToTable(catalog, stagedTable, writeOptions, ident)
}
}
/**
* Physical plan node for v2 replace table as select when the catalog does not support staging
* table replacement.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
 * of the query. This is a non-atomic implementation that drops the table and then runs a
 * non-atomic CTAS. For an atomic implementation for catalogs with the appropriate support, see
 * [[AtomicReplaceTableAsSelectExec]].
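 *
 * For example (illustrative names; assumes a configured non-staging v2 catalog named `cat`):
 * {{{
 *   // REPLACE TABLE cat.db.target AS SELECT ...
 *   df.writeTo("cat.db.target").replace()          // fails if the table does not exist
 *   df.writeTo("cat.db.target").createOrReplace()  // the orCreate = true variant
 * }}}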
*/
case class ReplaceTableAsSelectExec(
session: SparkSession,
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
orCreate: Boolean) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
// Note that this operation is potentially unsafe, but these are the strict semantics of
// RTAS if the catalog does not support atomic operations.
//
    // There are several cases where the existing table is dropped and cannot be recovered:
//
// 1. Creating the new table fails,
// 2. Writing to the new table fails,
// 3. The table returned by catalog.createTable doesn't support writing.
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
uncacheTable(session, catalog, table, ident)
catalog.dropTable(ident)
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
}
val table = catalog.createTable(
ident, query.schema.asNullable, partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident)
}
}
/**
* Physical plan node for v2 replace table as select when the catalog supports staging
* table replacement.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
 * of the query. This implementation is atomic: the table replacement is staged, and the commit
 * operation at the end replaces the table's metadata and contents in a single unit. If the
 * write fails, the table is instructed to roll back staged changes, and the previously written
 * table, if any, is left untouched.
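 *
 * For example (illustrative names), on a catalog that implements [[StagingTableCatalog]]:
 * {{{
 *   df.writeTo("cat.db.target").createOrReplace()  // orCreate = true: uses stageCreateOrReplace
 *   df.writeTo("cat.db.target").replace()          // orCreate = false: uses stageReplace
 * }}}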
*/
case class AtomicReplaceTableAsSelectExec(
session: SparkSession,
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
orCreate: Boolean) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
val schema = query.schema.asNullable
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
uncacheTable(session, catalog, table, ident)
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
ident, schema, partitioning.toArray, properties.asJava)
} else if (catalog.tableExists(ident)) {
try {
catalog.stageReplace(
ident, schema, partitioning.toArray, properties.asJava)
} catch {
case e: NoSuchTableException =>
throw new CannotReplaceMissingTableException(ident, Some(e))
}
} else {
throw new CannotReplaceMissingTableException(ident)
}
writeToTable(catalog, staged, writeOptions, ident)
}
}
/**
* Physical plan node for append into a v2 table.
*
* Rows in the output data set are appended.
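 *
 * For example (illustrative names), both of the following append through this node:
 * {{{
 *   df.writeTo("cat.db.target").append()
 *   // or: INSERT INTO cat.db.target SELECT ...
 * }}}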
*/
case class AppendDataExec(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan,
refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
override protected def run(): Seq[InternalRow] = {
val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
refreshCache()
writtenRows
}
}
/**
* Physical plan node for overwrite into a v2 table.
*
 * Overwrites data in a table matched by a set of filters. Rows matching all of the filters are
 * deleted, and rows in the output data set are appended.
*
* This plan is used to implement SaveMode.Overwrite. The behavior of SaveMode.Overwrite is to
* truncate the table -- delete all rows -- and append the output data set. This uses the filter
* AlwaysTrue to delete all rows.
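 *
 * For example (illustrative names), overwriting the rows that match a condition:
 * {{{
 *   import org.apache.spark.sql.functions.col
 *   df.writeTo("cat.db.target").overwrite(col("day") === "2020-01-01")
 *   // SaveMode.Overwrite arrives here as deleteWhere = Array(AlwaysTrue), i.e. a truncate
 * }}}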
*/
case class OverwriteByExpressionExec(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan,
refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}
override protected def run(): Seq[InternalRow] = {
val writtenRows = newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
writeWithV2(builder.truncate().buildForBatch())
case builder: SupportsOverwrite =>
writeWithV2(builder.overwrite(deleteWhere).buildForBatch())
case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
}
refreshCache()
writtenRows
}
}
/**
* Physical plan node for dynamic partition overwrite into a v2 table.
*
* Dynamic partition overwrite is the behavior of Hive INSERT OVERWRITE ... PARTITION queries, and
* Spark INSERT OVERWRITE queries when spark.sql.sources.partitionOverwriteMode=dynamic. Each
* partition in the output data set replaces the corresponding existing partition in the table or
* creates a new partition. Existing partitions for which there is no data in the output data set
* are not modified.
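 *
 * For example (illustrative names), replacing only the partitions present in the output:
 * {{{
 *   df.writeTo("cat.db.target").overwritePartitions()
 *   // or, for INSERT OVERWRITE ... to take this path:
 *   //   spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
 * }}}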
*/
case class OverwritePartitionsDynamicExec(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan,
refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
override protected def run(): Seq[InternalRow] = {
val writtenRows = newWriteBuilder() match {
case builder: SupportsDynamicOverwrite =>
writeWithV2(builder.overwriteDynamicPartitions().buildForBatch())
case _ =>
throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
}
refreshCache()
writtenRows
}
}
case class WriteToDataSourceV2Exec(
batchWrite: BatchWrite,
query: SparkPlan) extends V2TableWriteExec {
def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()
override protected def run(): Seq[InternalRow] = {
writeWithV2(batchWrite)
}
}
/**
* Helper for physical plans that build batch writes.
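 *
 * A minimal sketch of the connector-side entry point that `newWriteBuilder` calls into
 * (illustrative only; a real connector also implements the rest of the Table API):
 * {{{
 *   class MyTable extends SupportsWrite {
 *     override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
 *       new WriteBuilder {
 *         override def buildForBatch(): BatchWrite = new MyBatchWrite(info)
 *       }
 *     // name(), schema(), capabilities(), ... omitted
 *   }
 * }}}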
*/
trait BatchWriteHelper {
def table: SupportsWrite
def query: SparkPlan
def writeOptions: CaseInsensitiveStringMap
def newWriteBuilder(): WriteBuilder = {
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
query.schema,
writeOptions)
table.newWriteBuilder(info)
}
}
/**
* The base physical plan for writing data into data source v2.
*/
trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
def query: SparkPlan
var commitProgress: Option[StreamWriterCommitProgress] = None
override def child: SparkPlan = query
override def output: Seq[Attribute] = Nil
protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = {
val rdd: RDD[InternalRow] = {
val tempRdd = query.execute()
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
// partition rdd to make sure we at least set up one write task to write the metadata.
if (tempRdd.partitions.length == 0) {
sparkContext.parallelize(Array.empty[InternalRow], 1)
} else {
tempRdd
}
}
val writerFactory = batchWrite.createBatchWriterFactory(
PhysicalWriteInfoImpl(rdd.getNumPartitions))
val useCommitCoordinator = batchWrite.useCommitCoordinator
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
val totalNumRowsAccumulator = new LongAccumulator()
logInfo(s"Start processing data source write support: $batchWrite. " +
s"The input RDD has ${messages.length} partitions.")
try {
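      // Launch one write task per input partition. Each successful task returns a
      // WriterCommitMessage; the result handler below runs on the driver as tasks finish,
      // recording the message and row count and notifying the BatchWrite via onDataWriterCommit.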
sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
rdd.partitions.indices,
(index, result: DataWritingSparkTaskResult) => {
val commitMessage = result.writerCommitMessage
messages(index) = commitMessage
totalNumRowsAccumulator.add(result.numRows)
batchWrite.onDataWriterCommit(commitMessage)
}
)
logInfo(s"Data source write support $batchWrite is committing.")
batchWrite.commit(messages)
logInfo(s"Data source write support $batchWrite committed.")
commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
} catch {
case cause: Throwable =>
logError(s"Data source write support $batchWrite is aborting.")
try {
batchWrite.abort(messages)
} catch {
case t: Throwable =>
logError(s"Data source write support $batchWrite failed to abort.")
cause.addSuppressed(t)
throw new SparkException("Writing job failed.", cause)
}
logError(s"Data source write support $batchWrite aborted.")
cause match {
// Only wrap non fatal exceptions.
case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
case _ => throw cause
}
}
Nil
}
protected def uncacheTable(
session: SparkSession,
catalog: TableCatalog,
table: Table,
ident: Identifier): Unit = {
val plan = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
session.sharedState.cacheManager.uncacheQuery(session, plan, cascade = true)
}
}
object DataWritingSparkTask extends Logging {
def run(
writerFactory: DataWriterFactory,
context: TaskContext,
iter: Iterator[InternalRow],
useCommitCoordinator: Boolean): DataWritingSparkTaskResult = {
val stageId = context.stageId()
val stageAttempt = context.stageAttemptNumber()
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()
val dataWriter = writerFactory.createWriter(partId, taskId)
var count = 0L
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
while (iter.hasNext) {
        // Count each row before it is written so the task can report how many rows it produced.
count += 1
dataWriter.write(iter.next())
}
val msg = if (useCommitCoordinator) {
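        // Ask the driver-side OutputCommitCoordinator for permission to commit, so that when
        // speculative or retried attempts race on the same partition, at most one attempt's
        // output is committed.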
val coordinator = SparkEnv.get.outputCommitCoordinator
val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId)
if (commitAuthorized) {
logInfo(s"Commit authorized for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
dataWriter.commit()
} else {
val message = s"Commit denied for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)"
logInfo(message)
// throwing CommitDeniedException will trigger the catch block for abort
throw new CommitDeniedException(message, stageId, partId, attemptId)
}
} else {
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
dataWriter.commit()
}
logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
DataWritingSparkTaskResult(count, msg)
})(catchBlock = {
// If there is an error, abort this writer
logError(s"Aborting commit for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
dataWriter.abort()
logError(s"Aborted commit for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
}, finallyBlock = {
dataWriter.close()
})
}
}
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
protected def writeToTable(
catalog: TableCatalog,
table: Table,
writeOptions: CaseInsensitiveStringMap,
ident: Identifier): Seq[InternalRow] = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
table match {
case table: SupportsWrite =>
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
query.schema,
writeOptions)
val writeBuilder = table.newWriteBuilder(info)
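          // Connectors that still rely on the v1 write path advertise it via V1WriteBuilder;
          // all other builders go through the v2 batch write path.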
val writtenRows = writeBuilder match {
case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
case v2 => writeWithV2(v2.buildForBatch())
}
table match {
case st: StagedTable => st.commitStagedChanges()
case _ =>
}
writtenRows
case _ =>
          // The table does not support writes; if it is a staged table, the staged changes are
          // rolled back in the catch block below.
throw new SparkException(
s"Table implementation does not support writes: ${ident.quoted}")
}
})(catchBlock = {
table match {
// Failure rolls back the staged writes and metadata changes.
case st: StagedTable => st.abortStagedChanges()
case _ => catalog.dropTable(ident)
}
})
}
}
private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)
/**
* Sink progress information collected after commit.
*/
private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)