From 75365a2b6feeb1ec8e577704896e6fb26273cc06 Mon Sep 17 00:00:00 2001 From: Angus Helm Date: Mon, 17 Apr 2017 14:59:03 -0500 Subject: [PATCH 1/2] Add support for configuring memory load and cpu load in Flux --- .../org/apache/storm/flux/FluxBuilder.java | 29 +++++++++++++++++-- .../apache/storm/flux/model/VertexDef.java | 27 +++++++++++++++++ .../src/test/resources/configs/tck.yaml | 6 ++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 77b89f92f9f..ffe882b62db 100644 --- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -18,13 +18,12 @@ package org.apache.storm.flux; import org.apache.storm.Config; +import org.apache.storm.flux.model.*; import org.apache.storm.generated.StormTopology; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.topology.*; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; -import org.apache.storm.flux.api.TopologySource; -import org.apache.storm.flux.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,6 +187,18 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui boltObj.getClass().getName()); } + BoltDef boltDef = topologyDef.getBoltDef(stream.getTo()); + if (boltDef.getOnHeapMemoryLoad() > -1) { + if (boltDef.getOffHeapMemoryLoad() > -1) { + declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad()); + } else { + declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad()); + } + } + if (boltDef.getCpuLoad() > -1) { + declarer.setCPULoad(boltDef.getCpuLoad()); + } + GroupingDef grouping = stream.getGrouping(); // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId()); @@ -358,7 +369,19 @@ private static void buildSpouts(ExecutionContext context, TopologyBuilder builde NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException { for (SpoutDef sd : context.getTopologyDef().getSpouts()) { IRichSpout spout = buildSpout(sd, context); - builder.setSpout(sd.getId(), spout, sd.getParallelism()); + SpoutDeclarer declarer = builder.setSpout(sd.getId(), spout, sd.getParallelism()); + + if (sd.getOnHeapMemoryLoad() > -1) { + if (sd.getOffHeapMemoryLoad() > -1) { + declarer.setMemoryLoad(sd.getOnHeapMemoryLoad(), sd.getOffHeapMemoryLoad()); + } else { + declarer.setMemoryLoad(sd.getOnHeapMemoryLoad()); + } + } + if (sd.getCpuLoad() > -1) { + declarer.setCPULoad(sd.getCpuLoad()); + } + context.addSpout(sd.getId(), spout); } } diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java index e71bcc21ce6..e76f04b202b 100644 --- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java +++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java @@ -25,6 +25,9 @@ public abstract class VertexDef extends BeanDef { // default parallelism to 1 so if it's ommitted, the topology will still function. private int parallelism = 1; + private int onHeapMemoryLoad = -1; + private int offHeapMemoryLoad = -1; + private int cpuLoad = -1; public int getParallelism() { return parallelism; @@ -33,4 +36,28 @@ public int getParallelism() { public void setParallelism(int parallelism) { this.parallelism = parallelism; } + + public int getOnHeapMemoryLoad() { + return onHeapMemoryLoad; + } + + public void setOnHeapMemoryLoad(int onHeapMemoryLoad) { + this.onHeapMemoryLoad = onHeapMemoryLoad; + } + + public int getOffHeapMemoryLoad() { + return offHeapMemoryLoad; + } + + public void setOffHeapMemoryLoad(int offHeapMemoryLoad) { + this.offHeapMemoryLoad = offHeapMemoryLoad; + } + + public int getCpuLoad() { + return cpuLoad; + } + + public void setCpuLoad(int cpuLoad) { + this.cpuLoad = cpuLoad; + } } diff --git a/external/flux/flux-core/src/test/resources/configs/tck.yaml b/external/flux/flux-core/src/test/resources/configs/tck.yaml index 5d40445a9fe..67b739c8700 100644 --- a/external/flux/flux-core/src/test/resources/configs/tck.yaml +++ b/external/flux/flux-core/src/test/resources/configs/tck.yaml @@ -51,6 +51,9 @@ spouts: - id: "spout-1" className: "org.apache.storm.testing.TestWordSpout" parallelism: 1 + onHeapMemoryLoad: 100 + offHeapMemoryLoad: 100 + cpuLoad: 100 # ... # bolt definitions @@ -58,6 +61,9 @@ bolts: - id: "bolt-1" className: "org.apache.storm.testing.TestWordCounter" parallelism: 1 + onHeapMemoryLoad: 100 + offHeapMemoryLoad: 100 + cpuLoad: 100 # ... - id: "bolt-2" From e554c6a5c5dfe6693820c3f09df44bdf84f78f6e Mon Sep 17 00:00:00 2001 From: Angus Helm Date: Mon, 24 Apr 2017 09:33:23 -0500 Subject: [PATCH 2/2] travis