Skip to content

Commit

Permalink
Elasticsearch enable Point In Time based searches (#30824)
Browse files Browse the repository at this point in the history
* First implementation of a Point-In-Time (PIT) iterator for the read PTransform
  • Loading branch information
prodriguezdefino committed Apr 30, 2024
1 parent 5f71e6a commit 9612fe1
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public void testReadVolume() throws Exception {
elasticsearchIOTestCommon.testRead();
}

/** Volume variant of the Point-In-Time (PIT) based read test; delegates to the shared helper. */
@Test
public void testReadPITVolume() throws Exception {
  elasticsearchIOTestCommon.setPipeline(pipeline);
  elasticsearchIOTestCommon.testReadPIT();
}

@Test
public void testWriteVolume() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,31 @@ public void testRead() throws Exception {
elasticsearchIOTestCommon.testRead();
}

/**
 * Verifies the Point-In-Time (PIT) based read path.
 *
 * <p>NOTE(review): the shared helper documents PIT search as available for Elasticsearch 8+ —
 * confirm this test class targets a compatible version.
 */
@Test
public void testReadPIT() throws Exception {
  // need to create the index using the helper method (not create it at first insertion)
  // for the indexSettings() to be run
  createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
  elasticsearchIOTestCommon.setPipeline(pipeline);
  elasticsearchIOTestCommon.testReadPIT();
}

/**
 * Verifies reading with a query supplied as a plain JSON string (scroll-based path).
 *
 * <p>Fix: the scraped diff fused the removed call {@code testRead()} with its replacement
 * {@code testReadWithQueryString()}, leaving both in the method body; only the replacement —
 * which matches this test's intent — is kept.
 */
@Test
public void testReadWithQueryString() throws Exception {
  // need to create the index using the helper method (not create it at first insertion)
  // for the indexSettings() to be run
  createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
  elasticsearchIOTestCommon.setPipeline(pipeline);
  elasticsearchIOTestCommon.testReadWithQueryString();
}

/** Verifies reading with a JSON query string combined with the Point-In-Time (PIT) search path. */
@Test
public void testReadWithQueryStringAndPIT() throws Exception {
  // need to create the index using the helper method (not create it at first insertion)
  // for the indexSettings() to be run
  createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
  elasticsearchIOTestCommon.setPipeline(pipeline);
  elasticsearchIOTestCommon.testReadWithQueryAndPIT();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,36 @@ void testRead() throws Exception {
pipeline.run();
}

/** Point in Time search is currently available for Elasticsearch version 8+. */
void testReadPIT() throws Exception {
  // Integration-test runs use externally seeded documents; unit runs insert their own.
  if (!useAsITests) {
    ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
  }

  // Build the PIT-enabled read transform, then assert every inserted document is returned.
  Read pitRead =
      ElasticsearchIO.read()
          .withConnectionConfiguration(connectionConfiguration)
          .withPointInTimeSearch();
  PCollection<String> docs = pipeline.apply(pitRead);

  PAssert.thatSingleton(docs.apply("Count", Count.globally())).isEqualTo(numDocs);
  pipeline.run();
}

/**
 * Reads using a query passed as a plain JSON string, via the default scroll API.
 *
 * <p>Fix: the scraped diff fused the old one-argument call with its two-argument replacement;
 * the stale one-argument call is removed (it no longer matches the helper's signature, which
 * now takes a {@code useScrollAPI} flag).
 */
void testReadWithQueryString() throws Exception {
  // 'true' selects the scroll-API code path in the shared internal helper.
  testReadWithQueryInternal(Read::withQuery, true);
}

/** Reads using a JSON query string with Point-In-Time (PIT) search instead of the scroll API. */
void testReadWithQueryAndPIT() throws Exception {
  // 'false' disables the scroll-API path, so the helper configures withPointInTimeSearch().
  testReadWithQueryInternal(Read::withQuery, false);
}

void testReadWithQueryValueProvider() throws Exception {
testReadWithQueryInternal(
(read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query)));
(read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query)), true);
}

private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer)
throws IOException {
private void testReadWithQueryInternal(
BiFunction<Read, String, Read> queryConfigurer, boolean useScrollAPI) throws IOException {
if (!useAsITests) {
ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
}
Expand All @@ -257,14 +276,16 @@ private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfi
+ " \"scientist\" : {\n"
+ " \"query\" : \"Einstein\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";

Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);

read = queryConfigurer.apply(read, query);

read = useScrollAPI ? read : read.withPointInTimeSearch();

PCollection<String> output = pipeline.apply(read);

PAssert.thatSingleton(output.apply("Count", Count.globally()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -334,14 +335,18 @@ static long refreshIndexAndGetCurrentNumDocs(
static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {

ArrayList<String> data = new ArrayList<>();
LocalDateTime baseDateTime = LocalDateTime.now();
for (int i = 0; i < numDocs; i++) {
int index = i % FAMOUS_SCIENTISTS.length;
// insert 2 malformed documents
if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode)
&& INVALID_DOCS_IDS.contains(i)) {
data.add(String.format("{\"scientist\";\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
} else {
data.add(String.format("{\"scientist\":\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
data.add(
String.format(
"{\"scientist\":\"%s\", \"id\":%s, \"@timestamp\" : \"%s\"}",
FAMOUS_SCIENTISTS[index], i, baseDateTime.plusSeconds(i).toString()));
}
}
return data;
Expand Down

0 comments on commit 9612fe1

Please sign in to comment.