Skip to content

Commit

Permalink
Enhanced ES-sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 20, 2021
1 parent 0f11747 commit 545edcb
Show file tree
Hide file tree
Showing 25 changed files with 2,672 additions and 229 deletions.
39 changes: 9 additions & 30 deletions pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@
</parent>
<artifactId>pulsar-io-elastic-search</artifactId>
<name>Pulsar IO :: ElasticSearch</name>

<repositories>
<repository>
<id>jcenter</id>
<url>https://jcenter.bintray.com/</url>
</repository>
</repositories>

<dependencies>

Expand All @@ -44,37 +37,23 @@
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>net.andreinc.mockneat</groupId>
<artifactId>mockneat</artifactId>
<version>0.2.2</version>
<scope>test</scope>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
Expand All @@ -46,9 +48,9 @@ public class ElasticSearchConfig implements Serializable {
private String elasticSearchUrl;

@FieldDoc(
required = true,
required = false,
defaultValue = "",
help = "The index name that the connector writes messages to"
help = "The index name that the connector writes messages to, the default is the topic name"
)
private String indexName;

Expand All @@ -73,7 +75,7 @@ public class ElasticSearchConfig implements Serializable {
defaultValue = "1",
help = "The number of replicas of the index"
)
private int indexNumberOfReplicas = 1;
private int indexNumberOfReplicas = 0;

@FieldDoc(
required = false,
Expand All @@ -91,6 +93,148 @@ public class ElasticSearchConfig implements Serializable {
)
private String password;

@FieldDoc(
required = false,
defaultValue = "-1",
help = "The maximum number of retries for elasticsearch requests. Use -1 to disable it."
)
private int maxRetries = -1;

@FieldDoc(
required = false,
defaultValue = "100",
help = "The base time in milliseconds to wait when retrying an elasticsearch request."
)
private long retryBackoffInMs = 100;

@FieldDoc(
required = false,
defaultValue = "86400",
help = "The maximum retry time interval in seconds for retrying an elasticsearch request."
)
private long maxRetryTimeInSec = 86400;

@FieldDoc(
required = false,
defaultValue = "false",
help = "Enable the elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period."
)
private boolean bulkEnabled = false;

// bulk settings, see https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html#java-rest-high-document-bulk-processor
@FieldDoc(
required = false,
defaultValue = "1000",
help = "The maximum number of actions per elasticsearch bulk request. Use -1 to disable it."
)
private int bulkActions = 1000;

@FieldDoc(
required = false,
defaultValue = "5",
help = "The maximum size in megabytes of elasticsearch bulk requests.Use -1 to disable it."
)
private long bulkSizeInMb = 5;

/**
* If more than bulkConcurrentRequests are pending, the next bulk request is blocking,
* meaning the connector.write() is blocking and keeps (bulkConcurrentRequests + 1) * bulkActions
* records into memory.
*/
@FieldDoc(
required = false,
defaultValue = "0",
help = "The maximum number of in flight elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests."
)
private int bulkConcurrentRequests = 0;

@FieldDoc(
required = false,
defaultValue = "-1",
help = "The bulk flush interval flushing any bulk request pending if the interval passes. Default is -1 meaning not set."
)
private long bulkFlushIntervalInMs = -1;

// connection settings, see https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low-config.html
@FieldDoc(
required = false,
defaultValue = "false",
help = "Enable elasticsearch request compression."
)
private boolean compressionEnabled = false;

@FieldDoc(
required = false,
defaultValue = "5000",
help = "The elasticsearch client connection timeout in milliseconds."
)
private int connectTimeoutInMs = 5000;

@FieldDoc(
required = false,
defaultValue = "1000",
help = "The time in milliseconds for getting a connection from the elasticsearch connection pool."
)
private int connectionRequestTimeoutInMs = 1000;

@FieldDoc(
required = false,
defaultValue = "5",
help = "Idle connection timeout to prevent a read timeout."
)
private int connectionIdleTimeoutInMs = 5;

@FieldDoc(
required = false,
defaultValue = "60000",
help = "The socket timeout in milliseconds waiting to read the elasticsearch response."
)
private int socketTimeoutInMs = 60000;



@FieldDoc(
required = false,
defaultValue = "false",
help = "Whether to ignore the record key to build the Elasticsearch document _id from the record value using the fields provided by the primaryFields parameter. If primaryFields is not defined, the connector use the unique pulsar messageId as the elasticsearch document id."
)
private boolean keyIgnore = false;

// Comma-separated, ordered list of record-value field names used to build the
// Elasticsearch document _id. The documented default must match the initializer:
// the field is empty unless configured, and validate() relies on that empty
// default when it rejects keyIgnore=true combined with nullValueAction=DELETE.
@FieldDoc(
required = false,
defaultValue = "",
help = "The comma separated ordered list of field names used to build the Elasticsearch document _id from the record value."
)
private String primaryFields = "";

private ElasticSearchSslConfig ssl = new ElasticSearchSslConfig();

@FieldDoc(
required = false,
defaultValue = "DELETE",
help = "How to handle records with null values, possible options are IGNORE, DELETE or FAIL. Default is DELETE the Elasticsearch document."
)
private NullValueAction nullValueAction = NullValueAction.DELETE;

// Policy applied when Elasticsearch rejects a document as malformed.
// The help text must list the constants of MalformedDocAction (IGNORE, WARN, FAIL);
// DELETE is only a NullValueAction option and is not accepted here.
@FieldDoc(
required = false,
defaultValue = "FAIL",
help = "How to handle elasticsearch rejected documents due to some malformation. Possible options are IGNORE, WARN or FAIL. Default is FAIL the Elasticsearch document."
)
private MalformedDocAction malformedDocAction = MalformedDocAction.FAIL;

/**
 * Options for the {@code malformedDocAction} setting, which selects how the connector
 * handles documents that Elasticsearch rejects because they are malformed.
 * The configured default is {@link #FAIL}.
 */
public enum MalformedDocAction {
// NOTE(review): presumably the rejected document is skipped silently — confirm in the sink implementation.
IGNORE,
// NOTE(review): presumably the rejection is logged as a warning and processing continues — confirm.
WARN,
// NOTE(review): presumably the rejection is surfaced as a failure — confirm.
FAIL
}

/**
 * Options for the {@code nullValueAction} setting, which selects how the connector
 * handles records whose value is null. The configured default is {@link #DELETE}.
 */
public enum NullValueAction {
// NOTE(review): presumably the record is skipped — confirm in the sink implementation.
IGNORE,
// Per the setting's help text, deletes the Elasticsearch document.
DELETE,
// NOTE(review): presumably the record is treated as an error — confirm.
FAIL
}

public static ElasticSearchConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
Expand All @@ -102,21 +246,56 @@ public static ElasticSearchConfig load(Map<String, Object> map) throws IOExcepti
}

public void validate() {
if (StringUtils.isEmpty(elasticSearchUrl) || StringUtils.isEmpty(indexName)) {
throw new IllegalArgumentException("Required property not set.");
if (StringUtils.isEmpty(elasticSearchUrl)) {
throw new IllegalArgumentException("elasticSearchUrl not set.");
}

if (StringUtils.isNotEmpty(indexName)) {
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
if (!indexName.toLowerCase(Locale.ROOT).equals(indexName)) {
throw new IllegalArgumentException("indexName should be lowercase only.");
}
if (indexName.startsWith("-") || indexName.startsWith("_") || indexName.startsWith("+")) {
throw new IllegalArgumentException("indexName start with an invalid character.");
}
if (indexName.equals(".") || indexName.equals("..")) {
throw new IllegalArgumentException("indexName cannot be . or .."); }
if (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
throw new IllegalArgumentException("indexName cannot be longer than 255 bytes.");
}
}

if ((StringUtils.isNotEmpty(username) && StringUtils.isEmpty(password))
|| (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password))) {
throw new IllegalArgumentException("Values for both Username & password are required.");
}

if (indexNumberOfShards < 1) {
throw new IllegalArgumentException("indexNumberOfShards must be a strictly positive integer");
if (indexNumberOfShards <= 0) {
throw new IllegalArgumentException("indexNumberOfShards must be a strictly positive integer.");
}

if (indexNumberOfReplicas < 0) {
throw new IllegalArgumentException("indexNumberOfReplicas must be a positive integer");
throw new IllegalArgumentException("indexNumberOfReplicas must be a positive integer.");
}

if (keyIgnore && StringUtils.isEmpty(primaryFields ) && NullValueAction.DELETE.equals(nullValueAction)) {
throw new IllegalArgumentException("If keyIgnore is true and primaryFields is empty, nullValueAction cannot be DELETE.");
}

if (connectTimeoutInMs < 0) {
throw new IllegalArgumentException("connectTimeoutInMs must be a positive integer.");
}

if (connectionRequestTimeoutInMs < 0) {
throw new IllegalArgumentException("connectionRequestTimeoutInMs must be a positive integer.");
}

if (socketTimeoutInMs < 0) {
throw new IllegalArgumentException("socketTimeoutInMs must be a positive integer.");
}

if (bulkConcurrentRequests < 0) {
throw new IllegalArgumentException("bulkConcurrentRequests must be a positive integer.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch;

/**
 * Unchecked exception type of the Elasticsearch connector; each constructor
 * simply forwards its arguments to the matching {@link RuntimeException} constructor.
 */
public class ElasticSearchConnectionException extends RuntimeException {

    /**
     * Creates an exception carrying only a detail message.
     *
     * @param message the detail message
     */
    public ElasticSearchConnectionException(String message) {
        super(message);
    }

    /**
     * Creates an exception carrying a detail message and the underlying cause.
     *
     * @param message the detail message
     * @param cause the underlying cause
     */
    public ElasticSearchConnectionException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that wraps the underlying cause.
     *
     * @param cause the underlying cause
     */
    public ElasticSearchConnectionException(Throwable cause) {
        super(cause);
    }
}
Loading

0 comments on commit 545edcb

Please sign in to comment.