Skip to content

Commit

Permalink
Using PubSubMessageSerDe for Queries (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
0aix committed Jul 1, 2021
1 parent 67b709d commit 5e5ddbc
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 173 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-storm</artifactId>
<version>1.1.5-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bullet-storm</name>

Expand Down Expand Up @@ -54,8 +54,8 @@
<storm.version>2.2.0</storm.version>
<storm.metrics.version>2.2.0</storm.metrics.version>
<jackson.core.version>2.10.2</jackson.core.version>
<bullet.core.version>1.3.1</bullet.core.version>
<bullet.dsl.version>1.1.6</bullet.dsl.version>
<bullet.core.version>1.4.2</bullet.core.version>
<bullet.dsl.version>1.1.7</bullet.dsl.version>
</properties>

<dependencies>
Expand Down
40 changes: 16 additions & 24 deletions src/main/java/com/yahoo/bullet/storm/FilterBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
*/
package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.QueryCategorizer;
import com.yahoo.bullet.querying.QueryManager;
Expand Down Expand Up @@ -117,32 +114,21 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

@Override
protected void initializeQuery(PubSubMessage message) {
initializeQuery(message.getId(), message.getContent(), message.getMetadata());
}

@Override
protected void removeQuery(String id) {
super.removeQuery(id);
manager.removeAndGetQuery(id);
}

private void onQuery(Tuple tuple) {
String id = tuple.getString(TopologyConstants.ID_POSITION);
byte[] queryData = (byte[]) tuple.getValue(TopologyConstants.QUERY_POSITION);
Metadata metadata = (Metadata) tuple.getValue(TopologyConstants.QUERY_METADATA_POSITION);
initializeQuery(id, queryData, metadata);
}

private void initializeQuery(String id, byte[] queryData, Metadata metadata) {
protected boolean hasQuery(String id) {
if (manager.hasQuery(id)) {
log.debug("Duplicate for request {}", id);
duplicatedCount++;
return;
return true;
}
return false;
}

@Override
protected void initializeQuery(PubSubMessage message) {
String id = message.getId();
try {
Query query = SerializerDeserializer.fromBytes(queryData);
Querier querier = createQuerier(Querier.Mode.PARTITION, id, query, metadata, config);
message = querySerDe.fromMessage(message);
Querier querier = createQuerier(Querier.Mode.PARTITION, id, message.getContentAsQuery(), message.getMetadata(), config);
manager.addQuery(id, querier);
log.info("Initialized query {} : {}", querier.getRunningQuery().getId(), querier.getRunningQuery().getQueryString());
log.debug("Initialized query {}", querier);
Expand All @@ -153,6 +139,12 @@ private void initializeQuery(String id, byte[] queryData, Metadata metadata) {
log.error("Failed to initialize query for request {}", id);
}

@Override
protected void removeQuery(String id) {
super.removeQuery(id);
manager.removeAndGetQuery(id);
}

private void onRecord(Tuple tuple) {
BulletRecord record = (BulletRecord) tuple.getValue(TopologyConstants.RECORD_POSITION);
handleCategorizedQueries(manager.categorize(record));
Expand Down
52 changes: 22 additions & 30 deletions src/main/java/com/yahoo/bullet/storm/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
package com.yahoo.bullet.storm;

import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.QueryCategorizer;
import com.yahoo.bullet.querying.RateLimitError;
Expand Down Expand Up @@ -146,33 +144,6 @@ private void onTick() {
emitReplayRequestIfNecessary();
}

private void onQuery(Tuple tuple) {
String id = tuple.getString(TopologyConstants.ID_POSITION);
byte[] queryData = (byte[]) tuple.getValue(TopologyConstants.QUERY_POSITION);
Metadata metadata = (Metadata) tuple.getValue(TopologyConstants.QUERY_METADATA_POSITION);
initializeQuery(id, queryData, metadata);
}

private void initializeQuery(String id, byte[] queryData, Metadata metadata) {
// bufferedMetadata has an entry for each query that exists in the JoinBolt; therefore, we check bufferedMetadata
// for existing queries (as opposed to individually checking the queries, preStartBuffer, and postFinishBuffer maps)
if (bufferedMetadata.containsKey(id)) {
log.debug("Duplicate for request {}", id);
metrics.updateCount(duplicatedQueriesCount, 1L);
return;
}
try {
Query query = SerializerDeserializer.fromBytes(queryData);
Querier querier = createQuerier(Querier.Mode.ALL, id, query, metadata, config);
setupQuery(id, metadata, querier);
return;
} catch (RuntimeException re) {
// Includes JSONParseException
emitErrorsAsResult(id, metadata, BulletError.makeError(re.toString(), "Error initializing query"));
}
log.error("Failed to initialize query for request {}", id);
}

private void onData(Tuple tuple) {
String id = tuple.getString(TopologyConstants.ID_POSITION);
Querier querier = getQuery(id);
Expand Down Expand Up @@ -310,9 +281,30 @@ private void emitMetaSignal(String id, Metadata.Signal signal) {

// Override hooks

@Override
protected boolean hasQuery(String id) {
// bufferedMetadata has an entry for each query that exists in the JoinBolt; therefore, we check bufferedMetadata
// for existing queries (as opposed to individually checking the queries, preStartBuffer, and postFinishBuffer maps)
if (bufferedMetadata.containsKey(id)) {
log.debug("Duplicate for request {}", id);
metrics.updateCount(duplicatedQueriesCount, 1L);
return true;
}
return false;
}

@Override
protected void initializeQuery(PubSubMessage message) {
initializeQuery(message.getId(), message.getContent(), message.getMetadata());
String id = message.getId();
try {
message = querySerDe.fromMessage(message);
Querier querier = createQuerier(Querier.Mode.ALL, id, message.getContentAsQuery(), message.getMetadata(), config);
setupQuery(id, message.getMetadata(), querier);
return;
} catch (RuntimeException re) {
emitErrorsAsResult(id, message.getMetadata(), BulletError.makeError(re.toString(), "Error initializing query"));
}
log.error("Failed to initialize query for request {}", id);
}

@Override
Expand Down
27 changes: 25 additions & 2 deletions src/main/java/com/yahoo/bullet/storm/QueryBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.PubSubMessageSerDe;
import com.yahoo.bullet.query.Query;
import com.yahoo.bullet.querying.Querier;
import com.yahoo.bullet.querying.RunningQuery;
Expand All @@ -21,7 +22,6 @@

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static com.yahoo.bullet.storm.BulletStormConfig.REPLAY_BATCH_COMPRESS_ENABLE;
Expand All @@ -41,6 +41,7 @@ public abstract class QueryBolt extends ConfigComponent implements IRichBolt {
private static final long serialVersionUID = 4567140628827887965L;

protected transient BulletMetrics metrics;
protected transient PubSubMessageSerDe querySerDe;
protected transient OutputCollector collector;
protected transient TupleClassifier classifier;
protected transient String componentTaskID;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
componentTaskID = context.getThisComponentId() + HYPHEN + context.getThisTaskId();
// Enable built-in metrics
metrics = new BulletMetrics(config);
querySerDe = PubSubMessageSerDe.from(config);
startTimestamp = System.currentTimeMillis();
replayEnabled = config.getAs(REPLAY_ENABLE, Boolean.class);
replayBatchCompressEnable = config.getAs(REPLAY_BATCH_COMPRESS_ENABLE, Boolean.class);
Expand Down Expand Up @@ -122,6 +124,19 @@ private void handleReplaySignal() {
emitReplayRequest();
}

/**
* Handles a query message.
*
* @param tuple The query tuple.
*/
protected void onQuery(Tuple tuple) {
PubSubMessage message = (PubSubMessage) tuple.getValue(TopologyConstants.QUERY_POSITION);
if (hasQuery(message.getId())) {
return;
}
initializeQuery(querySerDe.toMessage(message));
}

/**
* Handles a batch message for query replay.
*
Expand Down Expand Up @@ -150,13 +165,21 @@ protected void onBatch(Tuple tuple) {
return;
}
removedIds.removeIf(batch.keySet()::remove);
batch.values().stream().filter(Objects::nonNull).forEach(this::initializeQuery);
batch.values().stream().filter(message -> message != null && !hasQuery(message.getId())).forEach(this::initializeQuery);
batchCount++;
replayedQueriesCount += batch.size();
lastReplayRequest = System.currentTimeMillis();
log.info("Initialized {} queries.", batch.size());
}

/**
* Whether or not the bolt has the given query id.
*
* @param id The query id.
* @return True if the bolt has the query and false otherwise.
*/
protected abstract boolean hasQuery(String id);

/**
* Initialize the query contained in the given {@link PubSubMessage}.
*
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/yahoo/bullet/storm/QuerySpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ public void nextTuple() {
log.info("Received {} signal. Relaying to downstream bolts.", Metadata.Signal.REPLAY);
}
} else if (message.hasContent()) {
collector.emit(QUERY_STREAM, new Values(id, message.getContent(), metadata), id);
collector.emit(QUERY_STREAM, new Values(id, message), id);
} else {
collector.emit(METADATA_STREAM, new Values(id, metadata), id);
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(QUERY_STREAM, new Fields(ID_FIELD, QUERY_FIELD, METADATA_FIELD));
declarer.declareStream(QUERY_STREAM, new Fields(ID_FIELD, QUERY_FIELD));
declarer.declareStream(METADATA_STREAM, new Fields(ID_FIELD, METADATA_FIELD));
declarer.declareStream(REPLAY_STREAM, new Fields(ID_FIELD, REPLAY_TIMESTAMP_FIELD, REPLAY_ACK_FIELD));
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/yahoo/bullet/storm/ReplayBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ public void cleanup() {

private void onQuery(Tuple tuple) {
String id = tuple.getString(ID_POSITION);
byte[] queryData = (byte[]) tuple.getValue(QUERY_POSITION);
PubSubMessage message = (PubSubMessage) tuple.getValue(QUERY_POSITION);
// Batch manager does not add duplicate keys
batchManager.add(id, new PubSubMessage(id, queryData));
batchManager.add(id, message);
metrics.setCount(batchedQueriesCount, batchManager.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class TopologyConstants {

public static final int ID_POSITION = 0;
public static final int QUERY_POSITION = 1;
public static final int QUERY_METADATA_POSITION = 2;
public static final int METADATA_POSITION = 1;
public static final int ERROR_POSITION = 1;
public static final int RECORD_POSITION = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.RandomPool;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Subscriber;
Expand Down Expand Up @@ -128,7 +129,7 @@ private Response handleException(Throwable throwable) {
private void handleResponse(String id, Response response) {
if (response == null || response.getStatusCode() != OK_200) {
log.error("Handling error for id {} with response {}", id, response);
responses.offer(new PubSubMessage(id, DRPCError.CANNOT_REACH_DRPC.asJSONClip(), null));
responses.offer(new PubSubMessage(id, DRPCError.CANNOT_REACH_DRPC.asJSONClip(), (Metadata) null));
return;
}
log.info("Received for id {}: {} {}", response.getStatusCode(), id, response.getStatusText());
Expand Down
Loading

0 comments on commit 5e5ddbc

Please sign in to comment.