Skip to content

Commit

Permalink
Merge 2c13d9d into 9fa50fa
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaisarma authored Apr 12, 2018
2 parents 9fa50fa + 2c13d9d commit a0edcf5
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 231 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<storm.version>1.0.2</storm.version>
<storm.metrics.version>1.0.2</storm.metrics.version>
<bullet.record.version>0.1.2</bullet.record.version>
<bullet.core.version>0.3.3</bullet.core.version>
<bullet.core.version>0.3.4</bullet.core.version>
<sketches.version>0.9.1</sketches.version>
</properties>

Expand Down
43 changes: 28 additions & 15 deletions src/main/java/com/yahoo/bullet/storm/BulletStormConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class BulletStormConfig extends BulletConfig implements Serializable {
public static final String LOOP_BOLT_MEMORY_ON_HEAP_LOAD = "bullet.topology.loop.bolt.memory.on.heap.load";
public static final String LOOP_BOLT_MEMORY_OFF_HEAP_LOAD = "bullet.topology.loop.bolt.memory.off.heap.load";
public static final String TICK_SPOUT_INTERVAL = "bullet.topology.tick.spout.interval.ms";
public static final String JOIN_BOLT_QUERY_TICK_TIMEOUT = "bullet.topology.join.bolt.query.tick.timeout";
public static final String JOIN_BOLT_WINDOW_TICK_TIMEOUT = "bullet.topology.join.bolt.window.tick.timeout";
public static final String JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS = "bullet.topology.join.bolt.query.post.finish.buffer.ticks";
public static final String JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS = "bullet.topology.join.bolt.query.pre.start.delay.ticks";
public static final String LOOP_BOLT_PUBSUB_OVERRIDES = "bullet.topology.loop.bolt.pubsub.overrides";

// Defaults
Expand Down Expand Up @@ -99,8 +99,8 @@ public class BulletStormConfig extends BulletConfig implements Serializable {
public static final double DEFAULT_LOOP_BOLT_MEMORY_ON_HEAP_LOAD = 256.0;
public static final double DEFAULT_LOOP_BOLT_MEMORY_OFF_HEAP_LOAD = 160.0;
public static final int DEFAULT_TICK_SPOUT_INTERVAL = 100;
public static final int DEFAULT_JOIN_BOLT_QUERY_TICK_TIMEOUT = 5;
public static final int DEFAULT_JOIN_BOLT_WINDOW_TICK_TIMEOUT = 2;
public static final int DEFAULT_JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS = 3;
public static final int DEFAULT_JOIN_BOLT_QUERY_PRE_START_DELAY_TICKS = 2;
public static final Map<String, Object> DEFAULT_LOOP_BOLT_PUBSUB_OVERRIDES = Collections.emptyMap();

// Other constants
Expand All @@ -118,6 +118,8 @@ public class BulletStormConfig extends BulletConfig implements Serializable {
// The smallest value that Tick Interval can be
public static final int TICK_INTERVAL_MINIMUM = 10;

public static final double SMALLEST_WINDOW_MIN_EMIT_EVERY_MULTIPLE = 2.0;
public static final int PRE_START_DELAY_BUFFER_TICKS = 2;

public static final String DEFAULT_STORM_CONFIGURATION = "bullet_storm_defaults.yaml";

Expand Down Expand Up @@ -267,25 +269,30 @@ public class BulletStormConfig extends BulletConfig implements Serializable {
.defaultTo(DEFAULT_TICK_SPOUT_INTERVAL)
.castTo(Validator::asInt);

VALIDATOR.define(JOIN_BOLT_QUERY_TICK_TIMEOUT)
VALIDATOR.define(JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS)
.checkIf(Validator::isPositiveInt)
.defaultTo(DEFAULT_JOIN_BOLT_QUERY_TICK_TIMEOUT)
.defaultTo(DEFAULT_JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS)
.castTo(Validator::asInt);

VALIDATOR.define(JOIN_BOLT_WINDOW_TICK_TIMEOUT)
VALIDATOR.define(JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS)
.checkIf(Validator::isPositiveInt)
.defaultTo(DEFAULT_JOIN_BOLT_WINDOW_TICK_TIMEOUT)
.defaultTo(DEFAULT_JOIN_BOLT_QUERY_PRE_START_DELAY_TICKS)
.castTo(Validator::asInt);

VALIDATOR.define(LOOP_BOLT_PUBSUB_OVERRIDES)
.checkIf(Validator::isMap)
.checkIf(BulletStormConfig::isMapWithStringKeys)
.defaultTo(DEFAULT_LOOP_BOLT_PUBSUB_OVERRIDES);

VALIDATOR.relate("Minimum window emit every should be >= 2 * tick interval", BulletConfig.WINDOW_MIN_EMIT_EVERY, TICK_SPOUT_INTERVAL)
.checkIf(BulletStormConfig::isAtleastTwice);
VALIDATOR.relate("Built-in metrics enabled but no intervals provided", TOPOLOGY_METRICS_BUILT_IN_ENABLE, TOPOLOGY_METRICS_BUILT_IN_EMIT_INTERVAL_MAPPING)
VALIDATOR.relate("Built-in metrics enabled but no intervals provided", TOPOLOGY_METRICS_BUILT_IN_ENABLE,
TOPOLOGY_METRICS_BUILT_IN_EMIT_INTERVAL_MAPPING)
.checkIf(BulletStormConfig::areNeededIntervalsProvided);

VALIDATOR.evaluate("Minimum window emit every should be >= pre-start buffer delay + " + PRE_START_DELAY_BUFFER_TICKS + " ticks",
TICK_SPOUT_INTERVAL, BulletStormConfig.JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS,
BulletConfig.WINDOW_MIN_EMIT_EVERY)
.checkIf(BulletStormConfig::isStartDelayEnough)
.orFail();
}

/**
Expand Down Expand Up @@ -385,14 +392,20 @@ private static boolean isMapWithStringKeys(Object maybeMap) {
}
}

private static boolean isAtleastTwice(Object minEvery, Object tickInterval) {
return ((Number) minEvery).doubleValue() >= 2.0 * ((Number) tickInterval).doubleValue();
}

/**
 * Relationship check between the built-in metrics flag and the built-in metrics interval mapping.
 *
 * @param builtInEnable The value of the built-in metrics enable setting. Cast to a boolean.
 * @param intervalMapping The value of the interval mapping setting. Cast to a map when metrics are enabled.
 * @return false only when built-in metrics are enabled and the interval mapping is empty; true otherwise.
 */
@SuppressWarnings("unchecked")
private static boolean areNeededIntervalsProvided(Object builtInEnable, Object intervalMapping) {
    // Nothing to verify if the built-in metrics are turned off. The mapping is not inspected in that case.
    if (!((boolean) builtInEnable)) {
        return true;
    }
    Map<String, Number> intervals = (Map<String, Number>) intervalMapping;
    return !Utilities.isEmpty(intervals);
}

/**
 * Checks that the minimum window emit interval is large enough to cover the pre-start delay plus
 * {@link #PRE_START_DELAY_BUFFER_TICKS} extra ticks.
 *
 * @param objects The configured values in the order they are passed to the validator: the tick interval,
 *                the pre-start delay in ticks and the minimum window emit every.
 * @return true if minimum emit every >= tick interval * (pre-start delay ticks + PRE_START_DELAY_BUFFER_TICKS).
 */
private static boolean isStartDelayEnough(List<Object> objects) {
    // Read the values through Number instead of casting to Integer so that a setting that arrives as a Long or a
    // Double fails or passes the check on its value rather than blowing up with a ClassCastException.
    long tickInterval = ((Number) objects.get(0)).longValue();
    long preStartDelayTicks = ((Number) objects.get(1)).longValue();
    double minEmitEvery = ((Number) objects.get(2)).doubleValue();

    // Multiply as longs: large tick intervals or delays would overflow an int and wrap to a negative minimum,
    // which would make the check pass incorrectly.
    long requiredMinimum = tickInterval * (preStartDelayTicks + PRE_START_DELAY_BUFFER_TICKS);
    return minEmitEvery >= requiredMinimum;
}

}
143 changes: 72 additions & 71 deletions src/main/java/com/yahoo/bullet/storm/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class JoinBolt extends QueryBolt {
private static final long serialVersionUID = 3312434064971532267L;

private transient Map<String, Metadata> bufferedMetadata;
// For doing a join between Queries and intermediate windows, if the windows are time based and arriving slowly.
private transient RotatingMap<String, Querier> bufferedQueries;
// For doing a join between expired queries and final windows, if query has expired and last data is arriving slowly.
private transient RotatingMap<String, Querier> bufferedWindows;
// For buffering queries for their final windows or results, if the query windows are record based or have no windows.
private transient RotatingMap<String, Querier> postFinishBuffer;
// For buffering queries initially before they are restarted to offset windows if they are time based.
private transient RotatingMap<String, Querier> preStartBuffer;

// Variable
private transient AbsoluteCountMetric activeQueriesCount;
Expand All @@ -66,11 +66,11 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll

bufferedMetadata = new HashMap<>();

int windowTickOut = config.getAs(BulletStormConfig.JOIN_BOLT_WINDOW_TICK_TIMEOUT, Integer.class);
bufferedWindows = new RotatingMap<>(windowTickOut);
int preStartDelayTicks = config.getAs(BulletStormConfig.JOIN_BOLT_WINDOW_PRE_START_DELAY_TICKS, Integer.class);
preStartBuffer = new RotatingMap<>(preStartDelayTicks);

int queryTickOut = config.getAs(BulletStormConfig.JOIN_BOLT_QUERY_TICK_TIMEOUT, Integer.class);
bufferedQueries = new RotatingMap<>(queryTickOut);
int postFinishBufferTicks = config.getAs(BulletStormConfig.JOIN_BOLT_QUERY_POST_FINISH_BUFFER_TICKS, Integer.class);
postFinishBuffer = new RotatingMap<>(postFinishBufferTicks);

if (metricsEnabled) {
activeQueriesCount = registerAbsoluteCountMetric(TopologyConstants.ACTIVE_QUERIES_METRIC, context);
Expand Down Expand Up @@ -116,17 +116,17 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

private void onTick() {
// Force emit all the done queries or closed queries that are being rotated out.
Map<String, Querier> forceDone = bufferedQueries.rotate();
// Force emit all the done queries that are being rotated out.
Map<String, Querier> forceDone = postFinishBuffer.rotate();
forceDone.entrySet().forEach(this::emitFinished);
// The active queries count is not updated for these since these queries are not in any map, so do it here
updateCount(activeQueriesCount, -forceDone.size());

// Close all the buffered windows and re-add them back to queries
Map<String, Querier> forceClosed = bufferedWindows.rotate();
forceClosed.entrySet().forEach(this::emitBufferedWindow);
// Start all the delayed queries and add them to queries
Map<String, Querier> delayed = preStartBuffer.rotate();
delayed.entrySet().forEach(this::startDelayed);

// Categorize all in queries in non-partition mode and do the roll over or emit as necessary
// Categorize all the active queries in non-partition mode and do the buffering or emit as necessary
handleCategorizedQueries(new QueryCategorizer().categorize(queries));
}

Expand Down Expand Up @@ -166,7 +166,7 @@ private void onData(Tuple tuple) {
} else if (querier.isExceedingRateLimit()) {
emitRateLimitError(id, querier, querier.getRateLimitError());
} else if (querier.isClosed()) {
emitOrBufferWindow(id, querier);
emitWindow(id, querier);
}
}

Expand All @@ -175,7 +175,7 @@ private void onError(Tuple tuple) {
Querier querier = getQuery(id);
if (querier == null) {
log.debug("Received error for {} without the query existing", id);
// TODO Might later create this query whose error ignored here. This is a leak.
// TODO Might later create this query if it is received late but whose error was ignored here. This is a leak.
return;
}
RateLimitError error = (RateLimitError) tuple.getValue(TopologyConstants.ERROR_POSITION);
Expand All @@ -190,10 +190,10 @@ private void handleCategorizedQueries(QueryCategorizer category) {
rateLimited.entrySet().forEach(this::emitRateLimitError);

Map<String, Querier> closed = category.getClosed();
closed.entrySet().forEach(this::emitOrBufferWindow);
closed.entrySet().forEach(this::emitWindow);

log.debug("Done: {}, Rate limited: {}, Closed: {}, Pending Windows: {}, Pending Done: {}, Active: {}", done.size(),
rateLimited.size(), closed.size(), bufferedWindows.size(), bufferedQueries.size(), queries.size());
log.debug("Done: {}, Rate limited: {}, Closed: {}, Starting delayed: {}, Buffered finished: {}, Active: {}",
done.size(), rateLimited.size(), closed.size(), preStartBuffer.size(), postFinishBuffer.size(), queries.size());
}

// RESULT_STREAM and METADATA_STREAM emitters
Expand All @@ -219,22 +219,23 @@ private void emitOrBufferFinished(Map.Entry<String, Querier> query) {
}

private void emitOrBufferFinished(String id, Querier querier) {
// If we shouldn't buffer, then emit it and return.
/*
* Three cases:
* 1) If we shouldn't buffer, then emit it and return. If it was being delayed and somehow finished, it is
* cleaned up and removed. There should be no query that needs delaying AND buffering.
* 2) If the query became closed after it finished (wherever it is), we emit it. We should still honor isClosed.
* 3) If it should buffer and it isn't closed, move it into postFinishBuffer until it becomes closed or a tick emits it.
*/
if (!querier.shouldBuffer()) {
log.debug("Emitting query since it shouldn't be buffered {}", id);
emitFinished(id, querier);
return;
}
// If in queries, move it to bufferedQueries. If in bufferedWindows, move it to bufferedQueries.
// Otherwise (in bufferedQueries, do nothing). Tick will emit it.
if (queries.containsKey(id)) {
log.debug("Starting to buffer while waiting for more final results for query {}...", id);
rotateInto(id, querier, bufferedQueries);
} else if (bufferedWindows.containsKey(id)) {
log.debug("Query window was being buffered while query became done. Moving it to finished query buffer");
bufferedWindows.remove(id);
rotateInto(id, querier, bufferedQueries);
} else {
log.debug("Continuing to buffer query {} till tick while waiting for more final results...", id);
} else if (querier.isClosed()) {
log.debug("Emitting query since it finished but this is the last window for it {}", id);
emitFinished(id, querier);
} else if (queries.containsKey(id)) {
log.debug("Starting to buffer while waiting for more final results for query {}", id);
queries.remove(id);
postFinishBuffer.put(id, querier);
}
}

Expand All @@ -251,32 +252,17 @@ private void emitFinished(String id, Querier querier) {

// RESULT_STREAM emitters

private void emitOrBufferWindow(Map.Entry<String, Querier> query) {
emitOrBufferWindow(query.getKey(), query.getValue());
}

private void emitOrBufferWindow(String id, Querier querier) {
// If not in queries - in bufferedWindows (bufferedQueries not possible since isDone was false), do nothing.
// Tick will emit it. If in queries, rotate into bufferedWindows. Otherwise, emit and reset.
if (!queries.containsKey(id)) {
log.debug("Continuing to buffer query {} till tick while waiting for more windows...", id);
} else if (querier.shouldBuffer()) {
log.debug("Starting to buffer while waiting for more windows for query {}...", id);
rotateInto(id, querier, bufferedWindows);
} else {
log.debug("Emitting window for {} and resetting...", id);
emitResult(id, bufferedMetadata.get(id), querier.getResult());
querier.reset();
}
/**
 * Entry based overload that unpacks the id and the querier and hands them to
 * {@link #emitWindow(String, Querier)}.
 *
 * @param query The entry of query id to its querier.
 */
private void emitWindow(Map.Entry<String, Querier> query) {
    String id = query.getKey();
    Querier querier = query.getValue();
    emitWindow(id, querier);
}

private void emitBufferedWindow(Map.Entry<String, Querier> query) {
String id = query.getKey();
Querier querier = query.getValue();
private void emitWindow(String id, Querier querier) {
// No matter where it is - emit and reset.
log.debug("Emitting window for {} and resetting...", id);
emitResult(id, bufferedMetadata.get(id), querier.getResult());
querier.reset();
bufferedWindows.remove(id);
queries.put(id, querier);
// We should not receive windows for queries in the pre-start buffer because those are only time-based windowed
// queries, and the config ensures that their minimum emit time is greater than the pre-start delay.
}

private void emitErrorsAsResult(String id, Metadata metadata, BulletError... errors) {
Expand Down Expand Up @@ -304,33 +290,48 @@ private void emitMetaSignal(String id, Metadata.Signal signal) {

@Override
protected void setupQuery(String id, String query, Metadata metadata, Querier querier) {
super.setupQuery(id, query, metadata, querier);
bufferedMetadata.put(id, metadata);
updateCount(createdQueriesCount, 1L);
updateCount(activeQueriesCount, 1L);
bufferedMetadata.put(id, metadata);
// If the query should be post-finish buffered, it should not be pre-start delayed.
if (querier.shouldBuffer()) {
queries.put(id, querier);
updateCount(activeQueriesCount, 1L);
log.info("Received and started query {}", querier.toString());
} else {
preStartBuffer.put(id, querier);
log.info("Received but delaying starting query {}", id);
}
}

@Override
protected void removeQuery(String id) {
if (getQuery(id) != null) {
// Only update count if query was in queries or postFinishBuffer.
if (queries.containsKey(id) || postFinishBuffer.containsKey(id)) {
updateCount(activeQueriesCount, -1L);
}
super.removeQuery(id);
bufferedWindows.remove(id);
bufferedQueries.remove(id);
postFinishBuffer.remove(id);
bufferedMetadata.remove(id);
// It should not be in the preStartBuffer under normal operations but could be if it was killed.
preStartBuffer.remove(id);
}

// Other helpers

private void rotateInto(String id, Querier querier, RotatingMap<String, Querier> into) {
// Make sure it's not in queries
queries.remove(id);
into.put(id, querier);
/**
 * Moves a query out of the pre-start buffer into the active queries after restarting it.
 *
 * @param query The entry of query id to the querier that was being delayed.
 */
private void startDelayed(Map.Entry<String, Querier> query) {
    final String queryId = query.getKey();
    final Querier delayedQuerier = query.getValue();

    // It no longer belongs with the delayed queries.
    preStartBuffer.remove(queryId);

    // Restart before activating so that the query's start is marked from this point.
    delayedQuerier.restart();
    queries.put(queryId, delayedQuerier);

    // It only counts as an active query from here on.
    updateCount(activeQueriesCount, 1L);
    log.info("Started delayed query {}", queryId);
}

private Metadata withSignal(Metadata metadata, Metadata.Signal signal) {
// Don't change the non-readonly bits of metadata in place since that might affect tuples emitted but pending
// Don't change the non-readonly bits of metadata in place since that might affect tuples emitted but pending.
Metadata copy = new Metadata(signal, null);
if (metadata != null) {
copy.setContent(metadata.getContent());
Expand All @@ -339,15 +340,15 @@ private Metadata withSignal(Metadata metadata, Metadata.Signal signal) {
}

private Querier getQuery(String id) {
// JoinBolt has three places where the query might be
// JoinBolt has two regular places where the query might be.
Querier query = queries.get(id);
if (query == null) {
log.debug("Query might be buffered: {}", id);
query = bufferedWindows.get(id);
log.debug("Query might be done: {}", id);
query = postFinishBuffer.get(id);
}
if (query == null) {
log.debug("Query might be done: {}", id);
query = bufferedQueries.get(id);
log.debug("Fetching delayed query {}", id);
query = preStartBuffer.get(id);
}
return query;
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/com/yahoo/bullet/storm/drpc/DRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public class DRPCConfig extends BulletStormConfig {
.defaultTo(DEFAULT_DRPC_MAX_UNCOMMITED_MESSAGES)
.castTo(Validator::asInt);

// This throws a RunTimeException if windowing is not disabled because we do not want to proceed.
VALIDATOR.relate("Windowing is not disabled", BulletConfig.WINDOW_DISABLE, BulletConfig.WINDOW_DISABLE)
.checkIf((mustBeTrue, ignored) -> failIfNotTrue(mustBeTrue));
VALIDATOR.evaluate("Windowing should be disabled when using the DRPC PubSub", BulletConfig.WINDOW_DISABLE)
.checkIf(DRPCConfig::windowIsDisabled)
.orFail();
}

/**
Expand Down Expand Up @@ -136,11 +136,12 @@ private static boolean isStringPositiveInteger(Object entry) {
}
}

private static boolean failIfNotTrue(Object mustBeTrue) {
Boolean windowDisabled = (Boolean) mustBeTrue;
@SuppressWarnings("unchecked")
private static boolean windowIsDisabled(Object keyList) {
Boolean windowDisabled = (Boolean) ((List<Object>) (keyList)).get(0);
if (!windowDisabled) {
log.error("DRPC does not support windowing. You must set {} to true", BulletConfig.WINDOW_DISABLE);
throw new RuntimeException("DRPC does not support windowing. Disable windows first.");
return false;
}
return true;
}
Expand Down
Loading

0 comments on commit a0edcf5

Please sign in to comment.