FlinkSink.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
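/**
* Flink sink for writing a {@link DataStream} into an iceberg table. The sink accepts {@link
* RowData} directly, or {@link Row}/generic records together with a converter, and chains
* parallel {@link IcebergStreamWriter} operators with a single-parallelism {@link
* IcebergFilesCommitter}.
*
* <p>A minimal usage sketch; {@code rowDataStream} and {@code tableLoader} are hypothetical
* names assumed to be created by the caller:
*
* <pre>{@code
* FlinkSink.forRowData(rowDataStream)
*     .tableLoader(tableLoader)
*     .append();
* }</pre>
*/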
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
private static final String ICEBERG_STREAM_WRITER_NAME =
IcebergStreamWriter.class.getSimpleName();
private static final String ICEBERG_FILES_COMMITTER_NAME =
IcebergFilesCommitter.class.getSimpleName();
private FlinkSink() {}
/**
* Initialize a {@link Builder} to export the data from a generic input data stream into an
* iceberg table. We use {@link RowData} inside the sink connector, so users need to provide a
* mapper function and a {@link TypeInformation} to convert those generic records to a RowData
* DataStream.
*
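* <p>A minimal sketch; {@code events}, {@code MyEvent}, {@code toRowData}, {@code rowType} and
* {@code tableLoader} are hypothetical names supplied by the caller:
*
* <pre>{@code
* DataStream<MyEvent> events = ...;
* MapFunction<MyEvent, RowData> toRowData = ...;
* FlinkSink.builderFor(events, toRowData, FlinkCompatibilityUtil.toTypeInfo(rowType))
*     .tableLoader(tableLoader)
*     .append();
* }</pre>
*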
* @param input the generic source input data stream.
* @param mapper function to convert the generic data to {@link RowData}
* @param outputType to define the {@link TypeInformation} for the input data.
* @param <T> the data type of records.
* @return {@link Builder} to connect the iceberg table.
*/
public static <T> Builder builderFor(
DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
return new Builder().forMapperOutputType(input, mapper, outputType);
}
/**
* Initialize a {@link Builder} to export the data from an input data stream of {@link Row}s
* into an iceberg table. We use {@link RowData} inside the sink connector, so users need to
* provide a {@link TableSchema} for the builder to convert those {@link Row}s to a
* {@link RowData} DataStream.
*
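* <p>A minimal sketch; {@code rowStream} and {@code tableLoader} are hypothetical names supplied
* by the caller:
*
* <pre>{@code
* TableSchema schema =
*     TableSchema.builder()
*         .field("id", DataTypes.BIGINT())
*         .field("data", DataTypes.STRING())
*         .build();
* FlinkSink.forRow(rowStream, schema)
*     .tableLoader(tableLoader)
*     .append();
* }</pre>
*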
* @param input the source input data stream with {@link Row}s.
* @param tableSchema defines the {@link TypeInformation} for input data.
* @return {@link Builder} to connect the iceberg table.
*/
public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
DataFormatConverters.RowConverter rowConverter =
new DataFormatConverters.RowConverter(fieldDataTypes);
return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
.tableSchema(tableSchema);
}
/**
* Initialize a {@link Builder} to export the data from an input data stream of {@link RowData}s
* into an iceberg table.
*
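* <p>A minimal sketch; {@code rowDataStream} and {@code tableLoader} are hypothetical names
* supplied by the caller:
*
* <pre>{@code
* FlinkSink.forRowData(rowDataStream)
*     .tableLoader(tableLoader)
*     .writeParallelism(2)
*     .append();
* }</pre>
*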
* @param input the source input data stream with {@link RowData}s.
* @return {@link Builder} to connect the iceberg table.
*/
public static Builder forRowData(DataStream<RowData> input) {
return new Builder().forRowData(input);
}
public static class Builder {
private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
private final Map<String, String> snapshotProperties = Maps.newHashMap();
private ReadableConfig readableConfig = new Configuration();
private final Map<String, String> writeOptions = Maps.newHashMap();
private FlinkWriteConf flinkWriteConf = null;
private Builder() {}
private Builder forRowData(DataStream<RowData> newRowDataInput) {
this.inputCreator = ignored -> newRowDataInput;
return this;
}
private <T> Builder forMapperOutputType(
DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
this.inputCreator =
newUidPrefix -> {
// Input stream order is crucial for some situations (e.g. the CDC case). Therefore, we set
// the map operator's parallelism equal to its input's so that the map operator chains with
// its input instead of being rebalanced by default.
SingleOutputStreamOperator<RowData> inputStream =
input.map(mapper, outputType).setParallelism(input.getParallelism());
if (newUidPrefix != null) {
inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + "-mapper");
}
return inputStream;
};
return this;
}
/**
* This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter},
* which will write all the records into {@link DataFile}s and emit them to the downstream
* operator. Providing a table here avoids repeatedly loading the table in each separate task.
*
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder table(Table newTable) {
this.table = newTable;
return this;
}
/**
* The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily. We need
* this loader because {@link Table} is not serializable, so the table loaded via Builder#table
* cannot simply be used in the remote task manager.
*
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
}
/**
* Set the write properties for Flink sink. View the supported properties in {@link
* FlinkWriteOptions}
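*
* <p>For example, a minimal sketch using the distribution mode option defined in {@link
* FlinkWriteOptions}:
*
* <pre>{@code
* builder.set(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "hash");
* }</pre>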
*/
public Builder set(String property, String value) {
writeOptions.put(property, value);
return this;
}
/**
* Set the write properties for Flink sink. View the supported properties in {@link
* FlinkWriteOptions}
*/
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
}
/**
* Configure the write {@link DistributionMode} that the flink sink will use. Currently, Flink
* supports {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
*
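* <p>For example (a minimal sketch):
*
* <pre>{@code
* builder.distributionMode(DistributionMode.HASH);
* }</pre>
*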
* @param mode to specify the write distribution mode.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(
!DistributionMode.RANGE.equals(mode),
"Flink does not support 'range' write distribution mode now.");
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
}
return this;
}
/**
* Configure the write parallelism for the iceberg stream writer.
*
* @param newWriteParallelism the number of parallel iceberg stream writers.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
return this;
}
/**
* All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events,
* which means the old records are DELETEd and then the new records are INSERTed. In a
* partitioned table, the partition fields should be a subset of the equality fields; otherwise
* an old row located in partition-A could not be deleted by a new row located in partition-B.
*
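* <p>A minimal sketch of an upsert pipeline; {@code cdcStream} and {@code tableLoader} are
* hypothetical names supplied by the caller, and the "id" column is assumed to be the key:
*
* <pre>{@code
* FlinkSink.forRowData(cdcStream)
*     .tableLoader(tableLoader)
*     .upsert(true)
*     .equalityFieldColumns(Collections.singletonList("id"))
*     .append();
* }</pre>
*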
* @param enabled indicates whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
return this;
}
/**
* Configure the equality field columns for an iceberg table that accepts CDC or UPSERT events.
*
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
}
/**
* Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of
* multiple operators (like writer, committer, dummy sink etc.). The actual operator uid will be
* the prefix appended with a suffix like "uidPrefix-writer". <br>
* <br>
* If provided, this prefix is also applied to operator names. <br>
* <br>
* Flink auto-generates operator uids if not set explicitly. It is a recommended <a
* href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
* best-practice to set uid for all operators</a> before deploying to production. Flink has an
* option {@code pipeline.auto-generate-uid=false} to disable auto-generation and force
* explicit setting of all operator uids. <br>
* <br>
* Be careful with setting this for an existing job, because now we are changing the operator
* uid from an auto-generated one to this new value. When deploying the change with a
* checkpoint, Flink won't be able to restore the previous Flink sink operator state (more
* specifically the committer operator state). You need to use {@code --allowNonRestoredState}
* to ignore the previous sink state. During restore Flink sink state is used to check if last
* commit was actually successful or not. {@code --allowNonRestoredState} can lead to data loss
* if the Iceberg commit failed in the last completed checkpoint.
*
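* <p>A minimal sketch; with the prefix below, the operators get uids like "iceberg-sink-writer",
* "iceberg-sink-committer" and "iceberg-sink-dummysink":
*
* <pre>{@code
* FlinkSink.forRowData(rowDataStream)
*     .tableLoader(tableLoader)
*     .uidPrefix("iceberg-sink")
*     .append();
* }</pre>
*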
* @param newPrefix prefix for Flink sink operator uid and name
* @return {@link Builder} to connect the iceberg table.
*/
public Builder uidPrefix(String newPrefix) {
this.uidPrefix = newPrefix;
return this;
}
public Builder setSnapshotProperties(Map<String, String> properties) {
snapshotProperties.putAll(properties);
return this;
}
public Builder setSnapshotProperty(String property, String value) {
snapshotProperties.put(property, value);
return this;
}
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
}
private <T> DataStreamSink<T> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
if (table == null) {
if (!tableLoader.isOpen()) {
tableLoader.open();
}
try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
throw new UncheckedIOException(
"Failed to load iceberg table from table loader: " + tableLoader, e);
}
}
flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
// Distribute the records from input data stream based on the write.distribution-mode and
// equality fields.
DataStream<RowData> distributeStream =
distributeDataStream(
rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
appendWriter(distributeStream, flinkRowType, equalityFieldIds);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
// Add dummy discard sink
return appendDummySink(committerStream);
}
/**
* Append the iceberg sink operators to write records to iceberg table.
*
* @return {@link DataStreamSink} for sink.
*/
public DataStreamSink<Void> append() {
return chainIcebergOperators();
}
private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
@VisibleForTesting
List<Integer> checkAndGetEqualityFieldIds() {
List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
Set<Integer> equalityFieldSet =
Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
for (String column : equalityFieldColumns) {
org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
Preconditions.checkNotNull(
field,
"Missing required equality field column '%s' in table schema %s",
column,
table.schema());
equalityFieldSet.add(field.fieldId());
}
if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
LOG.warn(
"The configured equality field column IDs {} are not matched with the schema identifier field IDs"
+ " {}, use job specified equality field columns as the equality fields by default.",
equalityFieldSet,
table.schema().identifierFieldIds());
}
equalityFieldIds = Lists.newArrayList(equalityFieldSet);
}
return equalityFieldIds;
}
@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(
SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream =
committerStream
.addSink(new DiscardingSink())
.name(operatorName(String.format("IcebergSink %s", this.table.name())))
.setParallelism(1);
if (uidPrefix != null) {
resultStream = resultStream.uid(uidPrefix + "-dummysink");
}
return resultStream;
}
private SingleOutputStreamOperator<Void> appendCommitter(
SingleOutputStreamOperator<WriteResult> writerStream) {
IcebergFilesCommitter filesCommitter =
new IcebergFilesCommitter(
tableLoader,
flinkWriteConf.overwriteMode(),
snapshotProperties,
flinkWriteConf.workerPoolSize(),
flinkWriteConf.branch(),
table.spec());
SingleOutputStreamOperator<Void> committerStream =
writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
.setParallelism(1)
.setMaxParallelism(1);
if (uidPrefix != null) {
committerStream = committerStream.uid(uidPrefix + "-committer");
}
return committerStream;
}
private SingleOutputStreamOperator<WriteResult> appendWriter(
DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
// Validate the equality fields and partition fields if we enable the upsert mode.
if (flinkWriteConf.upsertMode()) {
Preconditions.checkState(
!flinkWriteConf.overwriteMode(),
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField : table.spec().fields()) {
Preconditions.checkState(
equalityFieldIds.contains(partitionField.sourceId()),
"In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
partitionField,
equalityFieldColumns);
}
}
}
SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval();
SerializableSupplier<Table> tableSupplier;
if (tableRefreshInterval != null) {
tableSupplier =
new CachingTableSupplier(serializableTable, tableLoader, tableRefreshInterval);
} else {
tableSupplier = () -> serializableTable;
}
IcebergStreamWriter<RowData> streamWriter =
createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);
int parallelism =
flinkWriteConf.writeParallelism() == null
? input.getParallelism()
: flinkWriteConf.writeParallelism();
SingleOutputStreamOperator<WriteResult> writerStream =
input
.transform(
operatorName(ICEBERG_STREAM_WRITER_NAME),
TypeInformation.of(WriteResult.class),
streamWriter)
.setParallelism(parallelism);
if (uidPrefix != null) {
writerStream = writerStream.uid(uidPrefix + "-writer");
}
return writerStream;
}
private DataStream<RowData> distributeDataStream(
DataStream<RowData> input,
List<Integer> equalityFieldIds,
PartitionSpec partitionSpec,
Schema iSchema,
RowType flinkRowType) {
DistributionMode writeMode = flinkWriteConf.distributionMode();
LOG.info("Write distribution mode is '{}'", writeMode.modeName());
switch (writeMode) {
case NONE:
if (equalityFieldIds.isEmpty()) {
return input;
} else {
LOG.info("Distribute rows by equality fields, because there are equality fields set");
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
}
case HASH:
if (equalityFieldIds.isEmpty()) {
if (partitionSpec.isUnpartitioned()) {
LOG.warn(
"Fallback to use 'none' distribution mode, because there are no equality fields set "
+ "and table is unpartitioned");
return input;
} else {
return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
}
} else {
if (partitionSpec.isUnpartitioned()) {
LOG.info(
"Distribute rows by equality fields, because there are equality fields set "
+ "and table is unpartitioned");
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
} else {
for (PartitionField partitionField : partitionSpec.fields()) {
Preconditions.checkState(
equalityFieldIds.contains(partitionField.sourceId()),
"In 'hash' distribution mode with equality fields set, partition field '%s' "
+ "should be included in equality fields: '%s'",
partitionField,
equalityFieldColumns);
}
return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
}
}
case RANGE:
if (equalityFieldIds.isEmpty()) {
LOG.warn(
"Fallback to use 'none' distribution mode, because there are no equality fields set "
+ "and {}=range is not supported yet in flink",
WRITE_DISTRIBUTION_MODE);
return input;
} else {
LOG.info(
"Distribute rows by equality fields, because there are equality fields set "
+ "and{}=range is not supported yet in flink",
WRITE_DISTRIBUTION_MODE);
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
}
default:
throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
}
}
}
static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
if (requestedSchema != null) {
// Convert the flink schema to an iceberg schema first, then reassign ids to match the existing
// iceberg schema.
Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT are
// promoted to iceberg INTEGER, which means that reading a TINYINT (backed by 1 byte) with
// iceberg's table schema would read 4 bytes rather than 1 byte and mess up the byte array in
// BinaryRowData. So here we must use the flink schema.
return (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
return FlinkSchemaUtil.convert(schema);
}
}
static IcebergStreamWriter<RowData> createStreamWriter(
SerializableSupplier<Table> tableSupplier,
FlinkWriteConf flinkWriteConf,
RowType flinkRowType,
List<Integer> equalityFieldIds) {
Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");
Table initTable = tableSupplier.get();
FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
tableSupplier,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
format,
writeProperties(initTable, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
}
/**
* Based on the {@link FileFormat}, overrides the table-level compression properties for the
* table write.
*
* @param table The table to get the table level settings
* @param format The FileFormat to use
* @param conf The write configuration
* @return The properties to use for writing
*/
private static Map<String, String> writeProperties(
Table table, FileFormat format, FlinkWriteConf conf) {
Map<String, String> writeProperties = Maps.newHashMap(table.properties());
switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
String parquetCompressionLevel = conf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
String avroCompressionLevel = conf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}
break;
case ORC:
writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}
return writeProperties;
}
}