Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions src/main/java/com/yahoo/bullet/querying/Querier.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.Monoidal;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.parsing.Clause;
import com.yahoo.bullet.parsing.Projection;
import com.yahoo.bullet.parsing.Query;
Expand Down Expand Up @@ -177,7 +176,7 @@
* {@link #isClosed()} and then {@link #reset()} for <em>non time-based windows</em>. You should put it in a buffer
* for a little bit of time while the data arrives if {@link #isClosed()}, then {@link #getResult()} and
* {@link #reset()}. Similarly, for {@link #isDone()}. You should only do this for queries for which
* {@link #isTimeBasedWindow()} is true. For record based windows, you can use {@link #isClosed()} to drive the
* {@link #shouldBuffer()} is true. For record based windows, you can use {@link #isClosed()} to drive the
* emission of the results.
* </li>
* </ol>
Expand Down Expand Up @@ -226,7 +225,7 @@
* queryPubSubPublisher.emit(
* else if (querier.isClosed())
* Clip clip = querier.getResult()
* // See note above regarding buffering if querier.isTimeBasedWindow()
* // See note above regarding buffering if querier.shouldBuffer()
* emit(clip)
* querier.reset()
*
Expand Down Expand Up @@ -535,19 +534,16 @@ public RateLimitError getRateLimitError() {
}

/**
* Returns if this query has a time based window.
* Returns if this query should be buffered for a bit before getting results out. You can use this to wait for the
* extra results in your Join or Combine stage after a query is {@link #isDone()} or {@link #isClosed()}. If so,
* you should wait for more results to trickle in from the Filter stage.
*
* @return A boolean that is true if this is a time based query window.
*/
public boolean isTimeBasedWindow() {
public boolean shouldBuffer() {
Window window = runningQuery.getQuery().getWindow();
boolean noWindow = window == null;
// If it's a RAW query without a window, it is NOT time based.
if (noWindow && isRaw()) {
return false;
}
// No window means duration drives the query, it IS time based. Otherwise, check if the window is time based.
return noWindow || window.isTimeBased();
// No window means duration drives the query -> time based. Otherwise, if the window is time based.
return window == null || window.isTimeBased();
}

/**
Expand Down Expand Up @@ -615,10 +611,6 @@ private boolean isLastWindow() {
return window.getClass().equals(Basic.class);
}

private boolean isRaw() {
return runningQuery.getQuery().getAggregation().getType() == Aggregation.Type.RAW;
}

private Meta getErrorMeta(Exception e) {
return Meta.of(BulletError.makeError(e.getMessage(), TRY_AGAIN_LATER));
}
Expand All @@ -628,5 +620,4 @@ private void incrementRate() {
rateLimit.increment();
}
}

}
14 changes: 7 additions & 7 deletions src/test/java/com/yahoo/bullet/querying/QuerierTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ public void testDefaults() {
Assert.assertFalse(querier.isDone());
Assert.assertFalse(querier.isExceedingRateLimit());
Assert.assertNull(querier.getRateLimitError());
// RAW query
Assert.assertFalse(querier.isTimeBasedWindow());
// No window => so duration based
Assert.assertTrue(querier.shouldBuffer());
Assert.assertEquals(querier.getResult().getRecords(), emptyList());
}

Expand Down Expand Up @@ -640,7 +640,7 @@ public void testRawQueriesWithTimeWindowsAreNotChanged() {

Assert.assertFalse(querier.isClosed());
Assert.assertFalse(querier.isClosedForPartition());
Assert.assertTrue(querier.isTimeBasedWindow());
Assert.assertTrue(querier.shouldBuffer());

querier.consume(RecordBox.get().getRecord());

Expand All @@ -666,7 +666,7 @@ public void testNonRawQueriesWithRecordWindowsAreErrors() {
}

@Test
public void testRawQueriesWithoutWindowsAreNotTimeBased() {
public void testRawQueriesWithoutWindowsThatAreClosedShouldBuffer() {
BulletConfig config = new BulletConfig();
config.set(BulletConfig.RAW_AGGREGATION_MAX_SIZE, 10);
config.validate();
Expand All @@ -680,19 +680,19 @@ public void testRawQueriesWithoutWindowsAreNotTimeBased() {

Assert.assertFalse(querier.isClosed());
Assert.assertFalse(querier.isClosedForPartition());
Assert.assertFalse(querier.isTimeBasedWindow());
Assert.assertTrue(querier.shouldBuffer());
Assert.assertEquals(querier.getWindow().getClass(), Basic.class);

querier.consume(RecordBox.get().getRecord());

Assert.assertFalse(querier.isClosed());
Assert.assertFalse(querier.isClosedForPartition());
Assert.assertFalse(querier.isTimeBasedWindow());
Assert.assertTrue(querier.shouldBuffer());

IntStream.range(0, 9).forEach(i -> querier.consume(RecordBox.get().getRecord()));

Assert.assertTrue(querier.isClosed());
Assert.assertTrue(querier.isClosedForPartition());
Assert.assertFalse(querier.isTimeBasedWindow());
Assert.assertTrue(querier.shouldBuffer());
}
}