From 0febba044d2c309bce7f0b0a0de63cf14e3b9f1c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 26 Jan 2022 16:47:44 -0600 Subject: [PATCH] Avoiding failure when using frozen indices (#1842) --- .../hadoop/rest/RestRepository.java | 6 ++-- .../hadoop/rest/ScrollQuery.java | 8 +++++ .../hadoop/serialization/ScrollReader.java | 6 ++++ .../hadoop/rest/ScrollQueryTest.java | 29 +++++++++++++++++++ .../serialization/ScrollReaderTest.java | 16 ++++++++++ .../no-scroll-id/scroll.json | 20 +++++++++++++ 6 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 mr/src/test/resources/org/elasticsearch/hadoop/serialization/scrollReaderTestData/no-scroll-id/scroll.json diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java index a7424f432..4dfdcd729 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java @@ -311,14 +311,16 @@ Scroll scroll(String query, BytesArray body, ScrollReader reader) throws IOExcep InputStream scroll = client.execute(POST, query, body).body(); try { Scroll scrollResult = reader.read(scroll); - if (settings.getInternalVersionOrThrow().onOrBefore(EsMajorVersion.V_2_X)) { + if (scrollResult == null) { + log.info(String.format("No scroll for query [%s/%s], likely because the index is frozen", query, body)); + } else if (settings.getInternalVersionOrThrow().onOrBefore(EsMajorVersion.V_2_X)) { // On ES 2.X and before, a scroll response does not contain any hits to start with. // Another request will be needed. scrollResult = new Scroll(scrollResult.getScrollId(), scrollResult.getTotalHits(), false); } return scrollResult; } finally { - if (scroll instanceof StatsAware) { + if (scroll != null && scroll instanceof StatsAware) { stats.aggregate(((StatsAware) scroll).stats()); } } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java index fe3ac3002..702e10753 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java @@ -92,6 +92,10 @@ public boolean hasNext() { try { Scroll scroll = repository.scroll(query, body, reader); + if (scroll == null) { + finished = true; + return false; + } // size is passed as a limit (since we can't pass it directly into the request) - if it's not specified (<1) just scroll the whole index size = (size < 1 ? scroll.getTotalHits() : size); scrollId = scroll.getScrollId(); @@ -114,6 +118,10 @@ public boolean hasNext() { try { Scroll scroll = repository.scroll(scrollId, reader); + if (scroll == null) { + finished = true; + return false; + } scrollId = scroll.getScrollId(); batch = scroll.getHits(); finished = scroll.isConcluded(); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java index 24dcf9ac9..1a2d464a7 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java @@ -268,6 +268,12 @@ public Scroll read(InputStream content) throws IOException { private Scroll read(Parser parser, BytesArray input) { // get scroll_id Token token = ParsingUtils.seek(parser, SCROLL_ID); + if (token == null) { // no scroll id is returned for frozen indices + if (log.isTraceEnabled()) { + log.info("No scroll id found, likely because the index is frozen"); + } + return null; + } Assert.isTrue(token == Token.VALUE_STRING, "invalid response"); String scrollId = parser.text(); diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java index 4f7022770..ca3accf0a 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java @@ -83,6 +83,35 @@ private RestRepository mockRepository() throws Exception { + return mocked; + } + + @Test + public void testFrozen() throws Exception { + // Frozen indices return a null scroll + RestRepository repository = mockRepositoryFrozenIndex(); + ScrollReader scrollReader = Mockito.mock(ScrollReader.class); + + String query = "/index/type/_search?scroll=10m&etc=etc"; + BytesArray body = new BytesArray("{}"); + long size = 100; + + ScrollQuery scrollQuery = new ScrollQuery(repository, query, body, size, scrollReader); + + Assert.assertFalse(scrollQuery.hasNext()); + scrollQuery.close(); + Mockito.verify(repository).close(); + Stats stats = scrollQuery.stats(); + Assert.assertEquals(0, stats.docsReceived); + } + + private RestRepository mockRepositoryFrozenIndex() throws Exception { + RestRepository mocked = Mockito.mock(RestRepository.class); + Mockito.doReturn(null).when(mocked).scroll(Matchers.anyString(), Matchers.any(BytesArray.class), Matchers.any(ScrollReader.class)); + RestClient mockClient = Mockito.mock(RestClient.class); + Mockito.when(mockClient.deleteScroll(Matchers.eq("mnop"))).thenReturn(true); + Mockito.when(mockClient.deleteScroll(Matchers.anyString())).thenReturn(false); + Mockito.doReturn(mockClient).when(mocked).getRestClient(); return mocked; } } \ No newline at end of file diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java index 9e3ebe8eb..71205da66 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java @@ -585,6 +585,22 @@ public void testScrollWithHandlersThatCorrectsError() throws IOException { assertEquals(4L, JsonUtils.query("number").apply(scroll.getHits().get(0)[1])); } + @Test + public void testNoScrollIdFromFrozenIndex() throws IOException { + MappingSet mappings = getMappingSet("numbers-as-strings"); // The schema doesn't matter since there's no data + InputStream stream = getClass().getResourceAsStream(scrollData("no-scroll-id")); + Settings testSettings = new TestSettings(); + testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA, "" + readMetadata); + testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA_FIELD, "" + metadataField); + testSettings.setProperty(ConfigurationOptions.ES_OUTPUT_JSON, "" + readAsJson); + testSettings.setProperty(DeserializationHandlerLoader.ES_READ_DATA_ERROR_HANDLERS , "fix"); + testSettings.setProperty(DeserializationHandlerLoader.ES_READ_DATA_ERROR_HANDLER + ".fix" , CorrectingHandler.class.getName()); + JdkValueReader valueReader = ObjectUtils.instantiate(JdkValueReader.class.getName(), testSettings); + ScrollReader reader = new ScrollReader(ScrollReaderConfigBuilder.builder(valueReader, mappings.getResolvedView(), testSettings)); + ScrollReader.Scroll scroll = reader.read(stream); + assertNull(scroll); + } + /** * Case: Handler throws random Exceptions * Outcome: Processing fails fast. diff --git a/mr/src/test/resources/org/elasticsearch/hadoop/serialization/scrollReaderTestData/no-scroll-id/scroll.json b/mr/src/test/resources/org/elasticsearch/hadoop/serialization/scrollReaderTestData/no-scroll-id/scroll.json new file mode 100644 index 000000000..ddbc800f8 --- /dev/null +++ b/mr/src/test/resources/org/elasticsearch/hadoop/serialization/scrollReaderTestData/no-scroll-id/scroll.json @@ -0,0 +1,20 @@ +{ + "took":0, + "timed_out":false, + "_shards":{ + "total":0, + "successful":0, + "skipped":0, + "failed":0 + }, + "hits":{ + "total":{ + "value":0, + "relation":"eq" + }, + "max_score":0.0, + "hits":[ + + ] + } +} \ No newline at end of file