From a1912eafab7f4313c7f45f27748fff28cb37d8f0 Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Mon, 6 Jul 2015 16:45:29 -0400 Subject: [PATCH 1/2] Minor type cleanup things in storm.task.TopologyContext.java --- .../backtype/storm/task/TopologyContext.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java index 81246511564..b3c6d220917 100644 --- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java +++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java @@ -29,6 +29,7 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -36,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.commons.lang.NotImplementedException; import org.json.simple.JSONValue; @@ -61,7 +63,7 @@ public TopologyContext(StormTopology topology, Map stormConf, Map> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List workerTasks, Map defaultResources, - Map userResources, Map executorData, Map registeredMetrics, + Map userResources, Map executorData, Map>> registeredMetrics, clojure.lang.Atom openOrPrepareWasCalled) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, @@ -154,11 +156,11 @@ public Fields getThisOutputFields(String streamId) { } /** - * Gets the declared output fields for the specified stream id for the + * Gets the declared output fields for all streams for the * component this task is a part of. */ public Map> getThisOutputFieldsForStreams() { - Map> streamToFields = new HashMap>(); + Map> streamToFields = new HashMap<>(); for (String stream : this.getThisStreams()) { streamToFields.put(stream, this.getThisOutputFields(stream).toList()); } @@ -178,7 +180,7 @@ public Set getThisStreams() { * accesses which resource in a distributed resource to ensure an even distribution. */ public int getThisTaskIndex() { - List tasks = new ArrayList(getComponentTasks(getThisComponentId())); + List tasks = new ArrayList<>(getComponentTasks(getThisComponentId())); Collections.sort(tasks); for(int i=0; i getHooks() { } private static Map groupingToJSONableMap(Grouping grouping) { - Map groupingMap = new HashMap(); + Map groupingMap = new HashMap<>(); groupingMap.put("type", grouping.getSetField().toString()); if (grouping.is_set_fields()) { groupingMap.put("fields", grouping.get_fields()); @@ -242,18 +244,18 @@ private static Map groupingToJSONableMap(Grouping grouping) { @Override public String toJSONString() { - Map obj = new HashMap(); + Map obj = new HashMap<>(); obj.put("task->component", this.getTaskToComponent()); obj.put("taskid", this.getThisTaskId()); obj.put("componentid", this.getThisComponentId()); - List streamList = new ArrayList(); + List streamList = new ArrayList<>(); streamList.addAll(this.getThisStreams()); obj.put("streams", streamList); obj.put("stream->outputfields", this.getThisOutputFieldsForStreams()); // Convert targets to a JSON serializable format - Map stringTargets = new HashMap(); + Map> stringTargets = new HashMap<>(); for (Map.Entry> entry : this.getThisTargets().entrySet()) { - Map stringTargetMap = new HashMap(); + Map stringTargetMap = new HashMap<>(); for (Map.Entry innerEntry : entry.getValue().entrySet()) { stringTargetMap.put(innerEntry.getKey(), groupingToJSONableMap(innerEntry.getValue())); } @@ -261,12 +263,12 @@ public String toJSONString() { } obj.put("stream->target->grouping", stringTargets); // Convert sources to a JSON serializable format - Map> stringSources = new HashMap>(); + Map> stringSources = new HashMap<>(); for (Map.Entry entry : this.getThisSources().entrySet()) { GlobalStreamId gid = entry.getKey(); Map stringSourceMap = stringSources.get(gid.get_componentId()); if (stringSourceMap == null) { - stringSourceMap = new HashMap(); + stringSourceMap = new HashMap<>(); stringSources.put(gid.get_componentId(), stringSourceMap); } stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue())); @@ -301,17 +303,17 @@ public T registerMetric(String name, T metric, int timeBucke throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); } - Map m1 = _registeredMetrics; + Map>> m1 = _registeredMetrics; if(!m1.containsKey(timeBucketSizeInSecs)) { - m1.put(timeBucketSizeInSecs, new HashMap()); + m1.put(timeBucketSizeInSecs, new HashMap>()); } - Map m2 = (Map)m1.get(timeBucketSizeInSecs); + Map> m2 = m1.get(timeBucketSizeInSecs); if(!m2.containsKey(_taskId)) { - m2.put(_taskId, new HashMap()); + m2.put(_taskId, new HashMap()); } - Map m3 = (Map)m2.get(_taskId); + Map m3 = m2.get(_taskId); if(m3.containsKey(name)) { throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); } else { @@ -346,13 +348,13 @@ public IMetric getRegisteredMetricByName(String name) { } /* - * Convinience method for registering ReducedMetric. + * Convenience method for registering ReducedMetric. */ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } /* - * Convinience method for registering CombinedMetric. + * Convenience method for registering CombinedMetric. */ public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); From 427cef54c74204a257bc97a3e8fb793854603aaf Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Tue, 7 Jul 2015 10:25:58 -0400 Subject: [PATCH 2/2] Add source->stream->fields mapping to multi-lang handshake. --- docs/documentation/Multilang-protocol.md | 9 ++++++-- .../backtype/storm/task/TopologyContext.java | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/documentation/Multilang-protocol.md b/docs/documentation/Multilang-protocol.md index 017ad328c06..2a90059d029 100644 --- a/docs/documentation/Multilang-protocol.md +++ b/docs/documentation/Multilang-protocol.md @@ -66,7 +66,7 @@ The initial handshake is the same for both types of shell components: "4": "example-bolt2" }, "taskid": 3, - // Everything below this line is only available in Storm 0.11.0+ + // Everything below this line is only available in Storm 0.10.0+ "componentid": "example-bolt" "stream->target->grouping": { "default": { @@ -82,6 +82,11 @@ The initial handshake is the same for both types of shell components: } } } + "source->stream->fields": { + "example-spout": { + "default": ["word"] + } + } } } ``` @@ -90,7 +95,7 @@ Your script should create an empty file named with its PID in this directory. e. the PID is 1234, so an empty file named 1234 is created in the directory. This file lets the supervisor know the PID so it can shutdown the process later on. -As of Storm 0.11.0, the context sent by Storm to shell components has been +As of Storm 0.10.0, the context sent by Storm to shell components has been enhanced substantially to include all aspects of the topology context available to JVM components. One key addition is the ability to determine a shell component's source and targets (i.e., inputs and outputs) in the topology via diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java index b3c6d220917..cefa2078e79 100644 --- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java +++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java @@ -190,6 +190,28 @@ public int getThisTaskIndex() { throw new RuntimeException("Fatal: could not find this task id in this component"); } + /** + * Gets the declared input fields for this component. + * + * @return A map from sources to streams to fields. + */ + public Map>> getThisInputFields() { + Map>> outputMap = new HashMap<>(); + for (Map.Entry entry : this.getThisSources().entrySet()) { + String componentId = entry.getKey().get_componentId(); + Set streams = getComponentStreams(componentId); + for (String stream : streams) { + Map> streamFieldMap = outputMap.get(componentId); + if (streamFieldMap == null) { + streamFieldMap = new HashMap<>(); + outputMap.put(componentId, streamFieldMap); + } + streamFieldMap.put(stream, getComponentOutputFields(componentId, stream).toList()); + } + } + return outputMap; + } + /** * Gets the declared inputs to this component. * @@ -274,6 +296,7 @@ public String toJSONString() { stringSourceMap.put(gid.get_streamId(), groupingToJSONableMap(entry.getValue())); } obj.put("source->stream->grouping", stringSources); + obj.put("source->stream->fields", this.getThisInputFields()); return JSONValue.toJSONString(obj); }