Skip to content

Commit bd9a313

Browse files
FWLambyangbinbin
andauthored
[Feature][Connector V2] expose configurable options in Iceberg (#3394)
* expose configurable options in Iceberg Co-authored-by: yangbinbin <yangbinbin@aspirecn.com>
1 parent 4f824fe commit bd9a313

File tree

9 files changed

+227
-51
lines changed

9 files changed

+227
-51
lines changed

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iceberg;
1919

20-
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
20+
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
2121

2222
import lombok.NonNull;
2323
import org.apache.hadoop.conf.Configuration;
@@ -37,12 +37,12 @@ public class IcebergCatalogFactory implements Serializable {
3737
private static final long serialVersionUID = -6003040601422350869L;
3838

3939
private final String catalogName;
40-
private final String catalogType;
40+
private final IcebergCatalogType catalogType;
4141
private final String warehouse;
4242
private final String uri;
4343

4444
public IcebergCatalogFactory(@NonNull String catalogName,
45-
@NonNull String catalogType,
45+
@NonNull IcebergCatalogType catalogType,
4646
@NonNull String warehouse,
4747
String uri) {
4848
this.catalogName = catalogName;
@@ -58,9 +58,9 @@ public Catalog create() {
5858
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
5959

6060
switch (catalogType) {
61-
case CommonConfig.CATALOG_TYPE_HADOOP:
61+
case HADOOP:
6262
return hadoop(catalogName, serializableConf, properties);
63-
case CommonConfig.CATALOG_TYPE_HIVE:
63+
case HIVE:
6464
properties.put(CatalogProperties.URI, uri);
6565
return hive(catalogName, serializableConf, properties);
6666
default:

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,58 +17,92 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iceberg.config;
1919

20+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
21+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HIVE;
2022
import static com.google.common.base.Preconditions.checkArgument;
2123
import static com.google.common.base.Preconditions.checkNotNull;
2224

25+
import org.apache.seatunnel.api.configuration.Option;
26+
import org.apache.seatunnel.api.configuration.Options;
27+
2328
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2429

2530
import lombok.Getter;
2631
import lombok.ToString;
2732

2833
import java.io.Serializable;
34+
import java.util.List;
2935

3036
@Getter
3137
@ToString
3238
public class CommonConfig implements Serializable {
3339
private static final long serialVersionUID = 239821141534421580L;
3440

35-
private static final String KEY_CATALOG_NAME = "catalog_name";
36-
private static final String KEY_CATALOG_TYPE = "catalog_type";
37-
private static final String KEY_NAMESPACE = "namespace";
38-
private static final String KEY_TABLE = "table";
39-
private static final String KEY_URI = "uri";
40-
private static final String KEY_WAREHOUSE = "warehouse";
41-
private static final String KEY_CASE_SENSITIVE = "case_sensitive";
41+
public static final Option<String> KEY_CATALOG_NAME = Options.key("catalog_name")
42+
.stringType()
43+
.noDefaultValue()
44+
.withDescription(" the iceberg catalog name");
45+
46+
public static final Option<IcebergCatalogType> KEY_CATALOG_TYPE = Options.key("catalog_type")
47+
.enumType(IcebergCatalogType.class)
48+
.noDefaultValue()
49+
.withDescription(" the iceberg catalog type");
50+
51+
public static final Option<String> KEY_NAMESPACE = Options.key("namespace")
52+
.stringType()
53+
.noDefaultValue()
54+
.withDescription(" the iceberg namespace");
55+
56+
public static final Option<String> KEY_TABLE = Options.key("table")
57+
.stringType()
58+
.noDefaultValue()
59+
.withDescription(" the iceberg table");
60+
61+
public static final Option<String> KEY_URI = Options.key("uri")
62+
.stringType()
63+
.noDefaultValue()
64+
.withDescription(" the iceberg server uri");
65+
66+
public static final Option<String> KEY_WAREHOUSE = Options.key("warehouse")
67+
.stringType()
68+
.noDefaultValue()
69+
.withDescription(" the iceberg warehouse");
70+
71+
public static final Option<Boolean> KEY_CASE_SENSITIVE = Options.key("case_sensitive")
72+
.booleanType()
73+
.defaultValue(false)
74+
.withDescription(" the iceberg case_sensitive");
4275

43-
public static final String KEY_FIELDS = "fields";
44-
public static final String CATALOG_TYPE_HADOOP = "hadoop";
45-
public static final String CATALOG_TYPE_HIVE = "hive";
76+
public static final Option<List<String>> KEY_FIELDS = Options.key("fields")
77+
.listType()
78+
.noDefaultValue()
79+
.withDescription(" the iceberg table fields");
4680

4781
private String catalogName;
48-
private String catalogType;
82+
private IcebergCatalogType catalogType;
4983
private String uri;
5084
private String warehouse;
5185
private String namespace;
5286
private String table;
5387
private boolean caseSensitive;
5488

5589
public CommonConfig(Config pluginConfig) {
56-
String catalogType = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_TYPE));
57-
checkArgument(CATALOG_TYPE_HADOOP.equals(catalogType)
58-
|| CATALOG_TYPE_HIVE.equals(catalogType),
90+
String catalogType = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_TYPE.key()));
91+
checkArgument(HADOOP.getType().equals(catalogType)
92+
|| HIVE.getType().equals(catalogType),
5993
"Illegal catalogType: " + catalogType);
6094

61-
this.catalogType = catalogType;
62-
this.catalogName = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_NAME));
63-
if (pluginConfig.hasPath(KEY_URI)) {
64-
this.uri = checkArgumentNotNull(pluginConfig.getString(KEY_URI));
95+
this.catalogType = IcebergCatalogType.valueOf(catalogType.toUpperCase());
96+
this.catalogName = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_NAME.key()));
97+
if (pluginConfig.hasPath(KEY_URI.key())) {
98+
this.uri = checkArgumentNotNull(pluginConfig.getString(KEY_URI.key()));
6599
}
66-
this.warehouse = checkArgumentNotNull(pluginConfig.getString(KEY_WAREHOUSE));
67-
this.namespace = checkArgumentNotNull(pluginConfig.getString(KEY_NAMESPACE));
68-
this.table = checkArgumentNotNull(pluginConfig.getString(KEY_TABLE));
100+
this.warehouse = checkArgumentNotNull(pluginConfig.getString(KEY_WAREHOUSE.key()));
101+
this.namespace = checkArgumentNotNull(pluginConfig.getString(KEY_NAMESPACE.key()));
102+
this.table = checkArgumentNotNull(pluginConfig.getString(KEY_TABLE.key()));
69103

70-
if (pluginConfig.hasPath(KEY_CASE_SENSITIVE)) {
71-
this.caseSensitive = pluginConfig.getBoolean(KEY_CASE_SENSITIVE);
104+
if (pluginConfig.hasPath(KEY_CASE_SENSITIVE.key())) {
105+
this.caseSensitive = pluginConfig.getBoolean(KEY_CASE_SENSITIVE.key());
72106
}
73107
}
74108

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.iceberg.config;
19+
20+
public enum IcebergCatalogType {
21+
22+
HADOOP("hadoop"),
23+
HIVE("hive");
24+
25+
final String type;
26+
27+
IcebergCatalogType(String type) {
28+
this.type = type;
29+
}
30+
31+
public String getType() {
32+
return type;
33+
}
34+
}

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceConfig.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
package org.apache.seatunnel.connectors.seatunnel.iceberg.config;
2121

22+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy.FROM_LATEST_SNAPSHOT;
23+
24+
import org.apache.seatunnel.api.configuration.Option;
25+
import org.apache.seatunnel.api.configuration.Options;
2226
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy;
2327

2428
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,12 +36,35 @@
3236
public class SourceConfig extends CommonConfig {
3337
private static final long serialVersionUID = -1965861967575264253L;
3438

35-
private static final String KEY_START_SNAPSHOT_TIMESTAMP = "start_snapshot_timestamp";
36-
private static final String KEY_START_SNAPSHOT_ID = "start_snapshot_id";
37-
private static final String KEY_END_SNAPSHOT_ID = "end_snapshot_id";
38-
private static final String KEY_USE_SNAPSHOT_ID = "use_snapshot_id";
39-
private static final String KEY_USE_SNAPSHOT_TIMESTAMP = "use_snapshot_timestamp";
40-
private static final String KEY_STREAM_SCAN_STRATEGY = "stream_scan_strategy";
39+
public static final Option<Long> KEY_START_SNAPSHOT_TIMESTAMP = Options.key("start_snapshot_timestamp")
40+
.longType()
41+
.noDefaultValue()
42+
.withDescription(" the iceberg timestamp of starting snapshot ");
43+
44+
public static final Option<Long> KEY_START_SNAPSHOT_ID = Options.key("start_snapshot_id")
45+
.longType()
46+
.noDefaultValue()
47+
.withDescription(" the iceberg id of starting snapshot ");
48+
49+
public static final Option<Long> KEY_END_SNAPSHOT_ID = Options.key("end_snapshot_id")
50+
.longType()
51+
.noDefaultValue()
52+
.withDescription(" the iceberg id of ending snapshot ");
53+
54+
public static final Option<Long> KEY_USE_SNAPSHOT_ID = Options.key("use_snapshot_id")
55+
.longType()
56+
.noDefaultValue()
57+
.withDescription(" the iceberg used snapshot id");
58+
59+
public static final Option<Long> KEY_USE_SNAPSHOT_TIMESTAMP = Options.key("use_snapshot_timestamp")
60+
.longType()
61+
.noDefaultValue()
62+
.withDescription(" the iceberg used snapshot timestamp");
63+
64+
public static final Option<IcebergStreamScanStrategy> KEY_STREAM_SCAN_STRATEGY = Options.key("stream_scan_strategy")
65+
.enumType(IcebergStreamScanStrategy.class)
66+
.defaultValue(FROM_LATEST_SNAPSHOT)
67+
.withDescription(" the iceberg strategy of stream scanning");
4168

4269
private Long startSnapshotTimestamp;
4370
private Long startSnapshotId;
@@ -46,32 +73,32 @@ public class SourceConfig extends CommonConfig {
4673
private Long useSnapshotId;
4774
private Long useSnapshotTimestamp;
4875

49-
private IcebergStreamScanStrategy streamScanStrategy = IcebergStreamScanStrategy.FROM_LATEST_SNAPSHOT;
76+
private IcebergStreamScanStrategy streamScanStrategy = KEY_STREAM_SCAN_STRATEGY.defaultValue();
5077
private Expression filter;
5178
private Long splitSize;
5279
private Integer splitLookback;
5380
private Long splitOpenFileCost;
5481

5582
public SourceConfig(Config pluginConfig) {
5683
super(pluginConfig);
57-
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_TIMESTAMP)) {
58-
this.startSnapshotTimestamp = pluginConfig.getLong(KEY_START_SNAPSHOT_TIMESTAMP);
84+
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_TIMESTAMP.key())) {
85+
this.startSnapshotTimestamp = pluginConfig.getLong(KEY_START_SNAPSHOT_TIMESTAMP.key());
5986
}
60-
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_ID)) {
61-
this.startSnapshotId = pluginConfig.getLong(KEY_START_SNAPSHOT_ID);
87+
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_ID.key())) {
88+
this.startSnapshotId = pluginConfig.getLong(KEY_START_SNAPSHOT_ID.key());
6289
}
63-
if (pluginConfig.hasPath(KEY_END_SNAPSHOT_ID)) {
64-
this.endSnapshotId = pluginConfig.getLong(KEY_END_SNAPSHOT_ID);
90+
if (pluginConfig.hasPath(KEY_END_SNAPSHOT_ID.key())) {
91+
this.endSnapshotId = pluginConfig.getLong(KEY_END_SNAPSHOT_ID.key());
6592
}
66-
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_ID)) {
67-
this.useSnapshotId = pluginConfig.getLong(KEY_USE_SNAPSHOT_ID);
93+
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_ID.key())) {
94+
this.useSnapshotId = pluginConfig.getLong(KEY_USE_SNAPSHOT_ID.key());
6895
}
69-
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_TIMESTAMP)) {
70-
this.useSnapshotTimestamp = pluginConfig.getLong(KEY_USE_SNAPSHOT_TIMESTAMP);
96+
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_TIMESTAMP.key())) {
97+
this.useSnapshotTimestamp = pluginConfig.getLong(KEY_USE_SNAPSHOT_TIMESTAMP.key());
7198
}
72-
if (pluginConfig.hasPath(KEY_STREAM_SCAN_STRATEGY)) {
99+
if (pluginConfig.hasPath(KEY_STREAM_SCAN_STRATEGY.key())) {
73100
this.streamScanStrategy = pluginConfig.getEnum(
74-
IcebergStreamScanStrategy.class, KEY_STREAM_SCAN_STRATEGY);
101+
IcebergStreamScanStrategy.class, KEY_STREAM_SCAN_STRATEGY.key());
75102
}
76103
}
77104

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private SeaTunnelRowType loadSeaTunnelRowType(Schema tableSchema,
9797
columnNames.toArray(new String[0]),
9898
columnDataTypes.toArray(new SeaTunnelDataType[0]));
9999

