Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.3.1</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<elasticsearch.version>7.9.1</elasticsearch.version>
<elasticsearch.version>7.16.3</elasticsearch.version>
<presto.version>332</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
Expand Down Expand Up @@ -1327,6 +1327,7 @@ flexible messaging model and an intuitive client API.</description>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
Expand Down Expand Up @@ -1844,6 +1845,7 @@ flexible messaging model and an intuitive client API.</description>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- for some reason, setting maven.compiler.release property alone doesn't work -->
Expand Down
14 changes: 13 additions & 1 deletion pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
<artifactId>pulsar-io-elastic-search</artifactId>
<name>Pulsar IO :: ElasticSearch</name>

<properties>
<!--
Work-around for "Container exited with code 137" (OOM)
-->
<testReuseFork>false</testReuseFork>
<testForkCount>1</testForkCount>
</properties>

<dependencies>

<dependency>
Expand Down Expand Up @@ -83,7 +91,11 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;

@Slf4j
public class ElasticSearchClient implements AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;

public class RandomExponentialBackoffPolicy extends BackoffPolicy {
private final RandomExponentialRetry randomExponentialRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class ElasticSearchClientSslTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

final static String INDEX = "myindex";

Expand All @@ -44,9 +44,8 @@ public class ElasticSearchClientSslTests {
@Test
public void testSslBasic() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -80,9 +79,8 @@ public void testSslBasic() throws IOException {
@Test
public void testSslWithHostnameVerification() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -119,9 +117,8 @@ public void testSslWithHostnameVerification() throws IOException {
@Test
public void testSslWithClientAuth() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
*/
package org.apache.pulsar.io.elasticsearch;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
import org.awaitility.Awaitility;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.junit.AfterClass;
import org.mockito.Mockito;
import org.testcontainers.containers.Network;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -45,21 +47,22 @@

@Slf4j
public class ElasticSearchClientTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

static ElasticsearchContainer container;
static Network network = Network.newNetwork();

@BeforeClass
public static final void initBeforeClass() throws IOException {
container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE);
container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE).withNetwork(network);
container.start();
}

@AfterClass
public static void closeAfterClass() {
container.close();
network.close();
}

static class MockRecord<T> implements Record<T> {
Expand Down Expand Up @@ -222,101 +225,111 @@ public void testMalformedDocIgnore() throws Exception {

@Test
public void testBulkRetry() throws Exception {
final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setRetryBackoffInMs(100)
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
toxiproxy.start();

final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setRetryBackoffInMs(100)
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
chaosContainer.start();
log.info("starting the toxic");
toxiproxy.getProxy().setConnectionCut(false);
toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

chaosContainer.stop();
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
}
}
}
}

@Test
public void testBulkBlocking() throws Exception {
final String index = "indexblocking-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
assertTrue(client.createIndexIfNeeded(index));

try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}
try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
toxiproxy.start();

final String index = "indexblocking-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
assertTrue(client.createIndexIfNeeded(index));

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(client.totalHits(index), 5);
});

log.info("starting the toxic");
toxiproxy.getProxy().setConnectionCut(false);
toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 35000);
toxiproxy.removeToxicAfterDelay("elasticpause", 30000);

long start = System.currentTimeMillis();

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(client.totalHits(index), 5);
});

ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
chaosContainer.start();
Thread.sleep(1000L);

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
long start = System.currentTimeMillis();
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
assertEquals(client.records.size(), 0);

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);

chaosContainer.stop();
} finally {
client.delete(index);
} finally {
client.delete(index);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class ElasticSearchSinkRawDataTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

private static ElasticsearchContainer container;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
public class ElasticSearchSinkTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

private static ElasticsearchContainer container;

Expand Down
Loading