diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java index fe3ac3002..30826483f 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java @@ -100,7 +100,8 @@ public boolean hasNext() { } catch (IOException ex) { throw new EsHadoopIllegalStateException(String.format("Cannot create scroll for query [%s/%s]", query, body), ex); } - + read += batch.size(); + stats.docsReceived += batch.size(); // no longer needed body = null; query = null; diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java index 4f7022770..fc992eefa 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/ScrollQueryTest.java @@ -36,9 +36,8 @@ public class ScrollQueryTest { - @Test - public void test() throws Exception { - RestRepository repository = mockRepository(); + public void test(boolean firstScrollReturnsHits) throws Exception { + RestRepository repository = mockRepository(firstScrollReturnsHits); ScrollReader scrollReader = Mockito.mock(ScrollReader.class); String query = "/index/type/_search?scroll=10m&etc=etc"; @@ -54,9 +53,20 @@ public void test() throws Exception { Mockito.verify(repository).close(); Stats stats = scrollQuery.stats(); Assert.assertEquals(1, stats.docsReceived); + Assert.assertEquals(1, scrollQuery.getRead()); + } + + @Test + public void testWithEmptyFirstScroll() throws Exception { + test(false); + } + + @Test + public void testWithNonEmptyFirstScroll() throws Exception { + test(true); } - private RestRepository mockRepository() throws Exception { + private RestRepository mockRepository(boolean firstScrollReturnsHits) throws Exception { Map data = new HashMap(); data.put("field", "value"); String id = "1"; @@ -64,13 +74,17 @@ private RestRepository mockRepository() throws Exception { RestRepository mocked = Mockito.mock(RestRepository.class); - ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10, Collections.emptyList(), 5, 5); + ScrollReader.Scroll start = new ScrollReader.Scroll("abcd", 10, + firstScrollReturnsHits ? Collections.singletonList(hit) : Collections.emptyList(), + 5, 5); Mockito.doReturn(start).when(mocked).scroll(Matchers.anyString(), Matchers.any(BytesArray.class), Matchers.any(ScrollReader.class)); ScrollReader.Scroll middle = new ScrollReader.Scroll("efgh", 10, Collections.emptyList(), 3, 3); Mockito.doReturn(middle).when(mocked).scroll(Matchers.eq("abcd"), Matchers.any(ScrollReader.class)); - ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10, Collections.singletonList(hit), 2, 1); + ScrollReader.Scroll end = new ScrollReader.Scroll("ijkl", 10, + firstScrollReturnsHits ? Collections.emptyList() : Collections.singletonList(hit), + 2, 1); Mockito.doReturn(end).when(mocked).scroll(Matchers.eq("efgh"), Matchers.any(ScrollReader.class)); ScrollReader.Scroll finalScroll = new ScrollReader.Scroll("mnop", 10, true); Mockito.doReturn(finalScroll).when(mocked).scroll(Matchers.eq("ijkl"), Matchers.any(ScrollReader.class));