From 7003586a17ce5f334a5aeaf0036888888534f597 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 26 Jan 2022 16:37:38 -0600 Subject: [PATCH] Fixing docsReceived counter (#1840) This commit fixes a regression in the "Documents Received" MapReduce counter: the first set of hits returned by each scroll was not being counted. Closes #1470 --- .../hadoop/rest/ScrollQuery.java | 3 ++- .../hadoop/rest/ScrollQueryTest.java | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java index fe3ac3002..30826483f 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java @@ -100,7 +100,8 @@ public boolean hasNext() { } catch (IOException ex) { throw new EsHadoopIllegalStateException(String.format("Cannot create scroll for query [%s/%s]", query, body), ex); } - + read += batch.size(); + stats.docsReceived += batch.size(); // no longer needed body = null; query = null; diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java index 4f7022770..fc992eefa 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java @@ -36,9 +36,8 @@ public class ScrollQueryTest { - @Test - public void test() throws Exception { - RestRepository repository = mockRepository(); + public void test(boolean firstScrollReturnsHits) throws Exception { + RestRepository repository = mockRepository(firstScrollReturnsHits); ScrollReader scrollReader = Mockito.mock(ScrollReader.class); String query = "/index/type/_search?scroll=10m&etc=etc"; @@ -54,9 +53,20 @@ public void test() throws Exception { Mockito.verify(repository).close(); Stats stats = 
scrollQuery.stats(); Assert.assertEquals(1, stats.docsReceived); + Assert.assertEquals(1, scrollQuery.getRead()); + } + + @Test + public void testWithEmptyFirstScroll() throws Exception { + test(false); + } + + @Test + public void testWithNonEmptyFirstScroll() throws Exception { + test(true); } - private RestRepository mockRepository() throws Exception { + private RestRepository mockRepository(boolean firstScrollReturnsHits) throws Exception { Map data = new HashMap(); data.put("field", "value"); String id = "1"; @@ -64,13 +74,17 @@ private RestRepository mockRepository() throws Exception { RestRepository mocked = Mockito.mock(RestRepository.class); - ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10, Collections.emptyList(), 5, 5); + ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10, + firstScrollReturnsHits ? Collections.singletonList(hit) : Collections.emptyList(), + 5, 5); Mockito.doReturn(start).when(mocked).scroll(Matchers.anyString(), Matchers.any(BytesArray.class), Matchers.any(ScrollReader.class)); ScrollReader.Scroll middle = new ScrollReader.Scroll("efgh", 10, Collections.emptyList(), 3, 3); Mockito.doReturn(middle).when(mocked).scroll(Matchers.eq("abcd"), Matchers.any(ScrollReader.class)); - ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10, Collections.singletonList(hit), 2, 1); + ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10, + firstScrollReturnsHits ? Collections.emptyList() : Collections.singletonList(hit), + 2, 1); Mockito.doReturn(end).when(mocked).scroll(Matchers.eq("efgh"), Matchers.any(ScrollReader.class)); ScrollReader.Scroll finalScroll = new ScrollReader.Scroll("mnop", 10, true); Mockito.doReturn(finalScroll).when(mocked).scroll(Matchers.eq("ijkl"), Matchers.any(ScrollReader.class));