Commit e310353

[Feature][Connector-V2] Iceberg-sink supports writing data to branches (#6697)

1 parent c23804f commit e310353

7 files changed: +302 -1 lines changed

docs/en/connector-v2/sink/Iceberg.md

Lines changed: 1 addition & 0 deletions

@@ -72,6 +72,7 @@ libfb303-xxx.jar
 | iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
 | schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
 | data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
+| iceberg.table.commit-branch | string | no | - | Default branch for commits |

 ## Task Example

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java

Lines changed: 8 additions & 0 deletions

@@ -109,13 +109,20 @@ public class SinkConfig extends CommonConfig {
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data save mode");

+    public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
+            Options.key("iceberg.table.commit-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Default branch for commits");
+
     @VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

     private final ReadonlyConfig readonlyConfig;
     private Map<String, String> autoCreateProps;
     private Map<String, String> writeProps;
     private List<String> primaryKeys;
     private List<String> partitionKeys;
+    private String commitBranch;

     private boolean upsertModeEnabled;
     private boolean tableSchemaEvolutionEnabled;

@@ -133,6 +140,7 @@ public SinkConfig(ReadonlyConfig readonlyConfig) {
         this.tableSchemaEvolutionEnabled = readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
         this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
         this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+        this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
     }

     @VisibleForTesting
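
Since TABLES_DEFAULT_COMMIT_BRANCH is declared with noDefaultValue(), ReadonlyConfig#get returns null when iceberg.table.commit-branch is absent, and that null is the sentinel the committer checks before redirecting a commit. A minimal sketch of both cases (the wrapper class name is hypothetical; ReadonlyConfig.fromMap is the same factory the new e2e test uses):

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;

public class CommitBranchOptionSketch {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("iceberg.table.commit-branch", "commit-branch");

        // Key present: the configured branch name comes back.
        String branch =
                ReadonlyConfig.fromMap(raw).get(SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH);
        System.out.println(branch); // commit-branch

        // Key absent: null, because the Option has no default value;
        // the committer then keeps writing to the table's main branch.
        String unset =
                ReadonlyConfig.fromMap(new HashMap<>())
                        .get(SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH);
        System.out.println(unset); // null
    }
}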

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java

Lines changed: 2 additions & 1 deletion

@@ -62,7 +62,8 @@ public OptionRule optionRule() {
                         SinkConfig.TABLE_PRIMARY_KEYS,
                         SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
                         SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
-                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP)
+                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
+                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
                 .build();
     }
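
Declaring the option in optionRule() is what lets the factory accept and validate the new key at job-config check time. A hedged sketch of the builder shape, assuming the options listed in this diff sit in the rule's optional(...) group (the surrounding options are elided here):

// Sketch only: the grouping is inferred from the diff context, not shown in full.
OptionRule rule =
        OptionRule.builder()
                .optional(
                        SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
                .build();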

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java

Lines changed: 8 additions & 0 deletions

@@ -39,10 +39,12 @@
 public class IcebergFilesCommitter implements Serializable {
     private IcebergTableLoader icebergTableLoader;
     private boolean caseSensitive;
+    private String branch;

     private IcebergFilesCommitter(SinkConfig config, IcebergTableLoader icebergTableLoader) {
         this.icebergTableLoader = icebergTableLoader;
         this.caseSensitive = config.isCaseSensitive();
+        this.branch = config.getCommitBranch();
     }

     public static IcebergFilesCommitter of(

@@ -77,10 +79,16 @@ private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult>
         } else {
             if (deleteFiles.isEmpty()) {
                 AppendFiles append = table.newAppend();
+                if (branch != null) {
+                    append.toBranch(branch);
+                }
                 dataFiles.forEach(append::appendFile);
                 append.commit();
             } else {
                 RowDelta delta = table.newRowDelta();
+                if (branch != null) {
+                    delta.toBranch(branch);
+                }
                 delta.caseSensitive(caseSensitive);
                 dataFiles.forEach(delta::addRows);
                 deleteFiles.forEach(delta::addDeletes);
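
Both AppendFiles and RowDelta inherit toBranch(...) from Iceberg's SnapshotUpdate, so the same null guard covers the append-only and delete-carrying paths. A minimal round-trip sketch under the same null-means-main assumption (table and data-file handles are placeholders):

import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class BranchCommitSketch {
    // Commit data files to the configured branch, or to main when branch is null.
    static void appendTo(Table table, List<DataFile> dataFiles, String branch) {
        AppendFiles append = table.newAppend();
        if (branch != null) {
            append.toBranch(branch); // the new snapshot lands on the branch ref
        }
        dataFiles.forEach(append::appendFile);
        append.commit();
    }

    // Read the branch back the same way the new e2e test does.
    static CloseableIterable<FileScanTask> planFromBranch(Table table, String branch) {
        return table.newScan().useRef(branch).planFiles();
    }
}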

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java

Lines changed: 4 additions & 0 deletions

@@ -81,6 +81,10 @@ public RecordWriter createWriter(SeaTunnelRowType rowType) {
                                 tableLoader.getTableIdentifier(),
                                 config,
                                 rowType);
+                // Create an empty snapshot for the branch
+                if (config.getCommitBranch() != null) {
+                    table.manageSnapshots().createBranch(config.getCommitBranch()).commit();
+                }
                 break;
             default:
                 throw exception;
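
One caveat: ManageSnapshots#createBranch fails when a ref with that name already exists. In this code path the table was presumably just created, so the branch cannot pre-exist, but a reusable variant could guard on table.refs() first (a sketch built around the commit's own calls, not what the commit does):

// Hypothetical idempotent guard before creating the branch.
String branch = config.getCommitBranch();
if (branch != null && !table.refs().containsKey(branch)) {
    table.manageSnapshots().createBranch(branch).commit();
}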

IcebergSinkWithBranchIT.java (new file; e2e test)

Lines changed: 204 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.e2e.connector.iceberg;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
import static org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
        value = {TestContainerId.SPARK_2_4},
        type = {},
        disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkWithBranchIT extends TestSuiteBase {

    private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/hadoop-sink/";

    private static final String NAMESPACE = "seatunnel_namespace";

    private static final String commitBranch = "commit-branch";

    private String zstdUrl() {
        return "https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.5-5/zstd-jni-1.5.5-5.jar";
    }

    @TestContainerExtension
    protected final ContainerExtendedFactory extendedFactory =
            container -> {
                container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
                container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
                container.execInContainer(
                        "sh",
                        "-c",
                        "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
                                + zstdUrl());
            };

    private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
    protected final ContainerExtendedFactory containerExtendedFactory =
            new ContainerExtendedFactory() {
                @Override
                public void extend(GenericContainer<?> container)
                        throws IOException, InterruptedException {
                    FileUtils.createNewDir(CATALOG_DIR);
                    container.execInContainer(
                            "sh",
                            "-c",
                            "cd "
                                    + CATALOG_DIR
                                    + " && tar -czvf "
                                    + NAMESPACE_TAR
                                    + " "
                                    + NAMESPACE);
                    container.copyFileFromContainer(
                            CATALOG_DIR + NAMESPACE_TAR, CATALOG_DIR + NAMESPACE_TAR);
                    extractFiles();
                }

                private void extractFiles() {
                    ProcessBuilder processBuilder = new ProcessBuilder();
                    processBuilder.command(
                            "sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR);
                    try {
                        Process process = processBuilder.start();
                        int exitCode = process.waitFor();
                        if (exitCode == 0) {
                            log.info("Extract files successful.");
                        } else {
                            log.error("Extract files failed with exit code " + exitCode);
                        }
                    } catch (IOException | InterruptedException e) {
                        log.error("Extract data files from container error :", e);
                    }
                }
            };

    @TestTemplate
    public void testInsertAndCheckDataE2e(TestContainer container)
            throws IOException, InterruptedException {
        Container.ExecResult textWriteResult =
                container.executeJob("/iceberg/fake_to_iceberg_with_branch.conf");
        Assertions.assertEquals(0, textWriteResult.getExitCode());
        // stream stage
        given().ignoreExceptions()
                .await()
                .atMost(60000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            // copy iceberg to local
                            container.executeExtraCommands(containerExtendedFactory);
                            // check branch exists
                            Assertions.assertEquals(true, checkBranchExists());
                            // load from branch
                            Assertions.assertEquals(100, loadDataFromIcebergTableBranch().size());
                        });
    }

    private boolean checkBranchExists() {
        Table table = getTable();
        Map<String, SnapshotRef> refs = table.refs();
        if (refs.containsKey(commitBranch)) {
            return true;
        }
        return false;
    }

    private List<Object> loadDataFromIcebergTableBranch() {
        List<Object> results = new ArrayList<>();
        Table table = getTable();
        TableScan branchRead = table.newScan().useRef(commitBranch);
        CloseableIterable<FileScanTask> fileScanTasks = branchRead.planFiles();
        fileScanTasks.forEach(
                fileScanTask -> {
                    try {
                        DataFile file = fileScanTask.file();
                        HadoopInputFile inputFile =
                                HadoopInputFile.fromPath(
                                        new Path(file.path().toString()), new Configuration());
                        try (ParquetReader<Object> reader =
                                AvroParquetReader.builder(inputFile).build()) {
                            Object record;
                            while ((record = reader.read()) != null) {
                                results.add(record);
                            }
                        }
                    } catch (IOException e) {
                        log.error("Table scan branch error :", e);
                    }
                });
        return results;
    }

    public Table getTable() {

        Map<String, Object> configs = new HashMap<>();
        Map<String, Object> catalogProps = new HashMap<>();
        catalogProps.put("type", HADOOP.getType());
        catalogProps.put("warehouse", "file://" + CATALOG_DIR);
        configs.put(CommonConfig.KEY_CATALOG_NAME.key(), "seatunnel_test");
        configs.put(CommonConfig.KEY_NAMESPACE.key(), "seatunnel_namespace");
        configs.put(CommonConfig.KEY_TABLE.key(), "iceberg_sink_table");
        configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
        IcebergTableLoader tableLoader =
                IcebergTableLoader.create(new SourceConfig(ReadonlyConfig.fromMap(configs)));
        tableLoader.open();
        // from branch
        return tableLoader.loadTable();
    }
}
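
checkBranchExists() above passes for any ref named commit-branch, including a tag. Iceberg's SnapshotRef distinguishes the two, so the assertion could be tightened (a hypothetical variant, not part of the committed test):

// Stricter check: require a branch ref, not just any ref with that name.
private boolean checkBranchExistsStrict() {
    SnapshotRef ref = getTable().refs().get(commitBranch);
    return ref != null && ref.isBranch();
}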

fake_to_iceberg_with_branch.conf (new file; e2e job config)

Lines changed: 75 additions & 0 deletions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  parallelism = 1
  job.mode = "BATCH"

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    result_table_name = "fake"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=10
    }
    iceberg.table.commit-branch="commit-branch"
    iceberg.table.partition-keys="c_timestamp"
    case_sensitive=true
  }
}
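
A note on two choices in this job config: write.target-file-size-bytes=10 presumably forces the writer to roll a new Parquet file almost every record, so the branch commit is exercised with many data files instead of a single append, and partitioning on c_timestamp spreads those files across partitions as well.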
