Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRPC #37

Merged
merged 15 commits into from
Oct 18, 2017
Merged
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## Introduction

This project implements Bullet on [Storm](http://storm.apache.org).
This project implements Bullet on [Storm](http://storm.apache.org). It also includes the PubSub implementation that uses Storm DRPC as the PubSub.

## Documentation

Expand Down
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<storm.version>1.0.2</storm.version>
<storm.metrics.version>1.0.2</storm.metrics.version>
<bullet.record.version>0.1.2</bullet.record.version>
<bullet.core.version>0.2.2</bullet.core.version>
<bullet.core.version>0.2.5</bullet.core.version>
<sketches.version>0.9.1</sketches.version>
</properties>

Expand Down Expand Up @@ -109,6 +109,11 @@
<artifactId>sketches-core</artifactId>
<version>${sketches.version}</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.0.37</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
Expand All @@ -124,11 +129,6 @@
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>net.java.dev</groupId>
<artifactId>jvyaml</artifactId>
<version>0.2.1</version>
</dependency>
</dependencies>

<build>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/yahoo/bullet/storm/AbsoluteCountMetric.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.storm;

import lombok.NoArgsConstructor;
Expand Down
30 changes: 20 additions & 10 deletions src/main/java/com/yahoo/bullet/storm/BulletStormConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.yahoo.bullet.Config;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -47,6 +46,12 @@ public class BulletStormConfig extends BulletConfig {
public static final String RESULT_BOLT_MEMORY_OFF_HEAP_LOAD = "bullet.topology.result.bolt.memory.off.heap.load";
public static final String TICK_INTERVAL_SECS = "bullet.topology.tick.interval.secs";

// Used automatically by the Storm code. Not for user setting.
// This is the key under which the Storm configuration is placed.
public static final String STORM_CONFIG = "bullet.topology.storm.config";
// This is the key under which the TopologyContext is placed.
public static final String STORM_CONTEXT = "bullet.topology.storm.context";

public static Set<String> TOPOLOGY_SUBMISSION_SETTINGS =
new HashSet<>(asList(QUERY_SPOUT_PARALLELISM, QUERY_SPOUT_CPU_LOAD, QUERY_SPOUT_MEMORY_ON_HEAP_LOAD,
QUERY_SPOUT_MEMORY_OFF_HEAP_LOAD, FILTER_BOLT_PARALLELISM, FILTER_BOLT_CPU_LOAD,
Expand All @@ -62,16 +67,21 @@ public class BulletStormConfig extends BulletConfig {
* Constructor that loads specific file augmented with defaults.
*
* @param file YAML file to load.
* @throws IOException if an error occurred with the file loading.
*/
public BulletStormConfig(String file) throws IOException {
// Load Bullet defaults
super(null);
// Load Bullet Storm settings
Config stormDefaults = new Config(file, DEFAULT_STORM_CONFIGURATION);
// Merge Storm settings onto Bullet defaults
merge(stormDefaults);
log.info("Bullet Storm merged settings:\n {}", getAll(Optional.empty()));
public BulletStormConfig(String file) {
// Wrap the file in a Config and delegate to the Config-based constructor of this class.
this(new Config(file));
}

/**
* Constructor that loads the defaults and augments them with the given config.
*
* @param other The other config to wrap.
*/
public BulletStormConfig(Config other) {
// Load Bullet and Storm defaults. Then merge the other.
super(DEFAULT_STORM_CONFIGURATION);
// NOTE(review): presumably settings in other take precedence over the defaults on conflict - confirm against merge.
merge(other);
// Log whatever getAll returns once the merge is done.
log.info("Merged settings:\n {}", getAll(Optional.empty()));
}

/**
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/yahoo/bullet/storm/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,7 @@ private void updateCount(AbsoluteCountMetric metric, long updateValue) {
}

private AbsoluteCountMetric registerAbsoluteCountMetric(String name, TopologyContext context) {
Number interval = metricsIntervalMapping.getOrDefault(name,
metricsIntervalMapping.get(DEFAULT_BUILT_IN_METRICS_INTERVAL_KEY));
Number interval = metricsIntervalMapping.getOrDefault(name, metricsIntervalMapping.get(DEFAULT_BUILT_IN_METRICS_INTERVAL_KEY));
log.info("Registered {} with interval {}", name, interval);
return context.registerMetric(name, new AbsoluteCountMetric(), interval.intValue());
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/yahoo/bullet/storm/QueryBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract class QueryBolt<Q extends AbstractQuery> implements IRichBolt {

/**
* Constructor that accepts the tick interval.
*
* @param tickInterval The tick interval in seconds.
*/
public QueryBolt(Integer tickInterval) {
Expand Down Expand Up @@ -79,6 +80,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll

/**
* Retires queries that are active past the tick time.
*
* @return The map of query ids to queries that were retired.
*/
protected Map<String, Q> retireQueries() {
Expand All @@ -93,6 +95,7 @@ protected Map<String, Q> retireQueries() {

/**
* Initializes a query from a query tuple.
*
* @param tuple The query tuple with the query to initialize.
* @return The created query.
*/
Expand All @@ -111,6 +114,7 @@ protected Q initializeQuery(Tuple tuple) {

/**
* Gets the default tick configuration to be used.
*
* @return A Map configuration containing the default tick configuration.
*/
public Map<String, Object> getDefaultTickConfiguration() {
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/com/yahoo/bullet/storm/QuerySpout.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.storm;

import com.yahoo.bullet.pubsub.PubSub;
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Subscriber;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
Expand All @@ -23,23 +30,36 @@ public class QuerySpout extends BaseRichSpout {
public static final String QUERY_FIELD = "query";
public static final String METADATA_FIELD = "metadata";

private PubSub pubSub;
private BulletStormConfig config;
private Subscriber subscriber;
private SpoutOutputCollector collector;

/** Exposed for testing only. */
@Getter(AccessLevel.PACKAGE)
private PubSub pubSub;

/**
* Creates a QuerySpout and passes in a {@link PubSub}.
* Creates a QuerySpout with a passed in {@link BulletStormConfig}.
*
* @param pubSub PubSub to get a {@link Subscriber} from
* @param config The BulletStormConfig to create the PubSub from.
*/
public QuerySpout(PubSub pubSub) {
this.pubSub = pubSub;
public QuerySpout(BulletStormConfig config) {
this.config = config;
}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// Add the Storm Config and the context as is, in case any PubSubs need it.
config.set(BulletStormConfig.STORM_CONFIG, conf);
config.set(BulletStormConfig.STORM_CONTEXT, context);

try {
this.pubSub = PubSub.from(config);
this.subscriber = pubSub.getSubscriber();
} catch (PubSubException e) {
throw new RuntimeException("Cannot create PubSub instance or a Subscriber for it.", e);
}
this.collector = collector;
this.subscriber = pubSub.getSubscriber();
}

@Override
Expand All @@ -51,8 +71,9 @@ public void nextTuple() {
log.error(e.getMessage());
}
if (message != null) {
// TODO: No need for two streams. Just send a unified Query stream. JoinBolt should then no longer do that join.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"JoinBolt need not do that join."

Also, maybe call message.getId() just once and save the value to avoid calling the function 4 times?

collector.emit(QUERY_STREAM, new Values(message.getId(), message.getContent()), message.getId());
collector.emit(METADATA_STREAM, new Values(message.getId(), message.getMetadata()));
collector.emit(METADATA_STREAM, new Values(message.getId(), message.getMetadata()), message.getId());
} else {
Utils.sleep(1);
}
Expand Down
35 changes: 27 additions & 8 deletions src/main/java/com/yahoo/bullet/storm/ResultBolt.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.storm;

import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSub;
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Metadata;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
Expand All @@ -16,23 +23,35 @@

@Slf4j
public class ResultBolt extends BaseRichBolt {
private PubSub pubSub;
private Publisher publisher;
private OutputCollector collector;
private BulletStormConfig config;

/** Exposed for testing only. */
@Getter(AccessLevel.PACKAGE)
private Publisher publisher;

/**
* Creates a ResultBolt and passes in a {@link PubSub}.
* Creates a ResultBolt and passes in a {@link BulletStormConfig}.
*
* @param pubSub PubSub to get a {@link Publisher} from
* @param config The BulletStormConfig to create PubSub from.
*/
public ResultBolt(PubSub pubSub) {
this.pubSub = pubSub;
public ResultBolt(BulletStormConfig config) {
this.config = config;
}

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
// Add the Storm Config and the context as is, in case any PubSubs need it.
config.set(BulletStormConfig.STORM_CONFIG, conf);
config.set(BulletStormConfig.STORM_CONTEXT, context);

try {
PubSub pubSub = PubSub.from(config);
this.publisher = pubSub.getPublisher();
} catch (PubSubException e) {
throw new RuntimeException("Cannot create PubSub instance or a Publisher for it.", e);
}
this.collector = collector;
this.publisher = pubSub.getPublisher();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.storm;

import org.apache.storm.Config;
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/yahoo/bullet/storm/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package com.yahoo.bullet.storm;

import com.yahoo.bullet.pubsub.PubSub;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -125,9 +124,7 @@ public static void submit(BulletStormConfig config, String recordComponent, Topo

Integer tickInterval = ((Number) config.get(BulletStormConfig.TICK_INTERVAL_SECS)).intValue();

PubSub pubSub = PubSub.from(config);

builder.setSpout(TopologyConstants.QUERY_COMPONENT, new QuerySpout(pubSub), querySpoutParallelism)
builder.setSpout(TopologyConstants.QUERY_COMPONENT, new QuerySpout(config), querySpoutParallelism)
.setCPULoad(querySpoutCPULoad)
.setMemoryLoad(querySpoutMemoryOnHeapLoad, querySpoutMemoryOffHeapLoad);

Expand All @@ -145,7 +142,7 @@ public static void submit(BulletStormConfig config, String recordComponent, Topo
.setCPULoad(joinBoltCPULoad)
.setMemoryLoad(joinBoltMemoryOnHeapLoad, joinBoltMemoryOffHeapLoad);

builder.setBolt(TopologyConstants.RESULT_COMPONENT, new ResultBolt(pubSub), resultBoltParallelism)
builder.setBolt(TopologyConstants.RESULT_COMPONENT, new ResultBolt(config), resultBoltParallelism)
.shuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM)
.setCPULoad(resultBoltCPULoad)
.setMemoryLoad(resultBoltMemoryOnHeapLoad, resultBoltMemoryOffHeapLoad);
Expand Down Expand Up @@ -254,4 +251,3 @@ public static void main(String[] args) throws Exception {
submit(bulletStormConfig, TopologyConstants.RECORD_COMPONENT, builder);
}
}

53 changes: 53 additions & 0 deletions src/main/java/com/yahoo/bullet/storm/drpc/DRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.storm.drpc;

import com.yahoo.bullet.Config;
import com.yahoo.bullet.storm.BulletStormConfig;

public class DRPCConfig extends BulletStormConfig {
public static final String PREFIX = "bullet.pubsub.storm.drpc.";

// The location of DRPC servers.
public static final String DRPC_SERVERS = PREFIX + "servers";
// This is the name of the DRPC function used to register with the DRPC servers
public static final String DRPC_FUNCTION = PREFIX + "function";

// HTTP configuration
// The timeout and retry limits for HTTP connections to DRPC servers.
public static final String DRPC_HTTP_CONNECT_TIMEOUT_MS = PREFIX + "http.connect.timeout.ms";
public static final String DRPC_HTTP_CONNECT_RETRY_LIMIT = PREFIX + "http.connect.retry.limit";
// This is the HTTP protocol to use when submitting to the DRPC server.
public static final String DRPC_HTTP_PROTOCOL = PREFIX + "http.protocol";
// This is the port that the QUERY_SUBMISSION end talks to.
public static final String DRPC_HTTP_PORT = PREFIX + "http.port";
// The path that queries must be POSTed to. This generally is "drpc".
public static final String DRPC_HTTP_PATH = PREFIX + "http.path";


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you want two blank lines here?

// This is the maximum number of pending queries that can be read by a single subscriber in QUERY_PROCESSING
// before a commit is needed.
public static final String DRPC_MAX_UNCOMMITED_MESSAGES = PREFIX + "max.uncommitted.messages";

/**
* Create a new DRPCConfig by reading in a file.
*
* @param file The file containing DRPC settings.
*/
public DRPCConfig(String file) {
// Load and merge with default bullet-storm settings. Storm defaults also contain the DRPC settings.
// The resulting BulletStormConfig is then passed to the DRPCConfig(Config) constructor.
this(new BulletStormConfig(file));
}

/**
* Creates a new DRPCConfig wrapping the given config.
*
* @param config The config to wrap.
*/
public DRPCConfig(Config config) {
// Delegate to the BulletStormConfig superclass constructor with the given config.
super(config);
}
}
Loading