100-
CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, CommonConfig.KEY_FIELDS);
100+
CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, CommonConfig.KEY_FIELDS.key());
101101
if (checkResult.isSuccess()) {
102102
SeaTunnelSchema configSchema = SeaTunnelSchema.buildWithConfig(pluginConfig);
103103
SeaTunnelRowType projectedRowType = configSchema.getSeaTunnelRowType();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.iceberg.source;
19+
20+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CASE_SENSITIVE;
21+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_NAME;
22+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_TYPE;
23+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_FIELDS;
24+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_NAMESPACE;
25+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_TABLE;
26+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_URI;
27+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_WAREHOUSE;
28+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HIVE;
29+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_END_SNAPSHOT_ID;
30+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_START_SNAPSHOT_ID;
31+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_START_SNAPSHOT_TIMESTAMP;
32+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_STREAM_SCAN_STRATEGY;
33+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_USE_SNAPSHOT_ID;
34+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_USE_SNAPSHOT_TIMESTAMP;
35+
36+
import org.apache.seatunnel.api.configuration.util.OptionRule;
37+
import org.apache.seatunnel.api.table.factory.Factory;
38+
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
39+
40+
import com.google.auto.service.AutoService;
41+
42+
@AutoService(Factory.class)
43+
public class IcebergSourceFactory implements TableSourceFactory {
44+
45+
@Override
46+
public String factoryIdentifier() {
47+
return "Iceberg";
48+
}
49+
50+
@Override
51+
public OptionRule optionRule() {
52+
return OptionRule.builder()
53+
.required(
54+
KEY_CATALOG_NAME,
55+
KEY_CATALOG_TYPE,
56+
KEY_WAREHOUSE,
57+
KEY_NAMESPACE,
58+
KEY_TABLE
59+
)
60+
.conditional(KEY_CATALOG_TYPE, HIVE, KEY_URI)
61+
.optional(
62+
KEY_FIELDS,
63+
KEY_CASE_SENSITIVE,
64+
KEY_START_SNAPSHOT_TIMESTAMP,
65+
KEY_START_SNAPSHOT_ID,
66+
KEY_END_SNAPSHOT_ID,
67+
KEY_USE_SNAPSHOT_ID,
68+
KEY_USE_SNAPSHOT_TIMESTAMP,
69+
KEY_STREAM_SCAN_STRATEGY
70+
)
71+
.build();
72+
}
73+
}

seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/TestIcebergMetastore.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iceberg;
1919

20+
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HIVE;
21+
2022
import org.apache.hadoop.hive.conf.HiveConf;
2123
import org.apache.iceberg.catalog.Namespace;
2224
import org.apache.iceberg.hive.HiveCatalog;
@@ -48,7 +50,7 @@ public void testUseHiveMetastore() {
4850
new File(warehousePath).mkdirs();
4951

5052
HiveCatalog catalog = (HiveCatalog) new IcebergCatalogFactory("seatunnel",
51-
"hive",
53+
HIVE,
5254
"file://" + warehousePath,
5355
METASTORE_URI)
5456
.create();

0 commit comments

Comments
 (0)