Skip to content

Commit 4e98eb8

Browse files
authored
[Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source empty,Support SourceConfig.SOURCE field empty. (#6425)
1 parent ce426fb commit 4e98eb8

File tree

9 files changed

+313
-22
lines changed

9 files changed

+313
-22
lines changed

docs/en/connector-v2/source/Elasticsearch.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ support version >= 2.x and <= 8.x.
2929
| query | json | no | {"match_all": {}} |
3030
| scroll_time | string | no | 1m |
3131
| scroll_size | int | no | 100 |
32-
| schema | | no | - |
3332
| tls_verify_certificate | boolean | no | true |
3433
| tls_verify_hostnames | boolean | no | true |
34+
| array_column | map | no | |
3535
| tls_keystore_path | string | no | - |
3636
| tls_keystore_password | string | no | - |
3737
| tls_truststore_path | string | no | - |
@@ -58,7 +58,12 @@ Elasticsearch index name, support * fuzzy matching.
5858

5959
The fields of index.
6060
You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.
61-
If you don't config source, you must config `schema`.
61+
If you don't config source, it is automatically retrieved from the mapping of the index.
62+
63+
### array_column [array]
64+
65+
The fields of array type.
66+
Since there is no array index in es,so need assign array type,just like `{c_array = "array<tinyint>"}`.
6267

6368
### query [json]
6469

@@ -73,11 +78,6 @@ Amount of time Elasticsearch will keep the search context alive for scroll reque
7378

7479
Maximum number of hits to be returned with each Elasticsearch scroll request.
7580

76-
### schema
77-
78-
The structure of the data, including field names and field types.
79-
If you don't config schema, you must config `source`.
80-
8181
### tls_verify_certificate [boolean]
8282

8383
Enable certificates validation for HTTPS endpoints

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ public class SourceConfig {
4040
.withDescription(
4141
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit");
4242

43+
public static final Option<Map<String, String>> ARRAY_COLUMN =
44+
Options.key("array_column")
45+
.mapType()
46+
.defaultValue(new HashMap<>())
47+
.withDescription(
48+
"Because there is no array type in es,so need specify array Type.");
49+
4350
public static final Option<String> SCROLL_TIME =
4451
Options.key("scroll_time")
4552
.stringType()

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
2222
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.NullNode;
2324
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
2425
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
2526

@@ -117,7 +118,9 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
117118
value = recursiveGet(rowRecord.getDoc(), fieldName);
118119
if (value != null) {
119120
seaTunnelDataType = rowTypeInfo.getFieldType(i);
120-
if (value instanceof TextNode) {
121+
if (value instanceof NullNode) {
122+
seaTunnelFields[i] = null;
123+
} else if (value instanceof TextNode) {
121124
seaTunnelFields[i] =
122125
convertValue(seaTunnelDataType, ((TextNode) value).textValue());
123126
} else {

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
2022
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2123
import org.apache.seatunnel.api.source.Boundedness;
2224
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +29,7 @@
2729
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2830
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
2931
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
32+
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
3033
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
3134
import org.apache.seatunnel.api.table.catalog.TableSchema;
3235
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -38,11 +41,17 @@
3841
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
3942
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
4043

44+
import org.apache.commons.collections4.CollectionUtils;
45+
46+
import lombok.extern.slf4j.Slf4j;
47+
48+
import java.util.ArrayList;
4149
import java.util.Arrays;
4250
import java.util.Collections;
4351
import java.util.List;
4452
import java.util.Map;
4553

54+
@Slf4j
4655
public class ElasticsearchSource
4756
implements SeaTunnelSource<
4857
SeaTunnelRow, ElasticsearchSourceSplit, ElasticsearchSourceState>,
@@ -55,27 +64,40 @@ public class ElasticsearchSource
5564

5665
private List<String> source;
5766

67+
private Map<String, String> arrayColumn;
68+
5869
public ElasticsearchSource(ReadonlyConfig config) {
5970
this.config = config;
6071
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
6172
// todo: We need to remove the schema in ES.
73+
log.warn(
74+
"The schema config in ElasticSearch sink is deprecated, please use source config instead!");
6275
catalogTable = CatalogTableUtil.buildWithConfig(config);
6376
source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
6477
} else {
6578
source = config.get(SourceConfig.SOURCE);
79+
arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
6680
EsRestClient esRestClient = EsRestClient.createInstance(config);
6781
Map<String, BasicTypeDefine<EsType>> esFieldType =
6882
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
6983
esRestClient.close();
70-
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
71-
for (int i = 0; i < source.size(); i++) {
72-
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
73-
SeaTunnelDataType<?> seaTunnelDataType =
74-
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
75-
fieldTypes[i] = seaTunnelDataType;
84+
85+
if (CollectionUtils.isEmpty(source)) {
86+
source = new ArrayList<>(esFieldType.keySet());
7687
}
88+
SeaTunnelDataType[] fieldTypes = getSeaTunnelDataType(esFieldType, source);
7789
TableSchema.Builder builder = TableSchema.builder();
90+
7891
for (int i = 0; i < source.size(); i++) {
92+
String key = source.get(i);
93+
if (arrayColumn.containsKey(key)) {
94+
String value = arrayColumn.get(key);
95+
SeaTunnelDataType<?> dataType =
96+
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
97+
builder.column(PhysicalColumn.of(key, dataType, 0, true, null, null));
98+
continue;
99+
}
100+
79101
builder.column(
80102
PhysicalColumn.of(source.get(i), fieldTypes[i], 0, true, null, null));
81103
}
@@ -127,4 +149,17 @@ public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
127149
return new ElasticsearchSourceSplitEnumerator(
128150
enumeratorContext, sourceState, config, source);
129151
}
152+
153+
@VisibleForTesting
154+
public static SeaTunnelDataType[] getSeaTunnelDataType(
155+
Map<String, BasicTypeDefine<EsType>> esFieldType, List<String> source) {
156+
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
157+
for (int i = 0; i < source.size(); i++) {
158+
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
159+
SeaTunnelDataType<?> seaTunnelDataType =
160+
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
161+
fieldTypes[i] = seaTunnelDataType;
162+
}
163+
return fieldTypes;
164+
}
130165
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch;
19+
20+
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
23+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
24+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSource;
25+
26+
import org.junit.jupiter.api.Assertions;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
public class ElasticsearchSourceTest {
34+
@Test
35+
public void testPrepareWithEmptySource() throws PrepareFailException {
36+
BasicTypeDefine.BasicTypeDefineBuilder<EsType> typeDefine =
37+
BasicTypeDefine.<EsType>builder()
38+
.name("field1")
39+
.columnType("text")
40+
.dataType("text");
41+
Map<String, BasicTypeDefine<EsType>> esFieldType = new HashMap<>();
42+
esFieldType.put("field1", typeDefine.build());
43+
SeaTunnelDataType[] seaTunnelDataTypes =
44+
ElasticsearchSource.getSeaTunnelDataType(
45+
esFieldType, new ArrayList<>(esFieldType.keySet()));
46+
Assertions.assertNotNull(seaTunnelDataTypes);
47+
Assertions.assertEquals(seaTunnelDataTypes[0].getTypeClass(), String.class);
48+
}
49+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323

2424
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2525
import org.apache.seatunnel.api.table.catalog.TablePath;
26+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2627
import org.apache.seatunnel.common.utils.JsonUtils;
2728
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
2829
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
30+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
2931
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
3032
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
3133
import org.apache.seatunnel.e2e.common.TestResource;
@@ -57,6 +59,7 @@
5759
import java.time.LocalDate;
5860
import java.time.LocalDateTime;
5961
import java.time.ZoneOffset;
62+
import java.time.format.DateTimeFormatter;
6063
import java.util.ArrayList;
6164
import java.util.Arrays;
6265
import java.util.Collections;
@@ -111,6 +114,7 @@ public void startUp() throws Exception {
111114
testDataset = generateTestDataSet();
112115
createIndexDocs();
113116
createIndexWithFullType();
117+
createIndexForResourceNull();
114118
}
115119

116120
/** create a index,and bulk some documents */
@@ -156,6 +160,16 @@ private void createIndexWithFullType() throws IOException, InterruptedException
156160
2, esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
157161
}
158162

163+
private void createIndexForResourceNull() throws IOException {
164+
String mapping =
165+
IOUtils.toString(
166+
ContainerUtil.getResourcesFile(
167+
"/elasticsearch/st_index_source_without_schema_and_sink.json")
168+
.toURI(),
169+
StandardCharsets.UTF_8);
170+
esRestClient.createIndex("st_index4", mapping);
171+
}
172+
159173
@TestTemplate
160174
public void testElasticsearch(TestContainer container)
161175
throws IOException, InterruptedException {
@@ -179,6 +193,19 @@ public void testElasticsearchWithFullType(TestContainer container)
179193
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
180194
}
181195

196+
@TestTemplate
197+
public void testElasticsearchWithoutSchema(TestContainer container)
198+
throws IOException, InterruptedException {
199+
200+
Container.ExecResult execResult =
201+
container.executeJob(
202+
"/elasticsearch/elasticsearch_source_without_schema_and_sink.conf");
203+
Assertions.assertEquals(0, execResult.getExitCode());
204+
List<String> sinkData = readSinkDataWithOutSchema();
205+
// for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
206+
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
207+
}
208+
182209
private List<String> generateTestDataSet() throws JsonProcessingException {
183210
String[] fields =
184211
new String[] {
@@ -188,12 +215,12 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
188215
"c_boolean",
189216
"c_tinyint",
190217
"c_smallint",
191-
"c_int",
192218
"c_bigint",
193219
"c_float",
194220
"c_double",
195221
"c_decimal",
196222
"c_bytes",
223+
"c_int",
197224
"c_date",
198225
"c_timestamp"
199226
};
@@ -227,6 +254,14 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
227254
return documents;
228255
}
229256

257+
private List<String> readSinkDataWithOutSchema() throws InterruptedException {
258+
Map<String, BasicTypeDefine<EsType>> esFieldType =
259+
esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList());
260+
Thread.sleep(2000);
261+
List<String> source = new ArrayList<>(esFieldType.keySet());
262+
return getDocsWithTransformDate(source, "st_index4");
263+
}
264+
230265
private List<String> readSinkData() throws InterruptedException {
231266
// wait for index refresh
232267
Thread.sleep(2000);
@@ -238,32 +273,33 @@ private List<String> readSinkData() throws InterruptedException {
238273
"c_boolean",
239274
"c_tinyint",
240275
"c_smallint",
241-
"c_int",
242276
"c_bigint",
243277
"c_float",
244278
"c_double",
245279
"c_decimal",
246280
"c_bytes",
281+
"c_int",
247282
"c_date",
248283
"c_timestamp");
284+
return getDocsWithTransformTimestamp(source, "st_index2");
285+
}
286+
287+
private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
249288
HashMap<String, Object> rangeParam = new HashMap<>();
250289
rangeParam.put("gte", 10);
251290
rangeParam.put("lte", 20);
252291
HashMap<String, Object> range = new HashMap<>();
253292
range.put("c_int", rangeParam);
254293
Map<String, Object> query = new HashMap<>();
255294
query.put("range", range);
256-
ScrollResult scrollResult =
257-
esRestClient.searchByScroll("st_index2", source, query, "1m", 1000);
295+
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
258296
scrollResult
259297
.getDocs()
260298
.forEach(
261299
x -> {
262300
x.remove("_index");
263301
x.remove("_type");
264302
x.remove("_id");
265-
// I don’t know if converting the test cases in this way complies with
266-
// the CI specification
267303
x.replace(
268304
"c_timestamp",
269305
LocalDateTime.parse(x.get("c_timestamp").toString())
@@ -280,6 +316,40 @@ private List<String> readSinkData() throws InterruptedException {
280316
return docs;
281317
}
282318

319+
private List<String> getDocsWithTransformDate(List<String> source, String index) {
320+
HashMap<String, Object> rangeParam = new HashMap<>();
321+
rangeParam.put("gte", 10);
322+
rangeParam.put("lte", 20);
323+
HashMap<String, Object> range = new HashMap<>();
324+
range.put("c_int", rangeParam);
325+
Map<String, Object> query = new HashMap<>();
326+
query.put("range", range);
327+
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
328+
scrollResult
329+
.getDocs()
330+
.forEach(
331+
x -> {
332+
x.remove("_index");
333+
x.remove("_type");
334+
x.remove("_id");
335+
x.replace(
336+
"c_date",
337+
LocalDate.parse(
338+
x.get("c_date").toString(),
339+
DateTimeFormatter.ofPattern(
340+
"yyyy-MM-dd'T'HH:mm"))
341+
.toString());
342+
});
343+
List<String> docs =
344+
scrollResult.getDocs().stream()
345+
.sorted(
346+
Comparator.comparingInt(
347+
o -> Integer.valueOf(o.get("c_int").toString())))
348+
.map(JsonUtils::toJsonString)
349+
.collect(Collectors.toList());
350+
return docs;
351+
}
352+
283353
private List<String> mapTestDatasetForDSL() {
284354
return testDataset.stream()
285355
.map(JsonUtils::parseObject)

0 commit comments

Comments
 (0)