Skip to content

Commit 699d165

Browse files
authored
[Feature][Transform-v2] Add metadata transform (#7899)
1 parent a89b86c commit 699d165

File tree

58 files changed

+1819
-48
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1819
-48
lines changed

docs/en/transform-v2/dynamic-compile.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ transform {
8888
compile_pattern="SOURCE_CODE"
8989
source_code="""
9090
import org.apache.seatunnel.api.table.catalog.Column
91-
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
91+
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
9292
import org.apache.seatunnel.api.table.catalog.CatalogTable
9393
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
9494
import org.apache.seatunnel.api.table.type.*;
@@ -146,7 +146,7 @@ transform {
146146
compile_pattern="SOURCE_CODE"
147147
source_code="""
148148
import org.apache.seatunnel.api.table.catalog.Column;
149-
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
149+
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
150150
import org.apache.seatunnel.api.table.catalog.*;
151151
import org.apache.seatunnel.api.table.type.*;
152152
import java.util.ArrayList;

docs/en/transform-v2/metadata.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Metadata
2+
3+
> Metadata transform plugin
4+
5+
## Description
6+
Metadata transform plugin for adding metadata fields to data
7+
8+
## Available Metadata
9+
10+
| Key | DataType | Description |
11+
|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
12+
| Database | string | Name of the table that contain the row. |
13+
| Table | string | Name of the table that contain the row. |
14+
| RowKind | string | The type of operation |
15+
| EventTime | Long | The time at which the connector processed the event. |
16+
| Delay | Long | The difference between data extraction time and database change time |
17+
| Partition | string | Contains the partition field of the corresponding number table of the row, multiple using `,` join |
18+
19+
### note
20+
`Delay` `Partition` only worked on cdc series connectors for now , except TiDB-CDC
21+
22+
## Options
23+
24+
| name | type | required | default value | Description |
25+
|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
26+
| metadata_fields | map | yes | | A mapping metadata input fields and their corresponding output fields. |
27+
28+
### metadata_fields [map]
29+
30+
A mapping between metadata fields and their respective output fields.
31+
32+
```hocon
33+
metadata_fields {
34+
Database = c_database
35+
Table = c_table
36+
RowKind = c_rowKind
37+
EventTime = c_ts_ms
38+
Delay = c_delay
39+
}
40+
```
41+
42+
## Examples
43+
44+
```yaml
45+
46+
env {
47+
parallelism = 1
48+
job.mode = "STREAMING"
49+
checkpoint.interval = 5000
50+
read_limit.bytes_per_second = 7000000
51+
read_limit.rows_per_second = 400
52+
}
53+
54+
source {
55+
MySQL-CDC {
56+
result_table_name = "customers_mysql_cdc"
57+
server-id = 5652
58+
username = "root"
59+
password = "zdyk_Dev@2024"
60+
table-names = ["source.user"]
61+
base-url = "jdbc:mysql://172.16.17.123:3306/source"
62+
}
63+
}
64+
65+
transform {
66+
Metadata {
67+
metadata_fields {
68+
Database = database
69+
Table = table
70+
RowKind = rowKind
71+
EventTime = ts_ms
72+
Delay = delay
73+
}
74+
result_table_name = "trans_result"
75+
}
76+
}
77+
78+
sink {
79+
Console {
80+
source_table_name = "custom_name"
81+
}
82+
}
83+
84+
```
85+

docs/zh/transform-v2/dynamic-compile.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ transform {
8585
compile_pattern="SOURCE_CODE"
8686
source_code="""
8787
import org.apache.seatunnel.api.table.catalog.Column
88-
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
88+
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
8989
import org.apache.seatunnel.api.table.catalog.CatalogTable
9090
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
9191
import org.apache.seatunnel.api.table.type.*;
@@ -143,7 +143,7 @@ transform {
143143
compile_pattern="SOURCE_CODE"
144144
source_code="""
145145
import org.apache.seatunnel.api.table.catalog.Column;
146-
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
146+
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
147147
import org.apache.seatunnel.api.table.catalog.*;
148148
import org.apache.seatunnel.api.table.type.*;
149149
import java.util.ArrayList;

docs/zh/transform-v2/metadata.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Metadata
2+
3+
> Metadata transform plugin
4+
5+
## Description
6+
元数据转换插件,用于将元数据字段添加到数据中
7+
8+
## 支持的元数据
9+
10+
| Key | DataType | Description |
11+
|:---------:|:--------:|:-----------------------:|
12+
| Database | string | 包含该行的数据库名 |
13+
| Table | string | 包含该行的数表名 |
14+
| RowKind | string | 行类型 |
15+
| EventTime | Long | |
16+
| Delay | Long | 数据抽取时间与数据库变更时间的差 |
17+
| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
18+
19+
### 注意事项
20+
`Delay` `Partition`目前只适用于cdc系列连接器,除外TiDB-CDC
21+
22+
## 配置选项
23+
24+
| name | type | required | default value | Description |
25+
|:---------------:|------|:--------:|:-------------:|-------------------|
26+
| metadata_fields | map || - | 元数据字段与输入字段相应的映射关系 |
27+
28+
### metadata_fields [map]
29+
30+
元数据字段和相应的输出字段之间的映射关系
31+
32+
```hocon
33+
metadata_fields {
34+
database = c_database
35+
table = c_table
36+
rowKind = c_rowKind
37+
ts_ms = c_ts_ms
38+
delay = c_delay
39+
}
40+
```
41+
42+
## 示例
43+
44+
```yaml
45+
46+
env {
47+
parallelism = 1
48+
job.mode = "STREAMING"
49+
checkpoint.interval = 5000
50+
read_limit.bytes_per_second = 7000000
51+
read_limit.rows_per_second = 400
52+
}
53+
54+
source {
55+
MySQL-CDC {
56+
result_table_name = "customers_mysql_cdc"
57+
server-id = 5652
58+
username = "root"
59+
password = "zdyk_Dev@2024"
60+
table-names = ["source.user"]
61+
base-url = "jdbc:mysql://172.16.17.123:3306/source"
62+
}
63+
}
64+
65+
transform {
66+
Metadata {
67+
metadata_fields {
68+
Database = database
69+
Table = table
70+
RowKind = rowKind
71+
EventTime = ts_ms
72+
Delay = delay
73+
}
74+
result_table_name = "trans_result"
75+
}
76+
}
77+
78+
sink {
79+
Console {
80+
source_table_name = "custom_name"
81+
}
82+
}
83+
84+
```
85+

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
154154
seatunnel.transform.LLM = seatunnel-transforms-v2
155155
seatunnel.transform.Embedding = seatunnel-transforms-v2
156156
seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
157+
seatunnel.transform.Metadata = seatunnel-transforms-v2

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,44 @@ public enum CommonOptions {
3030
/**
3131
* The key of {@link Column#getOptions()} to specify the column value is a json format string.
3232
*/
33-
JSON("Json"),
33+
JSON("Json", false),
3434
/** The key of {@link Column#getOptions()} to specify the column value is a metadata field. */
35-
METADATA("Metadata"),
35+
METADATA("Metadata", false),
3636
/**
3737
* The key of {@link SeaTunnelRow#getOptions()} to store the partition value of the row value.
3838
*/
39-
PARTITION("Partition"),
40-
;
39+
PARTITION("Partition", true),
40+
/**
41+
* The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE value of the row value.
42+
*/
43+
DATABASE("Database", true),
44+
/** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value of the row value. */
45+
TABLE("Table", true),
46+
/**
47+
* The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND value of the row value.
48+
*/
49+
ROW_KIND("RowKind", true),
50+
/**
51+
* The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME value of the row value.
52+
*/
53+
EVENT_TIME("EventTime", true),
54+
/** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. */
55+
DELAY("Delay", true);
4156

4257
private final String name;
58+
private final boolean supportMetadataTrans;
4359

44-
CommonOptions(String name) {
60+
CommonOptions(String name, boolean supportMetadataTrans) {
4561
this.name = name;
62+
this.supportMetadataTrans = supportMetadataTrans;
63+
}
64+
65+
public static CommonOptions fromName(String name) {
66+
for (CommonOptions option : CommonOptions.values()) {
67+
if (option.getName().equals(name)) {
68+
return option;
69+
}
70+
}
71+
throw new IllegalArgumentException("Unknown option name: " + name);
4672
}
4773
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.type;
19+
20+
import org.apache.seatunnel.api.table.catalog.TablePath;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Objects;
25+
import java.util.stream.Stream;
26+
27+
import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
28+
import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
29+
import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION;
30+
31+
public class MetadataUtil {
32+
33+
public static final List<String> METADATA_FIELDS;
34+
35+
static {
36+
METADATA_FIELDS = new ArrayList<>();
37+
Stream.of(CommonOptions.values())
38+
.filter(CommonOptions::isSupportMetadataTrans)
39+
.map(CommonOptions::getName)
40+
.forEach(METADATA_FIELDS::add);
41+
}
42+
43+
public static void setDelay(SeaTunnelRow row, Long delay) {
44+
row.getOptions().put(DELAY.getName(), delay);
45+
}
46+
47+
public static void setPartition(SeaTunnelRow row, String[] partition) {
48+
row.getOptions().put(PARTITION.getName(), partition);
49+
}
50+
51+
public static void setEventTime(SeaTunnelRow row, Long delay) {
52+
row.getOptions().put(EVENT_TIME.getName(), delay);
53+
}
54+
55+
public static Long getDelay(SeaTunnelRowAccessor row) {
56+
return (Long) row.getOptions().get(DELAY.getName());
57+
}
58+
59+
public static String getDatabase(SeaTunnelRowAccessor row) {
60+
if (row.getTableId() == null) {
61+
return null;
62+
}
63+
return TablePath.of(row.getTableId()).getDatabaseName();
64+
}
65+
66+
public static String getTable(SeaTunnelRowAccessor row) {
67+
if (row.getTableId() == null) {
68+
return null;
69+
}
70+
return TablePath.of(row.getTableId()).getTableName();
71+
}
72+
73+
public static String getRowKind(SeaTunnelRowAccessor row) {
74+
return row.getRowKind().shortString();
75+
}
76+
77+
public static String getPartitionStr(SeaTunnelRowAccessor row) {
78+
Object partition = row.getOptions().get(PARTITION.getName());
79+
return Objects.nonNull(partition) ? String.join(",", (String[]) partition) : null;
80+
}
81+
82+
public static String[] getPartition(SeaTunnelRowAccessor row) {
83+
return (String[]) row.getOptions().get(PARTITION.getName());
84+
}
85+
86+
public static Long getEventTime(SeaTunnelRowAccessor row) {
87+
return (Long) row.getOptions().get(EVENT_TIME.getName());
88+
}
89+
90+
public static boolean isMetadataField(String fieldName) {
91+
return METADATA_FIELDS.contains(fieldName);
92+
}
93+
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public final class SeaTunnelRow implements Serializable {
3434
/** The array to store the actual internal format values. */
3535
private final Object[] fields;
3636

37-
private volatile int size;
38-
3937
private Map<String, Object> options;
4038

39+
private volatile int size;
40+
4141
public SeaTunnelRow(int arity) {
4242
this.fields = new Object[arity];
4343
}

0 commit comments

Comments
 (0)