diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java index d10156ae..cf7311dd 100644 --- a/src/main/java/com/yahoo/bullet/querying/Querier.java +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java @@ -10,7 +10,6 @@ import com.yahoo.bullet.common.BulletConfig; import com.yahoo.bullet.common.BulletError; import com.yahoo.bullet.common.Monoidal; -import com.yahoo.bullet.parsing.Aggregation; import com.yahoo.bullet.parsing.Clause; import com.yahoo.bullet.parsing.Projection; import com.yahoo.bullet.parsing.Query; @@ -177,7 +176,7 @@ * {@link #isClosed()} and then {@link #reset()} for non time-based windows. You should put it in a buffer * for a little bit of time while the data arrives if {@link #isClosed()}, then {@link #getResult()} and * {@link #reset()}. Similarly, for {@link #isDone()}. You should only do this for queries for which - * {@link #isTimeBasedWindow()} is true. For record based windows, you can use {@link #isClosed()} to drive the + * {@link #shouldBuffer()} is true. For record based windows, you can use {@link #isClosed()} to drive the * emission of the results. * * @@ -226,7 +225,7 @@ * queryPubSubPublisher.emit( * else if (querier.isClosed()) * Clip clip = querier.getResult() - * // See note above regarding buffering if querier.isTimeBasedWindow() + * // See note above regarding buffering if querier.shouldBuffer() * emit(clip) * querier.reset() * @@ -535,19 +534,16 @@ public RateLimitError getRateLimitError() { } /** - * Returns if this query has a time based window. + * Returns if this query should be buffered for a bit before getting results out. You can use this to wait for the + * extra results in your Join or Combine stage after a query is {@link #isDone()} or {@link #isClosed()}. If so, + * you should wait for more results to trickle in from the Filter stage. * * @return A boolean that is true if this is a time based query window. */ - public boolean isTimeBasedWindow() { + public boolean shouldBuffer() { Window window = runningQuery.getQuery().getWindow(); - boolean noWindow = window == null; - // If it's a RAW query without a window, it is NOT time based. - if (noWindow && isRaw()) { - return false; - } - // No window means duration drives the query, it IS time based. Otherwise, check if the window is time based. - return noWindow || window.isTimeBased(); + // No window means duration drives the query -> time based. Otherwise, if the window is time based. + return window == null || window.isTimeBased(); } /** @@ -615,10 +611,6 @@ private boolean isLastWindow() { return window.getClass().equals(Basic.class); } - private boolean isRaw() { - return runningQuery.getQuery().getAggregation().getType() == Aggregation.Type.RAW; - } - private Meta getErrorMeta(Exception e) { return Meta.of(BulletError.makeError(e.getMessage(), TRY_AGAIN_LATER)); } @@ -628,5 +620,4 @@ private void incrementRate() { rateLimit.increment(); } } - } diff --git a/src/test/java/com/yahoo/bullet/querying/QuerierTest.java b/src/test/java/com/yahoo/bullet/querying/QuerierTest.java index dda94c36..3f8e99da 100644 --- a/src/test/java/com/yahoo/bullet/querying/QuerierTest.java +++ b/src/test/java/com/yahoo/bullet/querying/QuerierTest.java @@ -194,8 +194,8 @@ public void testDefaults() { Assert.assertFalse(querier.isDone()); Assert.assertFalse(querier.isExceedingRateLimit()); Assert.assertNull(querier.getRateLimitError()); - // RAW query - Assert.assertFalse(querier.isTimeBasedWindow()); + // No window => so duration based + Assert.assertTrue(querier.shouldBuffer()); Assert.assertEquals(querier.getResult().getRecords(), emptyList()); } @@ -640,7 +640,7 @@ public void testRawQueriesWithTimeWindowsAreNotChanged() { Assert.assertFalse(querier.isClosed()); Assert.assertFalse(querier.isClosedForPartition()); - Assert.assertTrue(querier.isTimeBasedWindow()); + Assert.assertTrue(querier.shouldBuffer()); querier.consume(RecordBox.get().getRecord()); @@ -666,7 +666,7 @@ public void testNonRawQueriesWithRecordWindowsAreErrors() { } @Test - public void testRawQueriesWithoutWindowsAreNotTimeBased() { + public void testRawQueriesWithoutWindowsThatAreClosedShouldBuffer() { BulletConfig config = new BulletConfig(); config.set(BulletConfig.RAW_AGGREGATION_MAX_SIZE, 10); config.validate(); @@ -680,19 +680,19 @@ public void testRawQueriesWithoutWindowsAreNotTimeBased() { Assert.assertFalse(querier.isClosed()); Assert.assertFalse(querier.isClosedForPartition()); - Assert.assertFalse(querier.isTimeBasedWindow()); + Assert.assertTrue(querier.shouldBuffer()); Assert.assertEquals(querier.getWindow().getClass(), Basic.class); querier.consume(RecordBox.get().getRecord()); Assert.assertFalse(querier.isClosed()); Assert.assertFalse(querier.isClosedForPartition()); - Assert.assertFalse(querier.isTimeBasedWindow()); + Assert.assertTrue(querier.shouldBuffer()); IntStream.range(0, 9).forEach(i -> querier.consume(RecordBox.get().getRecord())); Assert.assertTrue(querier.isClosed()); Assert.assertTrue(querier.isClosedForPartition()); - Assert.assertFalse(querier.isTimeBasedWindow()); + Assert.assertTrue(querier.shouldBuffer()); } }