Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Standardize builder interfaces and remove Resources class (#2559)
Browse files Browse the repository at this point in the history
* add second useKryoSerializer option

* add setters for all configs in Builder

* add enum for prefix names

* use prefix name enum in streamlet class constructors

* change streamlet examples to use new builder

* remove trailing space caught by checkstyle

* fix checkstyle issues

* fix javadoc variable names to satisfy checkstyle

* fix starred import statement and remove unnecessary java List import

* reverse static and final in Builder class designation

* fix order of imports in FilesystemSinkTopology

* remove ByteAmount dependency from streamlet API

* fix example topology that uses ByteAmount

* add missing javadoc for methods
  • Loading branch information
lucperkins authored and srkukarni committed Nov 27, 2017
1 parent bd2056f commit e9aee44
Show file tree
Hide file tree
Showing 30 changed files with 221 additions and 241 deletions.
Expand Up @@ -99,7 +99,7 @@ public void cleanup() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

// Creates a temporary file to write output into.
File file = File.createTempFile("filesystem-sink-example", ".tmp");
Expand Down Expand Up @@ -127,7 +127,7 @@ public static void main(String[] args) throws Exception {
// argument (or else the default of 2 will be used).
int topologyParallelism = StreamletUtils.getParallelism(args, 2);

Config config = new Config.Builder()
Config config = Config.newBuilder()
.setNumContainers(topologyParallelism)
.build();

Expand Down
Expand Up @@ -84,7 +84,7 @@ String getDeviceId() {
}

public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

processingGraphBuilder
// The source streamlet is an indefinite series of sensor readings
Expand Down
Expand Up @@ -133,7 +133,7 @@ public String toString() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

// A KVStreamlet is produced. Each element is a KeyValue object where the key
// is the impression ID and the user ID is the value.
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
import com.twitter.heron.streamlet.Builder;
import com.twitter.heron.streamlet.Config;
import com.twitter.heron.streamlet.Resources;
import com.twitter.heron.streamlet.Runner;
import com.twitter.heron.streamlet.Streamlet;

Expand All @@ -44,7 +43,7 @@ private IntegerProcessingTopology() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder builder = Builder.createBuilder();
Builder builder = Builder.newBuilder();

Streamlet<Integer> zeroes = builder.newSource(() -> 0);

Expand All @@ -58,14 +57,10 @@ public static void main(String[] args) throws Exception {
.setName("remove-twos")
.log();

Resources resources = new Resources.Builder()
.setCpu(CPU)
.setRamInGB(GIGABYTES_OF_RAM)
.build();

Config config = new Config.Builder()
Config config = Config.newBuilder()
.setNumContainers(NUM_CONTAINERS)
.setContainerResources(resources)
.setPerContainerRamInGigabytes(GIGABYTES_OF_RAM)
.setPerContainerCpu(CPU)
.build();

// Fetches the topology name from the first command-line argument
Expand Down
Expand Up @@ -77,7 +77,7 @@ private static List<Integer> repartitionStreamlet(int incomingInteger, int numPa
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

Streamlet<Integer> randomIntegers = processingGraphBuilder
.newSource(() -> {
Expand Down
Expand Up @@ -93,7 +93,7 @@ public void cleanup() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

/**
* A Pulsar source is constructed for a specific Pulsar installation, topic, and
Expand Down
Expand Up @@ -69,7 +69,7 @@ int getFeetRun() {
}

public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

processingGraphBuilder.newSource(SmartWatchReading::new)
.setName("incoming-watch-readings")
Expand Down
Expand Up @@ -124,7 +124,7 @@ public void cleanup() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

/**
* A supplier streamlet of random GameScore objects is cloned into two
Expand Down
Expand Up @@ -94,7 +94,7 @@ public void cleanup() {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder builder = Builder.createBuilder();
Builder builder = Builder.newBuilder();

/**
* The processing graph consists of a supplier streamlet that emits
Expand Down
Expand Up @@ -48,7 +48,7 @@ private WindowedWordCountTopology() {
);

public static void main(String[] args) throws Exception {
Builder processingGraphBuilder = Builder.createBuilder();
Builder processingGraphBuilder = Builder.newBuilder();

processingGraphBuilder
// The origin of the processing graph: an indefinite series of sentences chosen
Expand Down Expand Up @@ -82,7 +82,7 @@ public static void main(String[] args) throws Exception {
// argument (or else the default of 2 will be used).
int topologyParallelism = StreamletUtils.getParallelism(args, 2);

Config config = new Config.Builder()
Config config = Config.newBuilder()
.setNumContainers(topologyParallelism)
.build();

Expand Down
Expand Up @@ -142,7 +142,7 @@ private static boolean checkRequestAmount(WireRequest request) {
* at runtime
*/
public static void main(String[] args) throws Exception {
Builder builder = Builder.createBuilder();
Builder builder = Builder.newBuilder();

// Requests from the "quiet" bank branch (high throttling).
Streamlet<WireRequest> quietBranch = builder.newSource(() -> new WireRequest(20))
Expand Down Expand Up @@ -178,7 +178,7 @@ public static void main(String[] args) throws Exception {
.setName("all-branches-fraud-detect")
.log();

Config config = new Config.Builder()
Config config = Config.newBuilder()
.setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
.setNumContainers(2)
.build();
Expand Down
Expand Up @@ -22,7 +22,7 @@
* information to build the topology
*/
public interface Builder {
static Builder createBuilder() {
static Builder newBuilder() {
return new BuilderImpl();
}

Expand Down

0 comments on commit e9aee44

Please sign in to comment.