Skip to content

Commit

Permalink
Merge pull request #169 from rayokota/4.0.x
Browse files Browse the repository at this point in the history
CC-1385:  Enhance connector to use text type with ES 5+
  • Loading branch information
rayokota committed Feb 8, 2018
2 parents 49ac983 + bc24949 commit 0ff9c21
Show file tree
Hide file tree
Showing 16 changed files with 945 additions and 268 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<!-- TODO: Undecided if this is too much -->
<suppress
checks="(ClassDataAbstractionCoupling)"
files="(BulkProcessor).java"
files="(BulkProcessor|JestElasticsearchClient).java"
/>

<!-- TODO: Pass some parameters in common config object? -->
Expand Down
18 changes: 9 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
<properties>
<es.version>2.4.1</es.version>
<lucene.version>5.5.2</lucene.version>
<jna.version>4.2.1</jna.version>
<hamcrest.version>2.0.0.0</hamcrest.version>
<jest.version>2.0.0</jest.version>
<hamcrest.version>1.3</hamcrest.version>
<mockito.version>2.13.0</mockito.version>
<jest.version>2.4.0</jest.version>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>

Expand Down Expand Up @@ -72,15 +72,15 @@
<version>${jest.version}</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-junit</artifactId>
<version>${hamcrest.version}</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,29 @@

package io.confluent.connect.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;

public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

