From 8d13624b864249ca2b028feaaf94c0c7c53a8228 Mon Sep 17 00:00:00 2001 From: Akshai Sarma Date: Mon, 9 Apr 2018 18:02:41 -0700 Subject: [PATCH 1/5] Changing shouldBuffer and time based window emit logic --- .../pubsub/rest/RESTQueryPublisher.java | 3 +- .../bullet/pubsub/rest/RESTSubscriber.java | 14 ++-- .../com/yahoo/bullet/querying/Querier.java | 52 ++++++++------ .../yahoo/bullet/querying/RunningQuery.java | 13 +++- .../bullet/windowing/AdditiveTumbling.java | 2 +- .../com/yahoo/bullet/windowing/Tumbling.java | 14 ++-- .../yahoo/bullet/common/BulletConfigTest.java | 2 +- .../com/yahoo/bullet/pubsub/PubSubTest.java | 8 +-- .../pubsub/rest/RESTSubscriberTest.java | 6 +- .../yahoo/bullet/querying/QuerierTest.java | 72 ++++++++++++++++--- .../windowing/AdditiveTumblingTest.java | 22 +++--- .../yahoo/bullet/windowing/TumblingTest.java | 22 +++--- 12 files changed, 148 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java index 19d1796c..800fc720 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java @@ -6,7 +6,6 @@ package com.yahoo.bullet.pubsub.rest; import com.yahoo.bullet.pubsub.Metadata; -import com.yahoo.bullet.pubsub.PubSubException; import com.yahoo.bullet.pubsub.PubSubMessage; import lombok.AccessLevel; import lombok.Getter; @@ -34,7 +33,7 @@ public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String re } @Override - public void send(PubSubMessage message) throws PubSubException { + public void send(PubSubMessage message) { // Put responseURL in the metadata so the ResponsePublisher knows to which host to send the response Metadata metadata = message.getMetadata(); metadata = metadata == null ? new Metadata() : metadata; diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 0cd38351..9949eba3 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -6,27 +6,28 @@ package com.yahoo.bullet.pubsub.rest; import com.yahoo.bullet.pubsub.BufferingSubscriber; -import com.yahoo.bullet.pubsub.PubSubException; import com.yahoo.bullet.pubsub.PubSubMessage; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; - import org.apache.http.util.EntityUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + @Slf4j public class RESTSubscriber extends BufferingSubscriber { @Getter(AccessLevel.PACKAGE) private List urls; private CloseableHttpClient client; private long minWait; + @Setter(AccessLevel.PACKAGE) private long lastRequest; private int connectTimeout; @@ -37,6 +38,7 @@ public class RESTSubscriber extends BufferingSubscriber { * @param urls The URLs which will be used to make the http request. * @param client The client to use to make http requests. * @param minWait The minimum time (ms) to wait between subsequent http requests. + * @param connectTimeout The minimum time (ms) to wait for a connection to be made. */ public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHttpClient client, long minWait, int connectTimeout) { super(maxUncommittedMessages); @@ -48,7 +50,7 @@ public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHt } @Override - public List getMessages() throws PubSubException { + public List getMessages() { List messages = new ArrayList<>(); long currentTime = System.currentTimeMillis(); if (currentTime - lastRequest <= minWait) { diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java index 4473329d..6a763d3a 100644 --- a/src/main/java/com/yahoo/bullet/querying/Querier.java +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java @@ -109,7 +109,7 @@ * if (metadata.hasSignal(Signal.KILL) || metadata.hasSignal(Signal.COMPLETE)) * remove Querier for id * else - * create new Querier(id, queryBody, config) and initialize it; + * create new Querier(id, queryBody, config) and initialize it (see note below regarding delaying start). * * *
Case 2: BulletRecord record
@@ -178,20 +178,21 @@ * data volume is too low. * *
  • - * Optional but recommended if you are processing event by event not microbatches: Since data from the Filter stage - * partitions may not arrive at the same time and since the querier may be {@link #isClosed()} at the same time the - * data from the Filter stage partitions arrive, you should not immediately emit {@link #getResult()} if - * {@link #isClosed()} and then {@link #reset()} for non time-based windows. You should put it in a buffer - * for a little bit of time while the data arrives if {@link #isClosed()}, then {@link #getResult()} and - * {@link #reset()}. Similarly, for {@link #isDone()}. You should only do this for queries for which - * {@link #shouldBuffer()} is true. For record based windows, you can use {@link #isClosed()} to drive the - * emission of the results. + * Optional Delayed start and End (recommended if you are processing event by event): Since data from the + * Filter stage partitions may not arrive at the same time and since the querier may be {@link #isClosed()} at the + * same time the data from the Filter stage partitions arrive, you should not immediately emit {@link #getResult()} if + * {@link #isClosed()} and then {@link #reset()} for non time-based windows. You can use the negation of + * {@link #shouldBuffer()} to find out if this kind of query can be started after a bit of delay. This delay will + * ensure that results from the Filter phase always start. To aid you in doing this, you can put it in a buffer and + * use {@link #restart()} to mark the delayed start of the query. + * Similarly, some queries need to be buffered after they are {@link #isDone()} (these include queries that were not + * delayed). You should only do this for queries for which {@link #shouldBuffer()} is true. *
  • * * *

    Pseudo Code

    * - *
    Case 1: Query
    + *
    Case 1: Query
    *
      * (String id, String queryBody, Metadata metadata) = Query
      * if (metadata.hasSignal(Signal.KILL))
    @@ -393,6 +394,19 @@ public Optional> initialize() {
             return window.initialize();
         }
     
    +    /**
    +     * Forces a restart of a valid query (must have previously called {@link #initialize()}) to mark the
    +     * correct start of this object if it was previously created but delayed in starting it (by using the negation of
    +     * {@link #shouldBuffer()}. You might be using this if you were delaying the start of the query in the
    +     * Join phase. This does not revalidate the query or reset any data this might have already consumed.
    +     */
    +    public void restart() {
    +        // Only timestamps are in RunningQuery and Scheme.
    +        // For RunningQuery, we just need to fix the start time.
    +        runningQuery.start();
    +        window.initialize();
    +    }
    +
         /**
          * Consume a {@link BulletRecord} for this query. The record may or not be actually incorporated into the query
          * results. This depends on whether the query can accept more data, if it is expired or not or if the record matches
    @@ -577,22 +591,16 @@ public RateLimitError getRateLimitError() {
         }
     
         /**
    -     * Returns if this query should buffer before emitting results. You can use this to wait for the final results
    -     * for your window (or your final results if you have no window) in your Join or Combine stage after a query is
    -     * {@link #isDone()} or {@link #isClosed()}.
    +     * Returns if this query should buffer before emitting the final results. You can use this to wait for the final
    +     * results in your Join or Combine stage after a query is {@link #isDone()}.
          *
    -     * @return A boolean that is true if the query results should be buffered.
    +     * @return A boolean that is true if the query results should be buffered in the Join phase.
          */
         public boolean shouldBuffer() {
             Window window = runningQuery.getQuery().getWindow();
    -        boolean noWindow = window == null;
    -        // If it's a RAW query without a window, it should be buffered if and only if it timed out. This means that the
    -        // query is not yet done. So this tells the driver to buffer the query to wait for more potential results.
    -        if (noWindow && isRaw()) {
    -            return runningQuery.isTimedOut();
    -        }
    -        // No window (and not raw) is a duration based query => do buffer. Otherwise, buffer if the window is time based.
    -        return noWindow || window.isTimeBased();
    +        boolean noWindow =  window == null;
    +        // Only buffer if there is no window (including Raw) or if it's a record based window.
    +        return noWindow || !window.isTimeBased();
         }
     
         /**
    diff --git a/src/main/java/com/yahoo/bullet/querying/RunningQuery.java b/src/main/java/com/yahoo/bullet/querying/RunningQuery.java
    index 53a1cb17..8b4394ec 100644
    --- a/src/main/java/com/yahoo/bullet/querying/RunningQuery.java
    +++ b/src/main/java/com/yahoo/bullet/querying/RunningQuery.java
    @@ -22,10 +22,10 @@ public class RunningQuery implements Initializable {
         @Getter
         private final String id;
         @Getter
    -    private final long startTime;
    -    @Getter
         private final Query query;
     
    +    @Getter
    +    private long startTime;
         private String queryString;
     
         /**
    @@ -52,11 +52,11 @@ public RunningQuery(String id, String queryString, BulletConfig config) {
         RunningQuery(String id, Query query) {
             this.id = id;
             this.query = query;
    -        startTime = System.currentTimeMillis();
         }
     
         @Override
         public Optional> initialize() {
    +        start();
             return query.initialize();
         }
     
    @@ -75,4 +75,11 @@ public boolean isTimedOut() {
             // Never add to query.getDuration since it can be infinite (Long.MAX_VALUE)
             return System.currentTimeMillis() - startTime >= query.getDuration();
         }
    +
    +    /**
    +     * Exposed for package only. Starts the query.
    +     */
    +    void start() {
    +        startTime = System.currentTimeMillis();
    +    }
     }
    diff --git a/src/main/java/com/yahoo/bullet/windowing/AdditiveTumbling.java b/src/main/java/com/yahoo/bullet/windowing/AdditiveTumbling.java
    index 567c9d89..bec85609 100644
    --- a/src/main/java/com/yahoo/bullet/windowing/AdditiveTumbling.java
    +++ b/src/main/java/com/yahoo/bullet/windowing/AdditiveTumbling.java
    @@ -23,7 +23,7 @@ public AdditiveTumbling(Strategy aggregation, Window window, BulletConfig config
     
         @Override
         public void reset() {
    -        startedAt = System.currentTimeMillis();
    +        nextCloseTime = nextCloseTime + windowLength;
             windowCount++;
         }
     
    diff --git a/src/main/java/com/yahoo/bullet/windowing/Tumbling.java b/src/main/java/com/yahoo/bullet/windowing/Tumbling.java
    index 45ad3707..d740d09a 100644
    --- a/src/main/java/com/yahoo/bullet/windowing/Tumbling.java
    +++ b/src/main/java/com/yahoo/bullet/windowing/Tumbling.java
    @@ -21,8 +21,8 @@
     public class Tumbling extends Basic {
         public static final String NAME = "Tumbling";
     
    -    protected long startedAt;
    -    protected long closeAfter;
    +    protected long nextCloseTime;
    +    protected long windowLength;
     
         /**
          * Creates an instance of this windowing scheme with the provided {@link Strategy} and {@link BulletConfig}.
    @@ -39,27 +39,27 @@ public Tumbling(Strategy aggregation, Window window, BulletConfig config) {
         public Optional> initialize() {
             // Window is initialized so every is present and positive.
             Number every = Utilities.getCasted(window.getEmit(), Window.EMIT_EVERY_FIELD, Number.class);
    -        closeAfter = every.longValue();
    -        startedAt = System.currentTimeMillis();
    +        windowLength = every.longValue();
    +        nextCloseTime = System.currentTimeMillis() + windowLength;
             return Optional.empty();
         }
     
         @Override
         protected Map getMetadata(Map metadataKeys) {
             Map meta = super.getMetadata(metadataKeys);
    -        addIfNonNull(meta, metadataKeys, Meta.Concept.WINDOW_EXPECTED_EMIT_TIME, () -> this.startedAt + this.closeAfter);
    +        addIfNonNull(meta, metadataKeys, Meta.Concept.WINDOW_EXPECTED_EMIT_TIME, () -> this.nextCloseTime);
             return meta;
         }
     
         @Override
         public void reset() {
             super.reset();
    -        startedAt = System.currentTimeMillis();
    +        nextCloseTime = nextCloseTime + windowLength;
         }
     
         @Override
         public boolean isClosed() {
    -        return System.currentTimeMillis() >= startedAt + closeAfter;
    +        return System.currentTimeMillis() >= nextCloseTime;
         }
     
         @Override
    diff --git a/src/test/java/com/yahoo/bullet/common/BulletConfigTest.java b/src/test/java/com/yahoo/bullet/common/BulletConfigTest.java
    index 5563a0c4..1a7849fe 100644
    --- a/src/test/java/com/yahoo/bullet/common/BulletConfigTest.java
    +++ b/src/test/java/com/yahoo/bullet/common/BulletConfigTest.java
    @@ -22,7 +22,7 @@
     import static com.yahoo.bullet.TestHelpers.assertJSONEquals;
     
     public class BulletConfigTest {
    -    private static Map allMetadataAsMap() {
    +    public static Map allMetadataAsMap() {
             Map meta = new HashMap<>();
             for (Map m : BulletConfig.DEFAULT_RESULT_METADATA_METRICS) {
                 meta.put(m.get(BulletConfig.RESULT_METADATA_METRICS_CONCEPT_KEY), m.get(BulletConfig.RESULT_METADATA_METRICS_NAME_KEY));
    diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
    index a58d7c92..cc91f59a 100644
    --- a/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
    +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
    @@ -66,15 +66,15 @@ public void testSwitchingContext() throws PubSubException {
             Assert.assertTrue(pubSub.getSubscriber().receive().getContent().isEmpty());
     
             // No switch
    -        pubSub.switchContext(PubSub.Context.QUERY_PROCESSING, new BulletConfig());
    -        Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_PROCESSING);
    +        pubSub.switchContext(PubSub.Context.QUERY_SUBMISSION, new BulletConfig());
    +        Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_SUBMISSION);
             Assert.assertTrue(pubSub.getSubscriber().receive().getContent().isEmpty());
     
             // Switch
             BulletConfig newConfig = new BulletConfig("src/test/resources/test_config.yaml");
             newConfig.set(MockPubSub.MOCK_MESSAGE_NAME, "foo");
    -        pubSub.switchContext(PubSub.Context.QUERY_SUBMISSION, newConfig);
    -        Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_SUBMISSION);
    +        pubSub.switchContext(PubSub.Context.QUERY_PROCESSING, newConfig);
    +        Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_PROCESSING);
             Assert.assertEquals(pubSub.getSubscriber().receive().getContent(), "foo");
         }
     }
    diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
    index c23bfa87..c983ef60 100644
    --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
    +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
    @@ -111,7 +111,7 @@ public void testCloseDoesNotThrow() throws Exception {
         public void testMinWait() throws Exception {
             String message = new PubSubMessage("someID", "someContent").asJSON();
             CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message);
    -        RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 100, 3000);
    +        RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 600000, 3000);
     
             // First response should give content (2 events since we have 2 endpoints in the config)
             List messages = subscriber.getMessages();
    @@ -120,8 +120,8 @@ public void testMinWait() throws Exception {
             messages = subscriber.getMessages();
             Assert.assertEquals(messages.size(), 0);
     
    -        // After waiting it should return messages again
    -        Thread.sleep(150);
    +        // After waiting (mocking that by changing the lastRequest time), it should return messages again
    +        subscriber.setLastRequest(600000 * 2);
             messages = subscriber.getMessages();
             Assert.assertEquals(messages.size(), 2);
         }
    diff --git a/src/test/java/com/yahoo/bullet/querying/QuerierTest.java b/src/test/java/com/yahoo/bullet/querying/QuerierTest.java
    index ea01d822..812769d2 100644
    --- a/src/test/java/com/yahoo/bullet/querying/QuerierTest.java
    +++ b/src/test/java/com/yahoo/bullet/querying/QuerierTest.java
    @@ -10,6 +10,7 @@
     import com.yahoo.bullet.aggregations.Strategy;
     import com.yahoo.bullet.aggregations.grouping.GroupOperation;
     import com.yahoo.bullet.common.BulletConfig;
    +import com.yahoo.bullet.common.BulletConfigTest;
     import com.yahoo.bullet.common.BulletError;
     import com.yahoo.bullet.parsing.Aggregation;
     import com.yahoo.bullet.parsing.Clause;
    @@ -217,8 +218,8 @@ public void testDefaults() {
             Assert.assertFalse(querier.isDone());
             Assert.assertFalse(querier.isExceedingRateLimit());
             Assert.assertNull(querier.getRateLimitError());
    -        // RAW query without window and not timed out.
    -        Assert.assertFalse(querier.shouldBuffer());
    +        // RAW query without window should buffer
    +        Assert.assertTrue(querier.shouldBuffer());
             Assert.assertEquals(querier.getResult().getRecords(), emptyList());
         }
     
    @@ -677,7 +678,7 @@ public void testRawQueriesWithTimeWindowsAreNotChanged() {
             Querier querier = make(Querier.Mode.PARTITION, query, config);
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertTrue(querier.shouldBuffer());
    +        Assert.assertFalse(querier.shouldBuffer());
     
             querier.consume(RecordBox.get().getRecord());
     
    @@ -702,7 +703,7 @@ public void testNonRawQueriesWithRecordWindowsAreErrors() {
         }
     
         @Test
    -    public void testRawQueriesWithoutWindowsThatAreClosedAreNotTimeBased() {
    +    public void testRawQueriesWithoutWindowsThatAreClosedAreRecordBased() {
             BulletConfig config = new BulletConfig();
             config.set(BulletConfig.RAW_AGGREGATION_MAX_SIZE, 10);
             config.validate();
    @@ -715,22 +716,22 @@ public void testRawQueriesWithoutWindowsThatAreClosedAreNotTimeBased() {
             Querier querier = make(Querier.Mode.ALL, query, config);
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertFalse(querier.shouldBuffer());
    +        Assert.assertTrue(querier.shouldBuffer());
             Assert.assertEquals(querier.getWindow().getClass(), Basic.class);
     
             querier.consume(RecordBox.get().getRecord());
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertFalse(querier.shouldBuffer());
    +        Assert.assertTrue(querier.shouldBuffer());
     
             IntStream.range(0, 9).forEach(i -> querier.consume(RecordBox.get().getRecord()));
     
             Assert.assertTrue(querier.isClosed());
    -        Assert.assertFalse(querier.shouldBuffer());
    +        Assert.assertTrue(querier.shouldBuffer());
         }
     
         @Test
    -    public void testRawQueriesWithoutWindowsThatAreTimedOutAreTimeBased() {
    +    public void testRawQueriesWithoutWindowsThatAreTimedOutAreStillRecordBased() {
             BulletConfig config = new BulletConfig();
             config.set(BulletConfig.RAW_AGGREGATION_MAX_SIZE, 10);
             config.validate();
    @@ -748,7 +749,7 @@ public void testRawQueriesWithoutWindowsThatAreTimedOutAreTimeBased() {
             querier.initialize();
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertFalse(querier.shouldBuffer());
    +        Assert.assertTrue(querier.shouldBuffer());
             Assert.assertEquals(querier.getWindow().getClass(), Basic.class);
     
             IntStream.range(0, 9).forEach(i -> querier.consume(RecordBox.get().getRecord()));
    @@ -767,7 +768,7 @@ public void testAdditiveWindowsResetInPartitionMode() {
             querier.initialize();
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertTrue(querier.shouldBuffer());
    +        Assert.assertFalse(querier.shouldBuffer());
             Assert.assertEquals(querier.getWindow().getClass(), AdditiveTumbling.class);
     
             IntStream.range(0, 10).forEach(i -> querier.consume(RecordBox.get().getRecord()));
    @@ -804,7 +805,7 @@ public void testAdditiveWindowsDoNotResetInAllMode() {
             querier.initialize();
     
             Assert.assertFalse(querier.isClosed());
    -        Assert.assertTrue(querier.shouldBuffer());
    +        Assert.assertFalse(querier.shouldBuffer());
             Assert.assertEquals(querier.getWindow().getClass(), AdditiveTumbling.class);
     
             IntStream.range(0, 10).forEach(i -> querier.consume(RecordBox.get().getRecord()));
    @@ -831,4 +832,53 @@ record = result.get(0);
             record = result.get(0);
             Assert.assertEquals(record.get(GroupOperation.GroupOperationType.COUNT.getName()), 25L);
         }
    +
    +    @Test
    +    public void testRestarting() throws Exception {
    +        BulletConfig config = new BulletConfig();
    +        config.set(BulletConfig.RESULT_METADATA_ENABLE, true);
    +        config.validate();
    +
    +        Query query = new Query();
    +        Aggregation aggregation = new Aggregation();
    +        aggregation.setType(Aggregation.Type.RAW);
    +        query.setAggregation(aggregation);
    +        query.setWindow(WindowUtils.makeWindow(Window.Unit.TIME, 1));
    +        query.configure(config);
    +        RunningQuery runningQuery = new RunningQuery("", query);
    +
    +        Querier querier = new Querier(Querier.Mode.ALL, runningQuery, config);
    +        querier.initialize();
    +
    +        querier.consume(RecordBox.get().getRecord());
    +        Assert.assertEquals(querier.getRecords().size(), 1);
    +
    +        long timeNow = System.currentTimeMillis();
    +        Meta meta = querier.getMetadata();
    +        Map mapping = BulletConfigTest.allMetadataAsMap();
    +        Map queryMeta = (Map) meta.asMap().get(mapping.get(Meta.Concept.QUERY_METADATA.getName()));
    +        Map windowMeta = (Map) meta.asMap().get(mapping.get(Meta.Concept.WINDOW_METADATA.getName()));
    +        Assert.assertEquals(windowMeta.get(mapping.get(Meta.Concept.WINDOW_NUMBER.getName())), 1L);
    +        long startTime = (Long) queryMeta.get(mapping.get(Meta.Concept.QUERY_RECEIVE_TIME.getName()));
    +        long windowEmitTime = (Long) windowMeta.get(mapping.get(Meta.Concept.WINDOW_EMIT_TIME.getName()));
    +        Assert.assertTrue(startTime <= timeNow);
    +        Assert.assertTrue(windowEmitTime >= timeNow);
    +
    +        Thread.sleep(1);
    +
    +        querier.restart();
    +        Assert.assertEquals(querier.getRecords().size(), 1);
    +        meta = querier.getMetadata();
    +        queryMeta = (Map) meta.asMap().get(mapping.get(Meta.Concept.QUERY_METADATA.getName()));
    +        Assert.assertEquals(windowMeta.get(mapping.get(Meta.Concept.WINDOW_NUMBER.getName())), 1L);
    +        long newStartTime = (Long) queryMeta.get(mapping.get(Meta.Concept.QUERY_RECEIVE_TIME.getName()));
    +        Assert.assertTrue(newStartTime > startTime);
    +
    +        querier.reset();
    +        meta = querier.getMetadata();
    +        windowMeta = (Map) meta.asMap().get(mapping.get(Meta.Concept.WINDOW_METADATA.getName()));
    +        long newEmitTime = (Long) windowMeta.get(mapping.get(Meta.Concept.WINDOW_EMIT_TIME.getName()));
    +        Assert.assertEquals(windowMeta.get(mapping.get(Meta.Concept.WINDOW_NUMBER.getName())), 2L);
    +        Assert.assertTrue(newEmitTime >  windowEmitTime);
    +    }
     }
    diff --git a/src/test/java/com/yahoo/bullet/windowing/AdditiveTumblingTest.java b/src/test/java/com/yahoo/bullet/windowing/AdditiveTumblingTest.java
    index b1922f93..d77664d7 100644
    --- a/src/test/java/com/yahoo/bullet/windowing/AdditiveTumblingTest.java
    +++ b/src/test/java/com/yahoo/bullet/windowing/AdditiveTumblingTest.java
    @@ -65,7 +65,7 @@ public void testClampingToMinimumEmit() {
             AdditiveTumbling additiveTumbling = make(1000, 5000);
             Assert.assertFalse(additiveTumbling.initialize().isPresent());
             // The window is what controls this so AdditiveTumbling has 5000 for the window size
    -        Assert.assertEquals(additiveTumbling.closeAfter, 5000L);
    +        Assert.assertEquals(additiveTumbling.windowLength, 5000L);
         }
     
         @Test
    @@ -110,8 +110,8 @@ public void testResetting() throws Exception {
             AdditiveTumbling additiveTumbling = make(1, 1);
             Assert.assertEquals(strategy.getResetCalls(), 0);
             Assert.assertFalse(additiveTumbling.initialize().isPresent());
    -        long originalStartedAt = additiveTumbling.startedAt;
    -        Assert.assertTrue(originalStartedAt >= started);
    +        long originalCloseTime = additiveTumbling.nextCloseTime;
    +        Assert.assertTrue(originalCloseTime >= started + 1);
     
             // Sleep to make sure it's 1 ms
             Thread.sleep(1);
    @@ -121,10 +121,10 @@ public void testResetting() throws Exception {
     
             long resetTime = System.currentTimeMillis();
             additiveTumbling.reset();
    -        long newStartedAt = additiveTumbling.startedAt;
    +        long newCloseTime = additiveTumbling.nextCloseTime;
             Assert.assertTrue(resetTime > started);
    -        Assert.assertTrue(newStartedAt > originalStartedAt);
    -        Assert.assertTrue(newStartedAt >= resetTime);
    +        Assert.assertTrue(newCloseTime >= originalCloseTime + 1);
    +        Assert.assertTrue(newCloseTime >= resetTime);
             // Aggregation should NOT have been reset
             Assert.assertEquals(strategy.getResetCalls(), 0);
         }
    @@ -136,8 +136,8 @@ public void testResettingInPartitionMode() throws Exception {
             AdditiveTumbling additiveTumbling = make(1, 1);
             Assert.assertEquals(strategy.getResetCalls(), 0);
             Assert.assertFalse(additiveTumbling.initialize().isPresent());
    -        long originalStartedAt = additiveTumbling.startedAt;
    -        Assert.assertTrue(originalStartedAt >= started);
    +        long originalCloseTime = additiveTumbling.nextCloseTime;
    +        Assert.assertTrue(originalCloseTime >= started + 1);
     
             // Sleep to make sure it's 1 ms
             Thread.sleep(1);
    @@ -147,10 +147,10 @@ public void testResettingInPartitionMode() throws Exception {
     
             long resetTime = System.currentTimeMillis();
             additiveTumbling.resetForPartition();
    -        long newStartedAt = additiveTumbling.startedAt;
    +        long newCloseTime = additiveTumbling.nextCloseTime;
             Assert.assertTrue(resetTime > started);
    -        Assert.assertTrue(newStartedAt > originalStartedAt);
    -        Assert.assertTrue(newStartedAt >= resetTime);
    +        Assert.assertTrue(newCloseTime >= originalCloseTime + 1);
    +        Assert.assertTrue(newCloseTime >= resetTime);
             // Aggregation should have been reset
             Assert.assertEquals(strategy.getResetCalls(), 1);
         }
    diff --git a/src/test/java/com/yahoo/bullet/windowing/TumblingTest.java b/src/test/java/com/yahoo/bullet/windowing/TumblingTest.java
    index 4bae7dd9..531c97fb 100644
    --- a/src/test/java/com/yahoo/bullet/windowing/TumblingTest.java
    +++ b/src/test/java/com/yahoo/bullet/windowing/TumblingTest.java
    @@ -65,7 +65,7 @@ public void testClampingToMinimumEmit() {
             Tumbling tumbling = make(1000, 5000);
             Assert.assertFalse(tumbling.initialize().isPresent());
             // The window is what controls this so Tumbling has 5000 for the window size
    -        Assert.assertEquals(tumbling.closeAfter, 5000L);
    +        Assert.assertEquals(tumbling.windowLength, 5000L);
         }
     
         @Test
    @@ -110,8 +110,8 @@ public void testResetting() throws Exception {
             Tumbling tumbling = make(1, 1);
             Assert.assertEquals(strategy.getResetCalls(), 0);
             Assert.assertFalse(tumbling.initialize().isPresent());
    -        long originalStartedAt = tumbling.startedAt;
    -        Assert.assertTrue(originalStartedAt >= started);
    +        long originalCloseTime = tumbling.nextCloseTime;
    +        Assert.assertTrue(originalCloseTime >= started + 1);
     
             // Sleep to make sure it's 1 ms
             Thread.sleep(1);
    @@ -121,10 +121,10 @@ public void testResetting() throws Exception {
     
             long resetTime = System.currentTimeMillis();
             tumbling.reset();
    -        long newStartedAt = tumbling.startedAt;
    +        long newCloseTime = tumbling.nextCloseTime;
             Assert.assertTrue(resetTime > started);
    -        Assert.assertTrue(newStartedAt > originalStartedAt);
    -        Assert.assertTrue(newStartedAt >= resetTime);
    +        Assert.assertTrue(newCloseTime >= originalCloseTime + 1);
    +        Assert.assertTrue(newCloseTime >= resetTime);
             Assert.assertEquals(strategy.getResetCalls(), 1);
         }
     
    @@ -135,8 +135,8 @@ public void testResettingForPartition() throws Exception {
             Tumbling tumbling = make(1, 1);
             Assert.assertEquals(strategy.getResetCalls(), 0);
             Assert.assertFalse(tumbling.initialize().isPresent());
    -        long originalStartedAt = tumbling.startedAt;
    -        Assert.assertTrue(originalStartedAt >= started);
    +        long originalCloseTime = tumbling.nextCloseTime;
    +        Assert.assertTrue(originalCloseTime >= started + 1);
     
             // Sleep to make sure it's 1 ms
             Thread.sleep(1);
    @@ -146,10 +146,10 @@ public void testResettingForPartition() throws Exception {
     
             long resetTime = System.currentTimeMillis();
             tumbling.resetForPartition();
    -        long newStartedAt = tumbling.startedAt;
    +        long newCloseTime = tumbling.nextCloseTime;
             Assert.assertTrue(resetTime > started);
    -        Assert.assertTrue(newStartedAt > originalStartedAt);
    -        Assert.assertTrue(newStartedAt >= resetTime);
    +        Assert.assertTrue(newCloseTime >= originalCloseTime + 1);
    +        Assert.assertTrue(newCloseTime >= resetTime);
             Assert.assertEquals(strategy.getResetCalls(), 1);
         }
     
    
    From ea98f7dbf22331a38ae71f428ab7509b76d98a97 Mon Sep 17 00:00:00 2001
    From: Akshai Sarma 
    Date: Mon, 9 Apr 2018 18:07:37 -0700
    Subject: [PATCH 2/5] Fixing javadocs
    
    ---
     src/main/java/com/yahoo/bullet/querying/Querier.java | 9 +++++----
     1 file changed, 5 insertions(+), 4 deletions(-)
    
    diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java
    index 6a763d3a..72d9da9b 100644
    --- a/src/main/java/com/yahoo/bullet/querying/Querier.java
    +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java
    @@ -109,7 +109,7 @@
      * if (metadata.hasSignal(Signal.KILL) || metadata.hasSignal(Signal.COMPLETE))
      *     remove Querier for id
      * else
    - *     create new Querier(id, queryBody, config) and initialize it (see note below regarding delaying start).
    + *     create new Querier(id, queryBody, config) and initialize it;
      * 
    * *
    Case 2: BulletRecord record
    @@ -123,7 +123,7 @@ * if (q.isClosed()) * emit(q.getData()) * q.reset() - * q.consume(record) + * q.consume(record) * if (q.isExceedingRateLimit()) * emit(q.getRateLimitError()) * remove q @@ -184,7 +184,8 @@ * {@link #isClosed()} and then {@link #reset()} for non time-based windows. You can use the negation of * {@link #shouldBuffer()} to find out if this kind of query can be started after a bit of delay. This delay will * ensure that results from the Filter phase always start. To aid you in doing this, you can put it in a buffer and - * use {@link #restart()} to mark the delayed start of the query. + * use {@link #restart()} to mark the delayed start of the query. If you do not do this, you should buffer all + * finished queries to wait for the results to arrive from the upstream Filter stage. See buffering below. * Similarly, some queries need to be buffered after they are {@link #isDone()} (these include queries that were not * delayed). You should only do this for queries for which {@link #shouldBuffer()} is true. * @@ -200,7 +201,7 @@ * return * try { * create new Querier(id, queryBody, config) - * initialize it and if errors present: + * initialize it (see note above regarding delaying start) and if errors present: * emit(Clip.of(Meta.of(errors.get())); * catch (Exception e) { * Clip clip = Clip.of(Meta.of(asList(BulletError.makeError(e, queryBody))) From ee831c3ea1b33fadc0bbfcae08de14c5ee35e5cc Mon Sep 17 00:00:00 2001 From: Akshai Sarma Date: Mon, 9 Apr 2018 18:17:25 -0700 Subject: [PATCH 3/5] Rewriting doc --- .../java/com/yahoo/bullet/querying/Querier.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java index 72d9da9b..0c1c9d6c 100644 --- a/src/main/java/com/yahoo/bullet/querying/Querier.java +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java @@ -181,13 +181,16 @@ * Optional Delayed start and End (recommended if you are processing event by event): Since data from the * Filter stage partitions may not arrive at the same time and since the querier may be {@link #isClosed()} at the * same time the data from the Filter stage partitions arrive, you should not immediately emit {@link #getResult()} if - * {@link #isClosed()} and then {@link #reset()} for non time-based windows. You can use the negation of + * {@link #isClosed()} and then {@link #reset()}. There are two ways to handle this. You could delay the start of the + * query by a bit in the Join stage so that windows from the Filter stage always arrive a bit earlier. Or you could + * buffer the results in the Join stage for a bit for each window as results trickle in. The issue with the latter + * approach is that you will slowly add the buffer time to the duration of your windows in your Join stage and + * eventually you will get two windows in one. The former approach does not have this problem. However, that approach + * could lead you to drop results for record based windows due to the result. To solve these, you should buffer + * the final results for all queries for whom {@link #shouldBuffer()} is true. And you can use the negation of * {@link #shouldBuffer()} to find out if this kind of query can be started after a bit of delay. This delay will - * ensure that results from the Filter phase always start. To aid you in doing this, you can put it in a buffer and - * use {@link #restart()} to mark the delayed start of the query. If you do not do this, you should buffer all - * finished queries to wait for the results to arrive from the upstream Filter stage. See buffering below. - * Similarly, some queries need to be buffered after they are {@link #isDone()} (these include queries that were not - * delayed). You should only do this for queries for which {@link #shouldBuffer()} is true. + * ensure that results from the Filter phase always start for time based windows. To aid you in doing this, you can + * buffer it and use {@link #restart()} to mark the delayed start of the query. * * * From 0484eacc34eb8aaabd672de4b5c137d43465136b Mon Sep 17 00:00:00 2001 From: Akshai Sarma Date: Mon, 9 Apr 2018 22:33:15 -0700 Subject: [PATCH 4/5] Removing unused method --- src/main/java/com/yahoo/bullet/querying/Querier.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java index 0c1c9d6c..b8c9c41f 100644 --- a/src/main/java/com/yahoo/bullet/querying/Querier.java +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java @@ -10,7 +10,6 @@ import com.yahoo.bullet.common.BulletConfig; import com.yahoo.bullet.common.BulletError; import com.yahoo.bullet.common.Monoidal; -import com.yahoo.bullet.parsing.Aggregation; import com.yahoo.bullet.parsing.Clause; import com.yahoo.bullet.parsing.Projection; import com.yahoo.bullet.parsing.Query; @@ -673,10 +672,6 @@ private boolean isLastWindow() { return window.getClass().equals(Basic.class); } - private boolean isRaw() { - return runningQuery.getQuery().getAggregation().getType() == Aggregation.Type.RAW; - } - private Meta getErrorMeta(Exception e) { return Meta.of(BulletError.makeError(e.getMessage(), TRY_AGAIN_LATER)); } From 53e9068d5fc748c1bc9f6b487f4cb4575563c904 Mon Sep 17 00:00:00 2001 From: Akshai Sarma Date: Tue, 10 Apr 2018 10:28:02 -0700 Subject: [PATCH 5/5] Docs update --- src/main/java/com/yahoo/bullet/querying/Querier.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/querying/Querier.java b/src/main/java/com/yahoo/bullet/querying/Querier.java index b8c9c41f..9759f475 100644 --- a/src/main/java/com/yahoo/bullet/querying/Querier.java +++ b/src/main/java/com/yahoo/bullet/querying/Querier.java @@ -185,11 +185,13 @@ * buffer the results in the Join stage for a bit for each window as results trickle in. The issue with the latter * approach is that you will slowly add the buffer time to the duration of your windows in your Join stage and * eventually you will get two windows in one. The former approach does not have this problem. However, that approach - * could lead you to drop results for record based windows due to the result. To solve these, you should buffer - * the final results for all queries for whom {@link #shouldBuffer()} is true. And you can use the negation of - * {@link #shouldBuffer()} to find out if this kind of query can be started after a bit of delay. This delay will - * ensure that results from the Filter phase always start for time based windows. To aid you in doing this, you can - * buffer it and use {@link #restart()} to mark the delayed start of the query. + * could lead to results that are sent immediately (for record based windows) being dropped while the delay is + * happening. To solve these issues, you should buffer the final results for all queries for whom {@link #shouldBuffer()} + * is true. This should be true for time-based windows and false for all record-based windows or queries with no + * window. So you can use the negation of {@link #shouldBuffer()} to find out if the latter queries can be delayed. + * This delay will ensure that results from the filter phase are collected in their entirety before emitting from the + * Join phase. To aid you in doing this, you can buffer it and use {@link #restart()} to mark the delayed start of the + * query. * * *