Skip to content

Commit 2a1fd50

Browse files
authored
[Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
1 parent 42e8c25 commit 2a1fd50

30 files changed

+1298
-0
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Elasticsearch
2+
3+
## Description
4+
5+
Output data to `Elasticsearch`.
6+
7+
:::tip
8+
9+
Engine Supported
10+
11+
* supported `ElasticSearch version is >= 2.x and < 8.x`
12+
13+
:::
14+
15+
## Options
16+
17+
| name | type | required | default value |
18+
|-------------------| ------ | -------- |---------------|
19+
| hosts | array | yes | - |
20+
| index | string | yes | - |
21+
| index_type | string | no | |
22+
| username | string | no | |
23+
| password | string | no | |
24+
| max_retry_size | int | no | 3 |
25+
| max_batch_size | int | no | 10 |
26+
27+
28+
29+
### hosts [array]
30+
`Elasticsearch` cluster http address, the format is `host:port` , allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
31+
32+
### index [string]
33+
`Elasticsearch` `index` name.Index support contains variables of field name,such as `seatunnel_${age}`,and the field must appear at seatunnel row.
34+
35+
### index_type [string]
36+
`Elasticsearch` index type, it is recommended not to specify in elasticsearch 6 and above
37+
38+
### username [string]
39+
x-pack username
40+
41+
### password [string]
42+
x-pack password
43+
44+
### max_retry_size [int]
45+
one bulk request max try size
46+
47+
### max_batch_size [int]
48+
batch bulk doc max size
49+
50+
## Examples
51+
```bash
52+
Elasticsearch {
53+
hosts = ["localhost:9200"]
54+
index = "seatunnel-${age}"
55+
}
56+
```

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,4 @@ seatunnel.sink.LocalFile = connector-file-local
111111
seatunnel.source.Pulsar = connector-pulsar
112112
seatunnel.source.Hudi = connector-hudi
113113
seatunnel.sink.DingTalk = connector-dingtalk
114+
seatunnel.sink.elasticsearch = connector-elasticsearch

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@
223223
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
224224
<jsoup.version>1.14.3</jsoup.version>
225225
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
226+
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
226227
</properties>
227228

228229
<dependencyManagement>

seatunnel-connectors-v2-dist/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
<artifactId>connector-email</artifactId>
112112
<version>${project.version}</version>
113113
</dependency>
114+
<dependency>
115+
<groupId>org.apache.seatunnel</groupId>
116+
<artifactId>connector-elasticsearch</artifactId>
117+
<version>${project.version}</version>
118+
</dependency>
114119
</dependencies>
115120

116121
<build>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<groupId>org.apache.seatunnel</groupId>
25+
<artifactId>seatunnel-connectors-v2</artifactId>
26+
<version>${revision}</version>
27+
</parent>
28+
<modelVersion>4.0.0</modelVersion>
29+
30+
<artifactId>connector-elasticsearch</artifactId>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>org.apache.seatunnel</groupId>
35+
<artifactId>seatunnel-api</artifactId>
36+
<version>${project.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.elasticsearch.client</groupId>
40+
<artifactId>elasticsearch-rest-client</artifactId>
41+
<version>${elasticsearch-rest-client.version}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>com.fasterxml.jackson.core</groupId>
45+
<artifactId>jackson-databind</artifactId>
46+
</dependency>
47+
</dependencies>
48+
49+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.http.HttpHost;
24+
import org.apache.http.HttpStatus;
25+
import org.apache.http.auth.AuthScope;
26+
import org.apache.http.auth.UsernamePasswordCredentials;
27+
import org.apache.http.client.CredentialsProvider;
28+
import org.apache.http.impl.client.BasicCredentialsProvider;
29+
import org.apache.http.util.EntityUtils;
30+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
31+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
32+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
33+
import org.elasticsearch.client.Request;
34+
import org.elasticsearch.client.Response;
35+
import org.elasticsearch.client.RestClient;
36+
import org.elasticsearch.client.RestClientBuilder;
37+
38+
import java.io.IOException;
39+
import java.util.List;
40+
41+
public class EsRestClient {
42+
43+
private static EsRestClient esRestClient;
44+
private static RestClient restClient;
45+
46+
private EsRestClient() {
47+
48+
}
49+
50+
private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
51+
HttpHost[] httpHosts = new HttpHost[hosts.size()];
52+
for (int i = 0; i < hosts.size(); i++) {
53+
String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
54+
httpHosts[i] = new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
55+
}
56+
57+
RestClientBuilder builder = RestClient.builder(httpHosts)
58+
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
59+
.setConnectionRequestTimeout(10 * 1000)
60+
.setSocketTimeout(5 * 60 * 1000));
61+
62+
if (StringUtils.isNotEmpty(username)) {
63+
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
64+
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
65+
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
66+
}
67+
return builder;
68+
}
69+
70+
public static EsRestClient getInstance(List<String> hosts, String username, String password) {
71+
if (restClient == null) {
72+
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
73+
restClient = restClientBuilder.build();
74+
esRestClient = new EsRestClient();
75+
}
76+
return esRestClient;
77+
}
78+
79+
public BulkResponse bulk(String requestBody) {
80+
Request request = new Request("POST", "_bulk");
81+
request.setJsonEntity(requestBody);
82+
try {
83+
Response response = restClient.performRequest(request);
84+
if (response == null) {
85+
throw new BulkElasticsearchException("bulk es Response is null");
86+
}
87+
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
88+
ObjectMapper objectMapper = new ObjectMapper();
89+
String entity = EntityUtils.toString(response.getEntity());
90+
JsonNode json = objectMapper.readTree(entity);
91+
int took = json.get("took").asInt();
92+
boolean errors = json.get("errors").asBoolean();
93+
return new BulkResponse(errors, took, entity);
94+
} else {
95+
throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
96+
}
97+
} catch (IOException e) {
98+
throw new BulkElasticsearchException(String.format("bulk es error,request boy=%s", requestBody), e);
99+
}
100+
}
101+
102+
/**
103+
* @return version.number, example:2.0.0
104+
*/
105+
public static String getClusterVersion() {
106+
Request request = new Request("GET", "/");
107+
try {
108+
Response response = restClient.performRequest(request);
109+
String result = EntityUtils.toString(response.getEntity());
110+
ObjectMapper objectMapper = new ObjectMapper();
111+
JsonNode jsonNode = objectMapper.readTree(result);
112+
JsonNode versionNode = jsonNode.get("version");
113+
return versionNode.get("number").asText();
114+
} catch (IOException e) {
115+
throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
116+
}
117+
}
118+
119+
public void close() throws IOException {
120+
restClient.close();
121+
}
122+
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
21+
22+
public class SinkConfig {
23+
24+
public static final String INDEX = "index";
25+
26+
public static final String INDEX_TYPE = "index_type";
27+
28+
public static final String USERNAME = "username";
29+
30+
public static final String PASSWORD = "password";
31+
32+
public static final String HOSTS = "hosts";
33+
34+
public static final String MAX_BATCH_SIZE = "max_batch_size";
35+
36+
public static final String MAX_RETRY_SIZE = "max_retry_size";
37+
38+
public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
39+
if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
40+
BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
41+
}
42+
if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
43+
BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
44+
}
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
21+
22+
/**
23+
* bulk es config
24+
*/
25+
public class BulkConfig {
26+
/**
27+
* once bulk es include max document size
28+
* {@link SinkConfig#MAX_BATCH_SIZE}
29+
*/
30+
public static int MAX_BATCH_SIZE = 10;
31+
32+
/**
33+
* the max retry size of bulk es
34+
* {@link SinkConfig#MAX_RETRY_SIZE}
35+
*/
36+
public static int MAX_RETRY_SIZE = 3;
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
19+
20+
public enum ElasticsearchVersion {
21+
ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
22+
23+
private int version;
24+
25+
ElasticsearchVersion(int version) {
26+
this.version = version;
27+
}
28+
29+
public int getVersion() {
30+
return version;
31+
}
32+
33+
public void setVersion(int version) {
34+
this.version = version;
35+
}
36+
37+
public static ElasticsearchVersion get(int version) {
38+
for (ElasticsearchVersion elasticsearchVersion : ElasticsearchVersion.values()) {
39+
if (elasticsearchVersion.getVersion() == version) {
40+
return elasticsearchVersion;
41+
}
42+
}
43+
throw new IllegalArgumentException(String.format("version=%d,fail fo find ElasticsearchVersion.", version));
44+
}
45+
46+
public static ElasticsearchVersion get(String clusterVersion) {
47+
String[] versionArr = clusterVersion.split("\\.");
48+
int version = Integer.parseInt(versionArr[0]);
49+
return get(version);
50+
}
51+
}

0 commit comments

Comments
 (0)