Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
## I/Os

* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Allow splitting apart document serialization and IO for ElasticsearchIO
* Support Bulk API request size optimization through addition of ElasticsearchIO.Write.withStatefulBatches

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
Expand All @@ -123,6 +132,23 @@ public void testWriteVolumeWithFullAddressing() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,29 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowedErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.0"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
Expand All @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
Expand Down Expand Up @@ -85,6 +86,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -199,6 +201,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
Expand All @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
Expand Down Expand Up @@ -85,6 +86,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -193,6 +195,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies {
testCompile project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntime")
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
// TODO(egalpin): Remove painless plugin in favour of containerized tests
testImplementation group: 'org.codelibs.elasticsearch.module', name: 'lang-painless', version: "$elastic_search_version"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.7.5"
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public void testWriteVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWrite();
}

@Test
public void testWriteVolumeStateful() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteStateful();
}

@Test
public void testSizesVolume() throws Exception {
elasticsearchIOTestCommon.testSizes();
Expand All @@ -127,6 +136,29 @@ public void testWriteWithFullAddressingVolume() throws Exception {
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

/**
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
* and then a new field is added to each document using a partial update. The test then asserts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
Expand Down Expand Up @@ -89,6 +90,7 @@ public Settings indexSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(Netty4Plugin.class);
plugins.add(PainlessPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -197,6 +199,35 @@ public void testWritePartialUpdate() throws Exception {
elasticsearchIOTestCommon.testWritePartialUpdate();
}

@Test
public void testWriteWithAllowableErrors() throws Exception {
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
}

@Test
public void testWriteWithRouting() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithRouting();
}

@Test
public void testWriteScriptedUpsert() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteScriptedUpsert();
}

@Test
public void testWriteWithDocVersion() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithDocVersion();
}

@Test
public void testMaxParallelRequestsPerWindow() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
}

@Test
public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
Expand Down
Loading