
Commit 7728e24

zhilinli123 authored
[Feature][formats][ogg] Support read ogg format message #4201 (#4225)
Co-authored-by: zhilinli <lzl15844876351@163.com>
Co-authored-by: hailin0 <wanghailin@apache.org>
1 parent 38132f5 commit 7728e24

File tree

34 files changed: +1576 −1431 lines changed

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@

# Ogg Format

[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a. Ogg) is a managed service providing a real-time data mesh platform, which uses replication to keep data highly available and to enable real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without the need to allocate or manage compute environments. Ogg provides a format schema for changelogs and supports serializing messages using JSON.

SeaTunnel supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages into the SeaTunnel system. This is useful in many cases, such as:

- synchronizing incremental data from databases to other systems
- auditing logs
- real-time materialized views on databases
- temporal joins against the changing history of a database table, and so on

SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel as Ogg JSON messages and emitting them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages respectively.
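
For illustration only, here is a hypothetical sketch of how an update of `weight` from 5.18 to 5.15 might therefore be emitted as two Ogg JSON messages. The exact set of fields written by the serializer is an assumption; the sketch mirrors the `before`/`after`/`op_type` layout of the example shown later in this page (other columns omitted for brevity):

```bash
# UPDATE_BEFORE emitted as a DELETE ("D") message -- hypothetical output
{"before": {"id": 111, "weight": 5.18}, "after": null, "op_type": "D", "table": "PRODUCTS"}
# UPDATE_AFTER emitted as an INSERT ("I") message -- hypothetical output
{"before": null, "after": {"id": 111, "weight": 5.15}, "op_type": "I", "table": "PRODUCTS"}
```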

# Format Options

| Option                       | Default | Required | Description                                                                                                                                                                                            |
|------------------------------|---------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| format                       | (none)  | yes      | Specify what format to use; here it should be 'ogg_json'.                                                                                                                                             |
| ogg_json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.                                                                                                  |
| ogg_json.database.include    | (none)  | no       | An optional regular expression to read only the changelog rows of specific databases, matched against the "database" meta field in the Ogg record. The pattern string is compatible with Java's Pattern. |
| ogg_json.table.include       | (none)  | no       | An optional regular expression to read only the changelog rows of specific tables, matched against the "table" meta field in the Ogg record. The pattern string is compatible with Java's Pattern.       |

# How to use Ogg format

## Kafka usage example

Ogg provides a unified format for changelogs. Here is a simple example of an update operation captured from an Oracle PRODUCTS table:

```bash
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}
```
53+
54+
Note: please refer to documentation about the meaning of each fields.
55+
56+
The Oracle products table has 4 columns (id, name, description and weight).
57+
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15.
58+
Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel to consume this topic and interpret the change events.
59+
60+
```bash
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
}
source {
  Kafka {
    bootstrap.servers = "127.0.0.1:9092"
    topic = "ogg"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "double"
      }
    },
    format = ogg_json
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "12345678"
    table = "ogg"
    primary_keys = ["id"]
  }
}
```
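
As noted earlier, SeaTunnel can also write its change messages back out as Ogg JSON; this commit registers OGG_JSON as an accepted Kafka sink format (see the KafkaSinkFactory diff below). A minimal sketch of such a sink block, with a placeholder broker address and topic name and all other Kafka sink options omitted:

```bash
sink {
  Kafka {
    bootstrap.servers = "127.0.0.1:9092"
    topic = "ogg-output"
    format = ogg_json
  }
}
```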

docs/en/connector-v2/sink/Kafka.md

Lines changed: 5 additions & 5 deletions
@@ -56,12 +56,12 @@ Currently two formats are supported:
 
 For example, Upstream data is the following:
 
-| name | age | data |
-|------|-----|---------------|
-| Jack | 16 | data-example1 |
-| Mary | 23 | data-example2 |
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
 
-If `${name}` is set as the topic. So the first row is sent to Jack topic, and the second row is sent to Mary topic.
+If `${name}` is set as the topic. So the first row is sent to Jack topic, and the second row is sent to Mary topic.
 
 ### Semantics
 
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlContainer.java

Lines changed: 4 additions & 0 deletions
@@ -102,6 +102,10 @@ public String getJdbcUrl(String databaseName) {
                 + additionalUrlParams;
     }
 
+    public void setDatabaseName(String databaseName) {
+        this.databaseName = databaseName;
+    }
+
     @Override
     public String getJdbcUrl() {
         return getJdbcUrl(databaseName);

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java

Lines changed: 17 additions & 4 deletions
@@ -59,25 +59,38 @@ public class UniqueDatabase {
     private final String username;
     private final String password;
 
+    /**
+     * @param container mysql docker container
+     * @param databaseName name of the database
+     * @param username Connection user name
+     * @param password Connection password
+     * @param templateName Execute ddl/ directory file name
+     */
     public UniqueDatabase(
-            MySqlContainer container, String databaseName, String username, String password) {
+            MySqlContainer container,
+            String databaseName,
+            String username,
+            String password,
+            String templateName) {
         this(
                 container,
                 databaseName,
                 Integer.toUnsignedString(new Random().nextInt(), 36),
                 username,
-                password);
+                password,
+                (!templateName.isEmpty() && templateName != null) ? templateName : password);
     }
 
     private UniqueDatabase(
             MySqlContainer container,
             String databaseName,
             final String identifier,
             String username,
-            String password) {
+            String password,
+            String templateName) {
         this.container = container;
         this.databaseName = databaseName + "_" + identifier;
-        this.templateName = databaseName;
+        this.templateName = templateName;
         this.username = username;
         this.password = password;
     }

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java

Lines changed: 2 additions & 1 deletion
@@ -23,5 +23,6 @@ public enum MessageFormat {
     CANAL_JSON,
     DEBEZIUM_JSON,
     COMPATIBLE_DEBEZIUM_JSON,
-    COMPATIBLE_KAFKA_CONNECT_JSON
+    COMPATIBLE_KAFKA_CONNECT_JSON,
+    OGG_JSON
 }

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java

Lines changed: 3 additions & 0 deletions
@@ -30,6 +30,7 @@
 import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
 import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -220,6 +221,8 @@ private static SerializationSchema createSerializationSchema(
                         .build();
             case CANAL_JSON:
                 return new CanalJsonSerializationSchema(rowType);
+            case OGG_JSON:
+                return new OggJsonSerializationSchema(rowType);
             case DEBEZIUM_JSON:
                 return new DebeziumJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java

Lines changed: 4 additions & 1 deletion
@@ -43,7 +43,10 @@ public OptionRule optionRule() {
                 .conditional(
                         Config.FORMAT,
                         Arrays.asList(
-                                MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
+                                MessageFormat.JSON,
+                                MessageFormat.CANAL_JSON,
+                                MessageFormat.TEXT,
+                                MessageFormat.OGG_JSON),
                         Config.TOPIC)
                 .optional(
                         Config.KAFKA_CONFIG,

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java

Lines changed: 5 additions & 0 deletions
@@ -39,6 +39,7 @@
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
@@ -218,6 +219,10 @@ private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
                 return CanalJsonDeserializationSchema.builder(seaTunnelRowType)
                         .setIgnoreParseErrors(true)
                         .build();
+            case OGG_JSON:
+                return OggJsonDeserializationSchema.builder(seaTunnelRowType)
+                        .setIgnoreParseErrors(true)
+                        .build();
             case COMPATIBLE_KAFKA_CONNECT_JSON:
                 Boolean keySchemaEnable =
                         readonlyConfig.get(

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java

Lines changed: 2 additions & 1 deletion
@@ -70,7 +70,8 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
     private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
 
     private final UniqueDatabase inventoryDatabase =
-            new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);
 
     // mysql source table query sql
     private static final String SOURCE_SQL_TEMPLATE =
