Skip to content

Commit 3ec47c6

Browse files
authored
[feature][connector-v2][elasticsearch] Support write cdc changelog event in elasticsearch sink (#3673)
1 parent 0a09562 commit 3ec47c6

File tree

9 files changed

+257
-30
lines changed

9 files changed

+257
-30
lines changed

docs/en/concept/connector-v2-features.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,8 @@ For sink connector, the sink connector supports exactly-once if any piece of dat
6262

6363
### schema projection
6464

65-
If a sink connector supports the fields and their types or redefine columns order written in the configuration, we think it supports schema projection.
65+
If a sink connector supports writing only the fields and their types, or redefining the column order, as specified in the configuration, we think it supports schema projection.
66+
67+
### cdc(change data capture)
68+
69+
If a sink connector supports writing row kinds (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on the primary key, we think it supports CDC (change data capture).

docs/en/connector-v2/sink/Elasticsearch.md

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Output data to `Elasticsearch`.
88

99
- [ ] [exactly-once](../../concept/connector-v2-features.md)
1010
- [ ] [schema projection](../../concept/connector-v2-features.md)
11+
- [x] [cdc](../../concept/connector-v2-features.md)
1112

1213
:::tip
1314

@@ -24,6 +25,8 @@ Engine Supported
2425
| hosts | array | yes | - |
2526
| index | string | yes | - |
2627
| index_type | string | no | |
28+
| primary_keys | list | no | |
29+
| key_delimiter | string | no | `_` |
2730
| username | string | no | |
2831
| password | string | no | |
2932
| max_retry_count | int | no | 3 |
@@ -41,6 +44,12 @@ If not, we will treat it as a normal index.
4144
### index_type [string]
4245
`Elasticsearch` index type, it is recommended not to specify in elasticsearch 6 and above
4346

47+
### primary_keys [list]
48+
Primary key fields used to generate the document `_id`; this is a required option for CDC.
49+
50+
### key_delimiter [string]
51+
Delimiter for composite keys ("_" by default), e.g., "$" would result in document `_id` "KEY1$KEY2$KEY3".
52+
4453
### username [string]
4554
x-pack username
4655

@@ -58,10 +67,29 @@ batch bulk doc max size
5867
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
5968

6069
## Examples
70+
71+
Simple
72+
6173
```bash
62-
Elasticsearch {
63-
hosts = ["localhost:9200"]
64-
index = "seatunnel-${age}"
74+
sink {
75+
Elasticsearch {
76+
hosts = ["localhost:9200"]
77+
index = "seatunnel-${age}"
78+
}
79+
}
80+
```
81+
82+
CDC(Change data capture) event
83+
84+
```bash
85+
sink {
86+
Elasticsearch {
87+
hosts = ["localhost:9200"]
88+
index = "seatunnel-${age}"
89+
90+
# cdc required options
91+
primary_keys = ["key1", "key2", ...]
92+
}
6593
}
6694
```
6795

@@ -70,3 +98,7 @@ Elasticsearch {
7098
### 2.2.0-beta 2022-09-26
7199

72100
- Add Elasticsearch Sink Connector
101+
102+
### next version
103+
104+
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3673](https://github.com/apache/incubator-seatunnel/pull/3673))

docs/en/connector-v2/sink/Jdbc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f
1515
support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
1616

1717
- [ ] [schema projection](../../concept/connector-v2-features.md)
18+
- [x] [cdc](../../concept/connector-v2-features.md)
1819

1920
## Options
2021

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23+
import java.util.List;
24+
2325
public class SinkConfig {
2426

2527
public static final Option<String> INDEX = Options.key("index").stringType().noDefaultValue()
@@ -28,6 +30,12 @@ public class SinkConfig {
2830
public static final Option<String> INDEX_TYPE = Options.key("index_type").stringType().noDefaultValue()
2931
.withDescription("Elasticsearch index type, it is recommended not to specify in elasticsearch 6 and above");
3032

33+
public static final Option<List<String>> PRIMARY_KEYS = Options.key("primary_keys").listType(String.class).noDefaultValue()
34+
.withDescription("Primary key fields used to generate the document `_id`");
35+
36+
public static final Option<String> KEY_DELIMITER = Options.key("key_delimiter").stringType().defaultValue("_")
37+
.withDescription("Delimiter for composite keys (\"_\" by default), e.g., \"$\" would result in document `_id` \"KEY1$KEY2$KEY3\".");
38+
3139
@SuppressWarnings("checkstyle:MagicNumber")
3240
public static final Option<Integer> MAX_BATCH_SIZE = Options.key("max_batch_size").intType().defaultValue(10)
3341
.withDescription("batch bulk doc max size");

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@ public class IndexInfo {
3131

3232
private String index;
3333
private String type;
34+
private String[] primaryKeys;
35+
private String keyDelimiter;
3436

3537
/**
 * Reads the sink index settings from the plugin configuration.
 *
 * <p>{@code index} is mandatory; {@code index_type}, {@code primary_keys} and
 * {@code key_delimiter} are optional. The delimiter falls back to the option's
 * declared default ("_") when absent.
 */
public IndexInfo(Config pluginConfig) {
    index = pluginConfig.getString(SinkConfig.INDEX.key());
    if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE.key())) {
        type = pluginConfig.getString(SinkConfig.INDEX_TYPE.key());
    }
    if (pluginConfig.hasPath(SinkConfig.PRIMARY_KEYS.key())) {
        // Stored as an array so downstream code can iterate without boxing a List.
        primaryKeys = pluginConfig.getStringList(SinkConfig.PRIMARY_KEYS.key()).toArray(new String[0]);
    }
    keyDelimiter = pluginConfig.hasPath(SinkConfig.KEY_DELIMITER.key())
            ? pluginConfig.getString(SinkConfig.KEY_DELIMITER.key())
            : SinkConfig.KEY_DELIMITER.defaultValue();
}
4650
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030

3131
import com.fasterxml.jackson.core.JsonProcessingException;
3232
import com.fasterxml.jackson.databind.ObjectMapper;
33+
import lombok.NonNull;
3334

3435
import java.time.temporal.Temporal;
3536
import java.util.HashMap;
3637
import java.util.Map;
38+
import java.util.function.Function;
3739

3840
/**
3941
* use in elasticsearch version >= 7.*
@@ -45,15 +47,84 @@ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
4547
private final IndexSerializer indexSerializer;
4648

4749
private final IndexTypeSerializer indexTypeSerializer;
50+
private final Function<SeaTunnelRow, String> keyExtractor;
4851

4952
/**
 * Builds a bulk-API serializer for the given Elasticsearch version and index settings.
 *
 * <p>The key extractor is derived from {@code primary_keys} / {@code key_delimiter};
 * when no primary keys are configured it yields {@code null} keys.
 */
public ElasticsearchRowSerializer(ElasticsearchVersion elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
    this.seaTunnelRowType = seaTunnelRowType;
    this.indexTypeSerializer =
            IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion, indexInfo.getType());
    this.indexSerializer =
            IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(), seaTunnelRowType);
    this.keyExtractor =
            KeyExtractor.createKeyExtractor(seaTunnelRowType, indexInfo.getPrimaryKeys(), indexInfo.getKeyDelimiter());
}
5458

5559
@Override
5660
public String serializeRow(SeaTunnelRow row) {
61+
switch (row.getRowKind()) {
62+
case INSERT:
63+
case UPDATE_AFTER:
64+
return serializeUpsert(row);
65+
case UPDATE_BEFORE:
66+
case DELETE:
67+
return serializeDelete(row);
68+
default:
69+
throw new ElasticsearchConnectorException(
70+
CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported write row kind: " + row.getRowKind());
71+
}
72+
}
73+
74+
private String serializeUpsert(SeaTunnelRow row) {
75+
String key = keyExtractor.apply(row);
76+
Map<String, Object> document = toDocumentMap(row);
77+
78+
try {
79+
if (key != null) {
80+
Map<String, String> upsertMetadata = createMetadata(row, key);
81+
/**
82+
* format example:
83+
* { "update" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} }\n
84+
* { "doc" : ${your_document_json}, "doc_as_upsert" : true }
85+
*/
86+
return new StringBuilder()
87+
.append("{ \"update\" :").append(objectMapper.writeValueAsString(upsertMetadata)).append("}")
88+
.append("\n")
89+
.append("{ \"doc\" :").append(objectMapper.writeValueAsString(document)).append(", \"doc_as_upsert\" : true }")
90+
.toString();
91+
} else {
92+
Map<String, String> indexMetadata = createMetadata(row);
93+
/**
94+
* format example:
95+
* { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} }\n
96+
* ${your_document_json}
97+
*/
98+
return new StringBuilder()
99+
.append("{ \"index\" :").append(objectMapper.writeValueAsString(indexMetadata)).append("}")
100+
.append("\n")
101+
.append(objectMapper.writeValueAsString(document))
102+
.toString();
103+
}
104+
} catch (JsonProcessingException e) {
105+
throw new ElasticsearchConnectorException(
106+
CommonErrorCode.JSON_OPERATION_FAILED, "Object json deserialization exception.", e);
107+
}
108+
}
109+
110+
private String serializeDelete(SeaTunnelRow row) {
111+
String key = keyExtractor.apply(row);
112+
Map<String, String> deleteMetadata = createMetadata(row, key);
113+
try {
114+
/**
115+
* format example:
116+
* { "delete" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} }
117+
*/
118+
return new StringBuilder()
119+
.append("{ \"delete\" :").append(objectMapper.writeValueAsString(deleteMetadata)).append("}")
120+
.toString();
121+
} catch (JsonProcessingException e) {
122+
throw new ElasticsearchConnectorException(
123+
CommonErrorCode.JSON_OPERATION_FAILED, "Object json deserialization exception.", e);
124+
}
125+
}
126+
127+
private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
57128
String[] fieldNames = seaTunnelRowType.getFieldNames();
58129
Map<String, Object> doc = new HashMap<>(fieldNames.length);
59130
Object[] fields = row.getFields();
@@ -66,25 +137,20 @@ public String serializeRow(SeaTunnelRow row) {
66137
doc.put(fieldNames[i], value);
67138
}
68139
}
140+
return doc;
141+
}
69142

70-
StringBuilder sb = new StringBuilder();
71-
72-
Map<String, String> indexInner = new HashMap<>();
73-
String index = indexSerializer.serialize(row);
74-
indexInner.put("_index", index);
75-
indexTypeSerializer.fillType(indexInner);
76-
77-
Map<String, Map<String, String>> indexParam = new HashMap<>();
78-
indexParam.put("index", indexInner);
79-
try {
80-
sb.append(objectMapper.writeValueAsString(indexParam));
81-
sb.append("\n");
82-
String indexDoc = objectMapper.writeValueAsString(doc);
83-
sb.append(indexDoc);
84-
} catch (JsonProcessingException e) {
85-
throw new ElasticsearchConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, "Object json deserialization exception.", e);
86-
}
143+
private Map<String, String> createMetadata(@NonNull SeaTunnelRow row,
144+
@NonNull String key) {
145+
Map<String, String> actionMetadata = createMetadata(row);
146+
actionMetadata.put("_id", key);
147+
return actionMetadata;
148+
}
87149

