Skip to content

Commit bb9fd51

Browse files
authored
[Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124)
1 parent dc271dc commit bb9fd51

File tree

4 files changed

+88
-16
lines changed

4 files changed

+88
-16
lines changed

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,7 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
217217

218218
@Override
219219
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
220-
dropTable(tablePath, ignoreIfNotExists);
221-
createTable(tablePath, null, ignoreIfNotExists);
220+
esRestClient.clearIndexData(tablePath.getTableName());
222221
}
223222

224223
@Override

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,35 @@ public void dropIndex(String tableName) {
480480
}
481481
}
482482

483+
public void clearIndexData(String indexName) {
484+
String endpoint = String.format("/%s/_delete_by_query", indexName);
485+
Request request = new Request("POST", endpoint);
486+
String jsonString = "{ \"query\": { \"match_all\": {} } }";
487+
request.setJsonEntity(jsonString);
488+
489+
try {
490+
Response response = restClient.performRequest(request);
491+
if (response == null) {
492+
throw new ElasticsearchConnectorException(
493+
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
494+
"POST " + endpoint + " response null");
495+
}
496+
// todo: if the index doesn't exist, the response status code is 200?
497+
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
498+
return;
499+
} else {
500+
throw new ElasticsearchConnectorException(
501+
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
502+
String.format(
503+
"POST %s response status code=%d",
504+
endpoint, response.getStatusLine().getStatusCode()));
505+
}
506+
} catch (IOException ex) {
507+
throw new ElasticsearchConnectorException(
508+
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, ex);
509+
}
510+
}
511+
483512
/**
484513
* get es field name and type mapping realtion
485514
*

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
2828
LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
2929
DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
3030
CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"),
31-
ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type");
31+
ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"),
32+
CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed");
3233
;
3334

3435
private final String code;

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -447,35 +447,78 @@ public void tearDown() {
447447
}
448448

449449
@Test
450-
public void testCatalog() {
450+
public void testCatalog() throws InterruptedException, JsonProcessingException {
451451
Map<String, Object> configMap = new HashMap<>();
452452
configMap.put("username", "elastic");
453453
configMap.put("password", "elasticsearch");
454-
configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress()));
454+
configMap.put(
455+
"hosts", Collections.singletonList("https://" + container.getHttpHostAddress()));
455456
configMap.put("index", "st_index3");
456457
configMap.put("tls_verify_certificate", false);
457458
configMap.put("tls_verify_hostname", false);
458459
configMap.put("index_type", "st");
460+
459461
final ElasticSearchCatalog elasticSearchCatalog =
460462
new ElasticSearchCatalog("Elasticsearch", "", ReadonlyConfig.fromMap(configMap));
461463
elasticSearchCatalog.open();
464+
462465
TablePath tablePath = TablePath.of("", "st_index3");
463-
// index exists
466+
467+
// Verify index does not exist initially
464468
final boolean existsBefore = elasticSearchCatalog.tableExists(tablePath);
465-
Assertions.assertFalse(existsBefore);
466-
// create index
469+
Assertions.assertFalse(existsBefore, "Index should not exist initially");
470+
471+
// Create index
467472
elasticSearchCatalog.createTable(tablePath, null, false);
468473
final boolean existsAfter = elasticSearchCatalog.tableExists(tablePath);
469-
Assertions.assertTrue(existsAfter);
470-
// data exists?
471-
final boolean existsData = elasticSearchCatalog.isExistsData(tablePath);
472-
Assertions.assertFalse(existsData);
473-
// truncate
474+
Assertions.assertTrue(existsAfter, "Index should be created");
475+
476+
// Generate and add multiple records
477+
List<String> data = generateTestData();
478+
StringBuilder requestBody = new StringBuilder();
479+
String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n";
480+
for (String record : data) {
481+
requestBody.append(indexHeader);
482+
requestBody.append(record);
483+
requestBody.append("\n");
484+
}
485+
esRestClient.bulk(requestBody.toString());
486+
Thread.sleep(2000); // Wait for data to be indexed
487+
488+
// Verify data exists
489+
List<String> sourceFields = Arrays.asList("field1", "field2");
490+
Map<String, Object> query = new HashMap<>();
491+
query.put("match_all", new HashMap<>());
492+
ScrollResult scrollResult =
493+
esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100);
494+
Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should exist in the index");
495+
496+
// Truncate the table
474497
elasticSearchCatalog.truncateTable(tablePath, false);
475-
Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
476-
// drop
498+
Thread.sleep(2000); // Wait for data to be indexed
499+
500+
// Verify data is deleted
501+
scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100);
502+
Assertions.assertTrue(
503+
scrollResult.getDocs().isEmpty(), "Data should be deleted from the index");
504+
505+
// Drop the table
477506
elasticSearchCatalog.dropTable(tablePath, false);
478-
Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
507+
Assertions.assertFalse(
508+
elasticSearchCatalog.tableExists(tablePath), "Index should be dropped");
509+
479510
elasticSearchCatalog.close();
480511
}
512+
513+
private List<String> generateTestData() throws JsonProcessingException {
514+
List<String> data = new ArrayList<>();
515+
ObjectMapper objectMapper = new ObjectMapper();
516+
for (int i = 0; i < 10; i++) {
517+
Map<String, Object> record = new HashMap<>();
518+
record.put("field1", "value" + i);
519+
record.put("field2", i);
520+
data.add(objectMapper.writeValueAsString(record));
521+
}
522+
return data;
523+
}
481524
}

0 commit comments

Comments
 (0)