[BEAM-5107] Add support for ES-6.x to ElasticsearchIO
Dat Tran committed Sep 6, 2018
1 parent 2e13877 commit 5da843e
Showing 16 changed files with 565 additions and 147 deletions.
@@ -17,17 +17,10 @@
*/
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.junit.Assert.assertEquals;

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
@@ -93,26 +86,7 @@ public static void afterClass() throws Exception {

@Test
public void testSplitsVolume() throws Exception {
Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
BoundedElasticsearchSource initialSource =
new BoundedElasticsearchSource(read, null, null, null);
// desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
// as many bundles as ES shards and bundle size is shard size
long desiredBundleSizeBytes = 0;
List<? extends BoundedSource<String>> splits =
initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
// this is the number of ES shards
// (By default, each index in Elasticsearch is allocated 5 primary shards)
long expectedNumSplits = 5;
assertEquals(expectedNumSplits, splits.size());
int nonEmptySplits = 0;
for (BoundedSource<String> subSource : splits) {
if (readFromSource(subSource, options).size() > 0) {
nonEmptySplits += 1;
}
}
assertEquals(expectedNumSplits, nonEmptySplits);
elasticsearchIOTestCommon.testSplit(0);
}
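
All four per-version split tests now delegate to a shared helper, elasticsearchIOTestCommon.testSplit(desiredBundleSizeBytes). The helper's body is not shown in this diff; the sketch below is a hedged reconstruction from the inline code removed above, where connectionConfiguration, restClient and numDocs stand in for whatever fields ElasticsearchIOTestCommon actually holds:

  // Hypothetical reconstruction of ElasticsearchIOTestCommon.testSplit,
  // merged from the removed 2.x/5.x test bodies.
  void testSplit(final long desiredBundleSizeBytes) throws Exception {
    ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
    PipelineOptions options = PipelineOptionsFactory.create();
    Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
    BoundedElasticsearchSource initialSource =
        new BoundedElasticsearchSource(read, null, null, null);
    List<? extends BoundedSource<String>> splits =
        initialSource.split(desiredBundleSizeBytes, options);
    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
    int expectedNumSources;
    if (desiredBundleSizeBytes == 0) {
      // ES 2.x cannot split below shard granularity: one bundle per shard,
      // and each index is allocated 5 primary shards by default.
      expectedNumSources = 5;
    } else {
      long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
      expectedNumSources = (int) Math.ceil((float) indexSize / desiredBundleSizeBytes);
    }
    assertEquals("Wrong number of splits", expectedNumSources, splits.size());
    int emptySplits = 0;
    for (BoundedSource<String> subSource : splits) {
      if (readFromSource(subSource, options).isEmpty()) {
        emptySplits += 1;
      }
    }
    assertThat(
        "There are too many empty splits, parallelism is sub-optimal",
        emptySplits,
        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
  }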

@Test
@@ -17,26 +17,13 @@
*/
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
@@ -173,32 +160,7 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception {

@Test
public void testSplit() throws Exception {
ElasticSearchIOTestUtils.insertTestDocuments(
connectionConfiguration, NUM_DOCS_UTESTS, restClient);
PipelineOptions options = PipelineOptionsFactory.create();
Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
BoundedElasticsearchSource initialSource =
new BoundedElasticsearchSource(read, null, null, null);
//desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
// as many bundles as ES shards and bundle size is shard size
int desiredBundleSizeBytes = 0;
List<? extends BoundedSource<String>> splits =
initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
//this is the number of ES shards
// (By default, each index in Elasticsearch is allocated 5 primary shards)
int expectedNumSources = 5;
assertEquals("Wrong number of splits", expectedNumSources, splits.size());
int emptySplits = 0;
for (BoundedSource<String> subSource : splits) {
if (readFromSource(subSource, options).isEmpty()) {
emptySplits += 1;
}
}
assertThat(
"There are too many empty splits, parallelism is sub-optimal",
emptySplits,
lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
elasticsearchIOTestCommon.testSplit(0);
}

@Test
@@ -216,7 +178,7 @@ public void testWriteWithIndexFn() throws Exception {
@Test
public void testWriteWithTypeFn() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithTypeFn();
elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
}
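
The 2x5x suffix marks this as a 2.x/5.x-only variant: Elasticsearch 6.x allows a single mapping type per index, so routing documents to per-document types only applies to the older versions. A hedged illustration of the feature under test — the field name "scientist" and the surrounding setup are assumptions, not taken from this diff:

  pipeline
      .apply(Create.of(data))
      .apply(
          ElasticsearchIO.write()
              .withConnectionConfiguration(connectionConfiguration)
              // Derive the ES type from a document field; an ES 6.x index
              // would reject documents arriving under more than one type.
              .withTypeFn(input -> input.path("scientist").asText()));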

@Test
@@ -18,7 +18,7 @@
#
################################################################################

#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and Kibana versions,
#Create an ELK (Elasticsearch Logstash Kibana) container for ES v5.4.3 and compatible Logstash and Kibana versions,
#bind them on host ports, allow shell access to container and mount current directory on /home/$USER inside the container

docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-2.4 sebp/elk:es240_l240_k460
docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-5.4.3 sebp/elk:543
@@ -17,17 +17,10 @@
*/
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.junit.Assert.assertEquals;

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
@@ -93,24 +86,7 @@ public static void afterClass() throws Exception {

@Test
public void testSplitsVolume() throws Exception {
Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
BoundedElasticsearchSource initialSource =
new BoundedElasticsearchSource(read, null, null, null);
int desiredBundleSizeBytes = 10000;
List<? extends BoundedSource<String>> splits =
initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
long indexSize = BoundedElasticsearchSource.estimateIndexSize(readConnectionConfiguration);
float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
assertEquals(expectedNumSources, splits.size());
int nonEmptySplits = 0;
for (BoundedSource<String> subSource : splits) {
if (readFromSource(subSource, options).size() > 0) {
nonEmptySplits += 1;
}
}
assertEquals("Wrong number of empty splits", expectedNumSources, nonEmptySplits);
elasticsearchIOTestCommon.testSplit(10_000);
}

@Test
@@ -17,27 +17,16 @@
*/
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
@@ -160,33 +149,10 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception {

@Test
public void testSplit() throws Exception {
//need to create the index using the helper method (not create it at first insertion)
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
createIndex(getEsIndex());
ElasticSearchIOTestUtils.insertTestDocuments(
connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
PipelineOptions options = PipelineOptionsFactory.create();
Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
BoundedElasticsearchSource initialSource =
new BoundedElasticsearchSource(read, null, null, null);
int desiredBundleSizeBytes = 2000;
List<? extends BoundedSource<String>> splits =
initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
assertEquals("Wrong number of splits", expectedNumSources, splits.size());
int emptySplits = 0;
for (BoundedSource<String> subSource : splits) {
if (readFromSource(subSource, options).isEmpty()) {
emptySplits += 1;
}
}
assertThat(
"There are too many empty splits, parallelism is sub-optimal",
emptySplits,
lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
elasticsearchIOTestCommon.testSplit(2_000);
}
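
The comment about creating the index up front matters because ESIntegTestCase only applies a test's custom index settings when the index is created explicitly, before the first insertion. A speculative sketch of the kind of override the comment refers to — the real settings in the test class are not shown in this diff:

  @Override
  public Settings indexSettings() {
    // Assumption: pin the shard count at 5 so the number of splits the
    // assertions expect stays deterministic. Only takes effect when the
    // index is created via createIndex(...) rather than lazily.
    return Settings.builder()
        .put(super.indexSettings())
        .put("index.number_of_shards", 5)
        .build();
  }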

@Test
@@ -204,7 +170,7 @@ public void testWriteWithIndexFn() throws Exception {
@Test
public void testWriteWithTypeFn() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithTypeFn();
elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
}

@Test
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyJavaNature()
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 6.x"
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 6.x"

test {
// needed for ESIntegTestCase
systemProperty "tests.security.manager", "false"
}

def jna_version = "4.1.0"
def log4j_version = "2.6.2"
def elastic_search_version = "6.4.0"

configurations.all {
resolutionStrategy {
// Make sure the log4j versions for api and core match instead of taking the default
// Gradle rule of using the latest.
force "org.apache.logging.log4j:log4j-api:$log4j_version"
force "org.apache.logging.log4j:log4j-core:$log4j_version"
}
}

dependencies {
testCompile project(path: ":beam-sdks-java-io-elasticsearch-tests-common", configuration: "shadowTest")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

testCompile project(path: ":beam-sdks-java-core", configuration: "shadow")
testCompile project(path: ":beam-sdks-java-io-elasticsearch", configuration: "shadow")
testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
testCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
testCompile "org.apache.logging.log4j:log4j-api:$log4j_version"
testCompile library.java.slf4j_api
testCompile "net.java.dev.jna:jna:$jna_version"
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.slf4j_jdk14
testCompile library.java.commons_io_1x
testCompile library.java.junit
testCompile "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
}
@@ -0,0 +1,24 @@
#!/bin/sh
################################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################

#Create an ELK (Elasticsearch Logstash Kibana) container for ES v6.4.0 and compatible Logstash and Kibana versions,
#bind them on host ports, allow shell access to container and mount current directory on /home/$USER inside the container

docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-6.4.0 sebp/elk:640
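
Once the container is running, the integration tests expect Elasticsearch to answer on localhost:9200. A minimal, hedged smoke check using the same low-level REST client the test modules already depend on — host, port and class name assume the docker command above:

  import org.apache.http.HttpHost;
  import org.apache.http.util.EntityUtils;
  import org.elasticsearch.client.Response;
  import org.elasticsearch.client.RestClient;

  public class EsSmokeCheck {
    public static void main(String[] args) throws Exception {
      try (RestClient restClient =
          RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
        // GET / returns cluster name and version info when ES is up.
        Response response = restClient.performRequest("GET", "/");
        System.out.println(EntityUtils.toString(response.getEntity()));
      }
    }
  }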