88-
return sb.toString();
150+
private Map<String, String> createMetadata(@NonNull SeaTunnelRow row) {
151+
Map<String, String> actionMetadata = new HashMap<>(2);
152+
actionMetadata.put("_index", indexSerializer.serialize(row));
153+
indexTypeSerializer.fillType(actionMetadata);
154+
return actionMetadata;
89155
}
90156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.common.exception.CommonErrorCode;
24+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
25+
26+
import lombok.AllArgsConstructor;
27+
28+
import java.io.Serializable;
29+
import java.time.LocalDate;
30+
import java.time.LocalDateTime;
31+
import java.time.LocalTime;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.function.Function;
35+
36+
@AllArgsConstructor
37+
public class KeyExtractor implements Function<SeaTunnelRow, String>, Serializable {
38+
private final FieldFormatter[] fieldFormatters;
39+
private final String keyDelimiter;
40+
41+
@Override
42+
public String apply(SeaTunnelRow row) {
43+
StringBuilder builder = new StringBuilder();
44+
for (int i = 0; i < fieldFormatters.length; i++) {
45+
if (i > 0) {
46+
builder.append(keyDelimiter);
47+
}
48+
String value = fieldFormatters[i].format(row);
49+
builder.append(value);
50+
}
51+
return builder.toString();
52+
}
53+
54+
public static Function<SeaTunnelRow, String> createKeyExtractor(SeaTunnelRowType rowType,
55+
String[] primaryKeys,
56+
String keyDelimiter) {
57+
if (primaryKeys == null) {
58+
return row -> null;
59+
}
60+
61+
List<FieldFormatter> fieldFormatters = new ArrayList<>(primaryKeys.length);
62+
for (String fieldName : primaryKeys) {
63+
int fieldIndex = rowType.indexOf(fieldName);
64+
SeaTunnelDataType<?> fieldType = rowType.getFieldType(fieldIndex);
65+
FieldFormatter fieldFormatter = createFieldFormatter(fieldIndex, fieldType);
66+
fieldFormatters.add(fieldFormatter);
67+
}
68+
return new KeyExtractor(fieldFormatters.toArray(new FieldFormatter[0]), keyDelimiter);
69+
}
70+
71+
private static FieldFormatter createFieldFormatter(int fieldIndex, SeaTunnelDataType fieldType) {
72+
return row -> {
73+
switch (fieldType.getSqlType()) {
74+
case ROW:
75+
case ARRAY:
76+
case MAP:
77+
throw new ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported type: " + fieldType);
78+
case DATE:
79+
LocalDate localDate = (LocalDate) row.getField(fieldIndex);
80+
return localDate.toString();
81+
case TIME:
82+
LocalTime localTime = (LocalTime) row.getField(fieldIndex);
83+
return localTime.toString();
84+
case TIMESTAMP:
85+
LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
86+
return localDateTime.toString();
87+
default:
88+
return row.getField(fieldIndex).toString();
89+
}
90+
};
91+
}
92+
93+
private interface FieldFormatter extends Serializable {
94+
String format(SeaTunnelRow row);
95+
}
96+
}

0 commit comments

Comments
 (0)