private static final Logger LOG = LoggerFactory.getLogger(BulkIndexingClient.class);
import java.io.IOException;
import java.util.List;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public class BulkIndexingClient implements BulkClient<IndexableRecord, BulkRequest> {

private final JestClient client;
private final ElasticsearchClient client;

public BulkIndexingClient(JestClient client) {
public BulkIndexingClient(ElasticsearchClient client) {
this.client = client;
}

@Override
public Bulk bulkRequest(List<IndexableRecord> batch) {
final Bulk.Builder builder = new Bulk.Builder();
for (IndexableRecord record : batch) {
builder.addAction(record.toBulkableAction());
}
return builder.build();
public BulkRequest bulkRequest(List<IndexableRecord> batch) {
return client.createBulkRequest(batch);
}

@Override
public BulkResponse execute(Bulk bulk) throws IOException {
final BulkResult result = client.execute(bulk);

if (result.isSucceeded()) {
return BulkResponse.success();
}

boolean retriable = true;

final List<Key> versionConflicts = new ArrayList<>();
final List<String> errors = new ArrayList<>();

for (BulkResult.BulkResultItem item : result.getItems()) {
if (item.error != null) {
final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
final String errorType = parsedError.get("type").asText("");
if ("version_conflict_engine_exception".equals(errorType)) {
versionConflicts.add(new Key(item.index, item.type, item.id));
} else if ("mapper_parse_exception".equals(errorType)) {
retriable = false;
errors.add(item.error);
} else {
errors.add(item.error);
}
}
}

if (!versionConflicts.isEmpty()) {
LOG.debug("Ignoring version conflicts for items: {}", versionConflicts);
if (errors.isEmpty()) {
// The only errors were version conflicts
return BulkResponse.success();
}
}

final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();

return BulkResponse.failure(retriable, errorInfo);
public BulkResponse execute(BulkRequest bulk) throws IOException {
return client.executeBulk(bulk);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
**/

package io.confluent.connect.elasticsearch;

import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import org.apache.kafka.connect.data.Schema;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
 * Abstraction over the operations this connector performs against Elasticsearch:
 * index and mapping management, bulk writes, and searches.
 *
 * <p>The interface extends {@link AutoCloseable}; {@link #close()} is redeclared without a
 * checked exception so callers can release the client without handling {@code Exception}.
 */
public interface ElasticsearchClient extends AutoCloseable {

  /**
   * Elasticsearch versions that a client can report via {@link #getVersion()}.
   */
  enum Version {
    ES_V1, ES_V2, ES_V5, ES_V6
  }

  /**
   * Gets the Elasticsearch version.
   *
   * @return the version, not null
   */
  Version getVersion();

  /**
   * Creates indices.
   *
   * @param indices the set of index names to create, not null
   */
  void createIndices(Set<String> indices);

  /**
   * Creates an explicit mapping.
   *
   * @param index the index to write
   * @param type the type for which to create the mapping
   * @param schema the schema used to infer the mapping
   * @throws IOException if the client cannot execute the request
   */
  void createMapping(String index, String type, Schema schema) throws IOException;

  /**
   * Gets the JSON mapping for the given index and type.
   *
   * @param index the index
   * @param type the type
   * @return the JSON mapping, or {@code null} if it does not exist
   * @throws IOException if the client cannot execute the request
   */
  JsonObject getMapping(String index, String type) throws IOException;

  /**
   * Creates a bulk request for the list of {@link IndexableRecord} records.
   *
   * @param batch the list of records
   * @return the bulk request
   */
  BulkRequest createBulkRequest(List<IndexableRecord> batch);

  /**
   * Executes a bulk action.
   *
   * @param bulk the bulk request
   * @return the bulk response
   * @throws IOException if the client cannot execute the request
   */
  BulkResponse executeBulk(BulkRequest bulk) throws IOException;

  /**
   * Executes a search.
   *
   * @param query the search query
   * @param index the index to search
   * @param type the type to search
   * @return the search result
   * @throws IOException if the client cannot execute the request
   */
  JsonObject search(String query, String index, String type) throws IOException;

  /**
   * Shuts down the client.
   *
   * <p>Overrides {@link AutoCloseable#close()} and declares no checked exception.
   */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.connect.data.Schema.Type;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConstants {
public static final String MAP_KEY = "key";
public static final String MAP_VALUE = "value";
Expand All @@ -34,19 +29,7 @@ public class ElasticsearchSinkConnectorConstants {
public static final String FLOAT_TYPE = "float";
public static final String DOUBLE_TYPE = "double";
public static final String STRING_TYPE = "string";
public static final String TEXT_TYPE = "text";
public static final String KEYWORD_TYPE = "keyword";
public static final String DATE_TYPE = "date";

static final Map<Type, String> TYPES = new HashMap<>();

static {
TYPES.put(Type.BOOLEAN, BOOLEAN_TYPE);
TYPES.put(Type.INT8, BYTE_TYPE);
TYPES.put(Type.INT16, SHORT_TYPE);
TYPES.put(Type.INT32, INTEGER_TYPE);
TYPES.put(Type.INT64, LONG_TYPE);
TYPES.put(Type.FLOAT32, FLOAT_TYPE);
TYPES.put(Type.FLOAT64, DOUBLE_TYPE);
TYPES.put(Type.STRING, STRING_TYPE);
TYPES.put(Type.BYTES, BINARY_TYPE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -33,17 +34,11 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkTask extends SinkTask {

private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
private ElasticsearchWriter writer;
private JestClient client;
private ElasticsearchClient client;

@Override
public String version() {
Expand All @@ -56,7 +51,7 @@ public void start(Map<String, String> props) {
}

// public for testing
public void start(Map<String, String> props, JestClient client) {
public void start(Map<String, String> props, ElasticsearchClient client) {
try {
log.info("Starting ElasticsearchSinkTask.");

Expand Down Expand Up @@ -95,40 +90,27 @@ public void start(Map<String, String> props, JestClient client) {
boolean dropInvalidMessage =
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

BehaviorOnNullValues behaviorOnNullValues =
BehaviorOnNullValues.forValue(
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
DataConverter.BehaviorOnNullValues.forValue(
config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
);

// Calculate the maximum possible backoff time ...
long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
long maxRetryBackoffMs =
RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
log.warn("This connector uses exponential backoff with jitter for retries, "
+ "and using '{}={}' and '{}={}' results in an impractical but possible maximum "
+ "backoff time greater than {} hours.",
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
}

int connTimeout = config.getInt(
ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
int readTimeout = config.getInt(
ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

if (client != null) {
this.client = client;
} else {
List<String> address =
config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
.connTimeout(connTimeout)
.readTimeout(readTimeout)
.multiThreaded(true)
.build()
);
this.client = factory.getObject();
this.client = new JestElasticsearchClient(props);
}

ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
Expand Down Expand Up @@ -191,13 +173,13 @@ public void stop() throws ConnectException {
writer.stop();
}
if (client != null) {
client.shutdownClient();
client.close();
}
}

private Map<String, String> parseMapConfig(List<String> values) {
Map<String, String> map = new HashMap<>();
for (String value: values) {
for (String value : values) {
String[] parts = value.split(":");
String topic = parts[0];
String type = parts[1];
Expand Down
Loading

0 comments on commit 0ff9c21

Please sign in to comment.