Skip to content

Commit 20e1255

Browse files
wfrongfurong.wang
andauthored
[Improve][SourceConnector] Unifie Iceberg source fields to schema (#3959)
* Unifie Iceberg source fields to schema * add compatibility adaptation * e2e module struct nested field name change to uppercase * e2e module select fields remove struct type * make better code style --------- Co-authored-by: furong.wang <furong.wang@ly.com>
1 parent 83ceb8a commit 20e1255

File tree

4 files changed

+46
-22
lines changed

4 files changed

+46
-22
lines changed

docs/en/connector-v2/source/Iceberg.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Source connector for Apache Iceberg. It can support batch and stream mode.
3232
| warehouse | string | yes | - |
3333
| namespace | string | yes | - |
3434
| table | string | yes | - |
35+
| schema | config | no | - |
3536
| case_sensitive | boolean | no | false |
3637
| start_snapshot_timestamp | long | no | - |
3738
| start_snapshot_id | long | no | - |
@@ -69,12 +70,27 @@ The iceberg table name in the backend catalog.
6970

7071
### case_sensitive [boolean]
7172

72-
If data columns where selected via fields(Collection), controls whether the match to the schema will be done with case sensitivity.
73+
If data columns where selected via schema [config], controls whether the match to the schema will be done with case sensitivity.
7374

74-
### fields [array]
75+
### schema [config]
76+
77+
#### fields [Config]
7578

7679
Use projection to select data columns and columns order.
7780

81+
e.g.
82+
83+
```
84+
schema {
85+
fields {
86+
f2 = "boolean"
87+
f1 = "bigint"
88+
f3 = "int"
89+
f4 = "bigint"
90+
}
91+
}
92+
```
93+
7894
### start_snapshot_id [long]
7995

8096
Instructs this scan to look for changes starting from a particular snapshot (exclusive).

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,15 @@ private SeaTunnelRowType loadSeaTunnelRowType(Schema tableSchema, Config pluginC
105105
columnDataTypes.toArray(new SeaTunnelDataType[0]));
106106

107107
CheckResult checkResult =
108-
CheckConfigUtil.checkAllExists(pluginConfig, CommonConfig.KEY_FIELDS.key());
108+
CheckConfigUtil.checkAtLeastOneExists(
109+
pluginConfig, CommonConfig.KEY_FIELDS.key(), SeaTunnelSchema.SCHEMA.key());
110+
109111
if (checkResult.isSuccess()) {
110-
SeaTunnelSchema configSchema = SeaTunnelSchema.buildWithConfig(pluginConfig);
112+
Config config =
113+
pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())
114+
? pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key())
115+
: pluginConfig;
116+
SeaTunnelSchema configSchema = SeaTunnelSchema.buildWithConfig(config);
111117
SeaTunnelRowType projectedRowType = configSchema.getSeaTunnelRowType();
112118
for (int i = 0; i < projectedRowType.getFieldNames().length; i++) {
113119
String fieldName = projectedRowType.getFieldName(i);

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
2222
import org.apache.seatunnel.api.table.factory.Factory;
2323
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
24+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
2425

2526
import com.google.auto.service.AutoService;
2627

2728
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CASE_SENSITIVE;
2829
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_NAME;
2930
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_TYPE;
30-
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_FIELDS;
3131
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_NAMESPACE;
3232
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_TABLE;
3333
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_URI;
@@ -55,7 +55,7 @@ public OptionRule optionRule() {
5555
KEY_CATALOG_NAME, KEY_CATALOG_TYPE, KEY_WAREHOUSE, KEY_NAMESPACE, KEY_TABLE)
5656
.conditional(KEY_CATALOG_TYPE, HIVE, KEY_URI)
5757
.optional(
58-
KEY_FIELDS,
58+
SeaTunnelSchema.SCHEMA,
5959
KEY_CASE_SENSITIVE,
6060
KEY_START_SNAPSHOT_TIMESTAMP,
6161
KEY_START_SNAPSHOT_ID,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,24 @@ env {
2626

2727
source {
2828
Iceberg {
29-
fields {
30-
f2 = "boolean"
31-
f1 = "bigint"
32-
f3 = "int"
33-
f4 = "bigint"
34-
f5 = "float"
35-
f6 = "double"
36-
f7 = "date"
37-
f9 = "timestamp"
38-
f10 = "timestamp"
39-
f11 = "string"
40-
f12 = "bytes"
41-
f13 = "bytes"
42-
f14 = "decimal(19,9)"
43-
f15 = "array<int>"
44-
f16 = "map<string, int>"
29+
schema {
30+
fields {
31+
f2 = "boolean"
32+
f1 = "bigint"
33+
f3 = "int"
34+
f4 = "bigint"
35+
f5 = "float"
36+
f6 = "double"
37+
f7 = "date"
38+
f9 = "timestamp"
39+
f10 = "timestamp"
40+
f11 = "string"
41+
f12 = "bytes"
42+
f13 = "bytes"
43+
f14 = "decimal(19,9)"
44+
f15 = "array<int>"
45+
f16 = "map<string, int>"
46+
}
4547
}
4648
catalog_name = "seatunnel"
4749
catalog_type = "hadoop"

0 commit comments

Comments
 (0)