Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.yahoo.bullet.pubsub.rest;

import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import lombok.AccessLevel;
import lombok.Getter;
Expand Down Expand Up @@ -34,7 +33,7 @@ public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String re
}

@Override
public void send(PubSubMessage message) throws PubSubException {
public void send(PubSubMessage message) {
// Put responseURL in the metadata so the ResponsePublisher knows to which host to send the response
Metadata metadata = message.getMetadata();
metadata = metadata == null ? new Metadata() : metadata;
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,28 @@
package com.yahoo.bullet.pubsub.rest;

import com.yahoo.bullet.pubsub.BufferingSubscriber;
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;

import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class RESTSubscriber extends BufferingSubscriber {
@Getter(AccessLevel.PACKAGE)
private List<String> urls;
private CloseableHttpClient client;
private long minWait;
@Setter(AccessLevel.PACKAGE)
private long lastRequest;
private int connectTimeout;

Expand All @@ -37,6 +38,7 @@ public class RESTSubscriber extends BufferingSubscriber {
* @param urls The URLs which will be used to make the http request.
* @param client The client to use to make http requests.
* @param minWait The minimum time (ms) to wait between subsequent http requests.
* @param connectTimeout The minimum time (ms) to wait for a connection to be made.
*/
public RESTSubscriber(int maxUncommittedMessages, List<String> urls, CloseableHttpClient client, long minWait, int connectTimeout) {
super(maxUncommittedMessages);
Expand All @@ -48,7 +50,7 @@ public RESTSubscriber(int maxUncommittedMessages, List<String> urls, CloseableHt
}

@Override
public List<PubSubMessage> getMessages() throws PubSubException {
public List<PubSubMessage> getMessages() {
List<PubSubMessage> messages = new ArrayList<>();
long currentTime = System.currentTimeMillis();
if (currentTime - lastRequest <= minWait) {
Expand Down
65 changes: 37 additions & 28 deletions src/main/java/com/yahoo/bullet/querying/Querier.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.Monoidal;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.parsing.Clause;
import com.yahoo.bullet.parsing.Projection;
import com.yahoo.bullet.parsing.Query;
Expand Down Expand Up @@ -123,7 +122,7 @@
* if (q.isClosed())
* emit(q.getData())
* q.reset()
* q.consume(record)
* q.consume(record)
* if (q.isExceedingRateLimit())
* emit(q.getRateLimitError())
* remove q
Expand Down Expand Up @@ -178,28 +177,35 @@
* data volume is too low.
* </li>
* <li>
* <em>Optional but recommended if you are processing event by event not microbatches</em>: Since data from the Filter stage
* partitions may not arrive at the same time and since the querier may be {@link #isClosed()} at the same time the
* data from the Filter stage partitions arrive, you should not immediately emit {@link #getResult()} if
* {@link #isClosed()} and then {@link #reset()} for <em>non time-based windows</em>. You should put it in a buffer
* for a little bit of time while the data arrives if {@link #isClosed()}, then {@link #getResult()} and
* {@link #reset()}. Similarly, for {@link #isDone()}. You should only do this for queries for which
* {@link #shouldBuffer()} is true. For record based windows, you can use {@link #isClosed()} to drive the
* emission of the results.
* <em>Optional Delayed start and End (recommended if you are processing event by event)</em>: Since data from the
* Filter stage partitions may not arrive at the same time and since the querier may be {@link #isClosed()} at the
* same time the data from the Filter stage partitions arrive, you should not immediately emit {@link #getResult()} if
* {@link #isClosed()} and then {@link #reset()}. There are two ways to handle this. You could delay the start of the
* query by a bit in the Join stage so that windows from the Filter stage always arrive a bit earlier. Or you could
* buffer the results in the Join stage for a bit for each window as results trickle in. The issue with the latter
* approach is that you will slowly add the buffer time to the duration of your windows in your Join stage and
* eventually you will get two windows in one. The former approach does not have this problem. However, that approach
* could lead to results that are sent immediately (for record based windows) being dropped while the delay is
* happening. To solve these issues, you should buffer the final results for all queries for whom {@link #shouldBuffer()}
* is true. This should be true for time-based windows and false for all record-based windows or queries with no
* window. So you can use the negation of {@link #shouldBuffer()} to find out if the latter queries can be delayed.
* This delay will ensure that results from the filter phase are collected in their entirety before emitting from the
* Join phase. To aid you in doing this, you can buffer it and use {@link #restart()} to mark the delayed start of the
* query.
* </li>
* </ol>
*
* <h4>Pseudo Code</h4>
*
* <h5>Case 1: Query</h5>
* <h5>Case 1: Query</h5>
* <pre>
* (String id, String queryBody, Metadata metadata) = Query
* if (metadata.hasSignal(Signal.KILL))
* remove Querier for id
* return
* try {
* create new Querier(id, queryBody, config)
* initialize it and if errors present:
* initialize it (see note above regarding delaying start) and if errors present:
* emit(Clip.of(Meta.of(errors.get()));
* catch (Exception e) {
* Clip clip = Clip.of(Meta.of(asList(BulletError.makeError(e, queryBody)))
Expand Down Expand Up @@ -393,6 +399,19 @@ public Optional<List<BulletError>> initialize() {
return window.initialize();
}

/**
* Forces a restart of a valid query (must have previously called {@link #initialize()}) to mark the
* correct start of this object if it was previously created but delayed in starting it (by using the negation of
* {@link #shouldBuffer()}. You might be using this if you were delaying the start of the query in the
* Join phase. This does not revalidate the query or reset any data this might have already consumed.
*/
public void restart() {
// Only timestamps are in RunningQuery and Scheme.
// For RunningQuery, we just need to fix the start time.
runningQuery.start();
window.initialize();
}

/**
* Consume a {@link BulletRecord} for this query. The record may or not be actually incorporated into the query
* results. This depends on whether the query can accept more data, if it is expired or not or if the record matches
Expand Down Expand Up @@ -577,22 +596,16 @@ public RateLimitError getRateLimitError() {
}

/**
* Returns if this query should buffer before emitting results. You can use this to wait for the final results
* for your window (or your final results if you have no window) in your Join or Combine stage after a query is
* {@link #isDone()} or {@link #isClosed()}.
* Returns if this query should buffer before emitting the final results. You can use this to wait for the final
* results in your Join or Combine stage after a query is {@link #isDone()}.
*
* @return A boolean that is true if the query results should be buffered.
* @return A boolean that is true if the query results should be buffered in the Join phase.
*/
public boolean shouldBuffer() {
Window window = runningQuery.getQuery().getWindow();
boolean noWindow = window == null;
// If it's a RAW query without a window, it should be buffered if and only if it timed out. This means that the
// query is not yet done. So this tells the driver to buffer the query to wait for more potential results.
if (noWindow && isRaw()) {
return runningQuery.isTimedOut();
}
// No window (and not raw) is a duration based query => do buffer. Otherwise, buffer if the window is time based.
return noWindow || window.isTimeBased();
boolean noWindow = window == null;
// Only buffer if there is no window (including Raw) or if it's a record based window.
return noWindow || !window.isTimeBased();
}

/**
Expand Down Expand Up @@ -661,10 +674,6 @@ private boolean isLastWindow() {
return window.getClass().equals(Basic.class);
}

private boolean isRaw() {
return runningQuery.getQuery().getAggregation().getType() == Aggregation.Type.RAW;
}

private Meta getErrorMeta(Exception e) {
return Meta.of(BulletError.makeError(e.getMessage(), TRY_AGAIN_LATER));
}
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/com/yahoo/bullet/querying/RunningQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ public class RunningQuery implements Initializable {
@Getter
private final String id;
@Getter
private final long startTime;
@Getter
private final Query query;

@Getter
private long startTime;
private String queryString;

/**
Expand All @@ -52,11 +52,11 @@ public RunningQuery(String id, String queryString, BulletConfig config) {
RunningQuery(String id, Query query) {
this.id = id;
this.query = query;
startTime = System.currentTimeMillis();
}

@Override
public Optional<List<BulletError>> initialize() {
start();
return query.initialize();
}

Expand All @@ -75,4 +75,11 @@ public boolean isTimedOut() {
// Never add to query.getDuration since it can be infinite (Long.MAX_VALUE)
return System.currentTimeMillis() - startTime >= query.getDuration();
}

/**
* Exposed for package only. Starts the query.
*/
void start() {
startTime = System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public AdditiveTumbling(Strategy aggregation, Window window, BulletConfig config

@Override
public void reset() {
startedAt = System.currentTimeMillis();
nextCloseTime = nextCloseTime + windowLength;
windowCount++;
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/yahoo/bullet/windowing/Tumbling.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
public class Tumbling extends Basic {
public static final String NAME = "Tumbling";

protected long startedAt;
protected long closeAfter;
protected long nextCloseTime;
protected long windowLength;

/**
* Creates an instance of this windowing scheme with the provided {@link Strategy} and {@link BulletConfig}.
Expand All @@ -39,27 +39,27 @@ public Tumbling(Strategy aggregation, Window window, BulletConfig config) {
public Optional<List<BulletError>> initialize() {
// Window is initialized so every is present and positive.
Number every = Utilities.getCasted(window.getEmit(), Window.EMIT_EVERY_FIELD, Number.class);
closeAfter = every.longValue();
startedAt = System.currentTimeMillis();
windowLength = every.longValue();
nextCloseTime = System.currentTimeMillis() + windowLength;
return Optional.empty();
}

@Override
protected Map<String, Object> getMetadata(Map<String, String> metadataKeys) {
Map<String, Object> meta = super.getMetadata(metadataKeys);
addIfNonNull(meta, metadataKeys, Meta.Concept.WINDOW_EXPECTED_EMIT_TIME, () -> this.startedAt + this.closeAfter);
addIfNonNull(meta, metadataKeys, Meta.Concept.WINDOW_EXPECTED_EMIT_TIME, () -> this.nextCloseTime);
return meta;
}

@Override
public void reset() {
super.reset();
startedAt = System.currentTimeMillis();
nextCloseTime = nextCloseTime + windowLength;
}

@Override
public boolean isClosed() {
return System.currentTimeMillis() >= startedAt + closeAfter;
return System.currentTimeMillis() >= nextCloseTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static com.yahoo.bullet.TestHelpers.assertJSONEquals;

public class BulletConfigTest {
private static Map<String, String> allMetadataAsMap() {
public static Map<String, String> allMetadataAsMap() {
Map<String, String> meta = new HashMap<>();
for (Map<String, String> m : BulletConfig.DEFAULT_RESULT_METADATA_METRICS) {
meta.put(m.get(BulletConfig.RESULT_METADATA_METRICS_CONCEPT_KEY), m.get(BulletConfig.RESULT_METADATA_METRICS_NAME_KEY));
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ public void testSwitchingContext() throws PubSubException {
Assert.assertTrue(pubSub.getSubscriber().receive().getContent().isEmpty());

// No switch
pubSub.switchContext(PubSub.Context.QUERY_PROCESSING, new BulletConfig());
Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_PROCESSING);
pubSub.switchContext(PubSub.Context.QUERY_SUBMISSION, new BulletConfig());
Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_SUBMISSION);
Assert.assertTrue(pubSub.getSubscriber().receive().getContent().isEmpty());

// Switch
BulletConfig newConfig = new BulletConfig("src/test/resources/test_config.yaml");
newConfig.set(MockPubSub.MOCK_MESSAGE_NAME, "foo");
pubSub.switchContext(PubSub.Context.QUERY_SUBMISSION, newConfig);
Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_SUBMISSION);
pubSub.switchContext(PubSub.Context.QUERY_PROCESSING, newConfig);
Assert.assertEquals(pubSub.getContext(), PubSub.Context.QUERY_PROCESSING);
Assert.assertEquals(pubSub.getSubscriber().receive().getContent(), "foo");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testCloseDoesNotThrow() throws Exception {
public void testMinWait() throws Exception {
String message = new PubSubMessage("someID", "someContent").asJSON();
CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message);
RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 100, 3000);
RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 600000, 3000);

// First response should give content (2 events since we have 2 endpoints in the config)
List<PubSubMessage> messages = subscriber.getMessages();
Expand All @@ -120,8 +120,8 @@ public void testMinWait() throws Exception {
messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 0);

// After waiting it should return messages again
Thread.sleep(150);
// After waiting (mocking that by changing the lastRequest time), it should return messages again
subscriber.setLastRequest(600000 * 2);
messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 2);
}
Expand Down
Loading