Skip to content

Commit

Permalink
Bullet Core 0.3.3 (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaisarma committed Mar 30, 2018
1 parent d250bde commit a180c14
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 38 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-storm</artifactId>
<version>0.8.1-SNAPSHOT</version>
<version>0.8.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bullet-storm</name>

Expand Down Expand Up @@ -39,7 +39,7 @@
<storm.version>1.0.2</storm.version>
<storm.metrics.version>1.0.2</storm.metrics.version>
<bullet.record.version>0.1.2</bullet.record.version>
<bullet.core.version>0.3.1</bullet.core.version>
<bullet.core.version>0.3.3</bullet.core.version>
<sketches.version>0.9.1</sketches.version>
</properties>

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/yahoo/bullet/storm/FilterBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void onQuery(Tuple tuple) {
String query = tuple.getString(TopologyConstants.QUERY_POSITION);

try {
Querier querier = createQuerier(id, query, config);
Querier querier = createQuerier(Querier.Mode.PARTITION, id, query, config);
if (!querier.initialize().isPresent()) {
setupQuery(id, query, null, querier);
return;
Expand All @@ -104,12 +104,12 @@ private void onQuery(Tuple tuple) {

private void onRecord(Tuple tuple) {
BulletRecord record = (BulletRecord) tuple.getValue(TopologyConstants.RECORD_POSITION);
handleCategorizedQueries(new QueryCategorizer(Querier::isClosedForPartition).categorize(record, queries));
handleCategorizedQueries(new QueryCategorizer().categorize(record, queries));
}

private void onTick() {
// Categorize queries in partition mode.
handleCategorizedQueries(new QueryCategorizer(Querier::isClosedForPartition).categorize(queries));
handleCategorizedQueries(new QueryCategorizer().categorize(queries));
}

private void handleCategorizedQueries(QueryCategorizer category) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/yahoo/bullet/storm/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void onTick() {
forceClosed.entrySet().forEach(this::emitBufferedWindow);

// Categorize all in queries in non-partition mode and do the roll over or emit as necessary
handleCategorizedQueries(new QueryCategorizer(Querier::isClosed).categorize(queries));
handleCategorizedQueries(new QueryCategorizer().categorize(queries));
}

private void onQuery(Tuple tuple) {
Expand All @@ -137,7 +137,7 @@ private void onQuery(Tuple tuple) {

Querier querier;
try {
querier = createQuerier(id, query, config);
querier = createQuerier(Querier.Mode.ALL, id, query, config);
Optional<List<BulletError>> optionalErrors = querier.initialize();
if (!optionalErrors.isPresent()) {
setupQuery(id, query, metadata, querier);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/yahoo/bullet/storm/QueryBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ protected Metadata onMeta(Tuple tuple) {
/**
* Exposed for testing only. Create a {@link Querier} from the given query ID, body and configuration.
*
* @param mode The {@link Querier.Mode} to use to create the instance.
* @param id The ID for the query.
* @param query The actual query JSON body.
* @param config The configuration to use for the query.
* @return A created, uninitialized instance of a querier or a RuntimeException if there were issues.
*/
protected Querier createQuerier(String id, String query, BulletConfig config) {
return new Querier(id, query, config);
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
return new Querier(mode, id, query, config);
}

/**
Expand Down
21 changes: 3 additions & 18 deletions src/main/java/com/yahoo/bullet/storm/QueryCategorizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,20 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/**
* This categorizes running queries into whether they are done, closed or have exceeded the rate limits. Running queries
* are provided as a {@link Map} of String query IDs to non-null, valid, initialized {@link Querier} objects.
*
* When creating this, you provide a {@link Predicate} on {@link Querier}, which is used to test if the Querier is
* closed or not. Use {@link #categorize(Map)} and {@link #categorize(BulletRecord, Map)}for categorizing such queries.
* The latter categorizes after making the Querier instances {@link Querier#consume(BulletRecord)}.
* Use {@link #categorize(Map)} and {@link #categorize(BulletRecord, Map)}for categorizing queries. The latter
* categorizes after making the Querier instances {@link Querier#consume(BulletRecord)}.
*/
@Getter @Slf4j
public class QueryCategorizer {
private Map<String, Querier> rateLimited = new HashMap<>();
private Map<String, Querier> closed = new HashMap<>();
private Map<String, Querier> done = new HashMap<>();

private final Predicate<Querier> isClosed;

/**
* The constructor for the categorizer that takes in a function that determines if a query is closed.
*
* @param isClosed A {@link Predicate} used to test if the {@link Querier} is closed. If the data the querier is
* is seeing is partitioned, you should use {@link Querier#isClosedForPartition()} otherwise you
* use {@link Querier#isClosed()}.
*/
public QueryCategorizer(Predicate<Querier> isClosed) {
this.isClosed = isClosed;
}

/**
* Categorize the given {@link Map} of query IDs to {@link Querier} instances.
*
Expand Down Expand Up @@ -74,7 +59,7 @@ private void classify(Map.Entry<String, Querier> query) {
done.put(id, querier);
} else if (querier.isExceedingRateLimit()) {
rateLimited.put(id, querier);
} else if (isClosed.test(querier)) {
} else if (querier.isClosed()) {
closed.put(id, querier);
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/com/yahoo/bullet/storm/FilterBoltTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static class NoQueryFilterBolt extends FilterBolt {
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
return null;
}
}
Expand All @@ -122,8 +122,8 @@ private DonableFilterBolt(int recordsConsumed, int ticksConsumed, BulletStormCon
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(id, query, config));
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(mode, id, query, config));
List<Boolean> answers = IntStream.range(0, doneAfter).mapToObj(i -> false)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
answers.add(true);
Expand All @@ -143,8 +143,8 @@ private RateLimitedFilterBolt(int recordsConsumed, RateLimitError error, BulletS
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(id, query, config));
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(mode, id, query, config));
List<Boolean> answers = IntStream.range(0, limitedAfter).mapToObj(i -> false)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
answers.add(true);
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/yahoo/bullet/storm/JoinBoltTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public DonableJoinBolt(BulletStormConfig config, int doneAfter, boolean shouldBu
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
// Each new querier will be done doneAfter more times than the last querier
Querier spied = spy(super.createQuerier(id, query, config));
Querier spied = spy(super.createQuerier(mode, id, query, config));
List<Boolean> doneAnswers = IntStream.range(0, doneAfter).mapToObj(i -> false)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
doneAnswers.add(true);
Expand All @@ -133,9 +133,9 @@ public ClosableJoinBolt(BulletStormConfig config, int closeAfter, boolean should
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
// Each new querier will be done doneAfter more times than the last querier
Querier spied = spy(super.createQuerier(id, query, config));
Querier spied = spy(super.createQuerier(mode, id, query, config));
List<Boolean> closeAnswers = IntStream.range(0, closeAfter).mapToObj(i -> false)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
closeAnswers.add(true);
Expand All @@ -155,8 +155,8 @@ private RateLimitedJoinBolt(int limitedAfter, RateLimitError error, BulletStormC
}

@Override
protected Querier createQuerier(String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(id, query, config));
protected Querier createQuerier(Querier.Mode mode, String id, String query, BulletConfig config) {
Querier spied = spy(super.createQuerier(mode, id, query, config));
List<Boolean> answers = IntStream.range(0, limitedAfter).mapToObj(i -> false)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
answers.add(true);
Expand Down

0 comments on commit a180c14

Please sign in to comment.