/
TopologyBuilder.java
588 lines (526 loc) · 27.2 KB
/
TopologyBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.topology;
import org.apache.storm.Config;
import org.apache.storm.generated.*;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.PartialKeyGrouping;
import org.apache.storm.hooks.IWorkerHook;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.state.State;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import java.io.NotSerializableException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.windowing.TupleWindow;
import org.json.simple.JSONValue;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_COMPONENT_ID;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
/**
* TopologyBuilder exposes the Java API for specifying a topology for Storm
* to execute. Topologies are Thrift structures in the end, but since the Thrift API
* is so verbose, TopologyBuilder greatly eases the process of creating topologies.
* The template for creating and submitting a topology looks something like:
*
* ```java
* TopologyBuilder builder = new TopologyBuilder();
*
* builder.setSpout("1", new TestWordSpout(true), 5);
* builder.setSpout("2", new TestWordSpout(true), 3);
* builder.setBolt("3", new TestWordCounter(), 3)
* .fieldsGrouping("1", new Fields("word"))
* .fieldsGrouping("2", new Fields("word"));
* builder.setBolt("4", new TestGlobalCount())
* .globalGrouping("1");
*
* Map conf = new HashMap();
* conf.put(Config.TOPOLOGY_WORKERS, 4);
*
* StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
* ```
*
* Running the exact same topology in local mode (in process), and configuring it to log all tuples
* emitted, looks like the following. Note that it lets the topology run for 10 seconds
* before shutting down the local cluster.
*
* ```java
* TopologyBuilder builder = new TopologyBuilder();
*
* builder.setSpout("1", new TestWordSpout(true), 5);
* builder.setSpout("2", new TestWordSpout(true), 3);
* builder.setBolt("3", new TestWordCounter(), 3)
* .fieldsGrouping("1", new Fields("word"))
* .fieldsGrouping("2", new Fields("word"));
* builder.setBolt("4", new TestGlobalCount())
* .globalGrouping("1");
*
* Map conf = new HashMap();
* conf.put(Config.TOPOLOGY_WORKERS, 4);
* conf.put(Config.TOPOLOGY_DEBUG, true);
*
* LocalCluster cluster = new LocalCluster();
* cluster.submitTopology("mytopology", conf, builder.createTopology());
* Utils.sleep(10000);
* cluster.shutdown();
* ```
*
* The pattern for `TopologyBuilder` is to map component ids to components using the setSpout
* and setBolt methods. Those methods return objects that are then used to declare
* the inputs for that component.
*/
public class TopologyBuilder {
private Map<String, IRichBolt> _bolts = new HashMap<>();
private Map<String, IRichSpout> _spouts = new HashMap<>();
private Map<String, ComponentCommon> _commons = new HashMap<>();
private boolean hasStatefulBolt = false;
// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<>();
private List<ByteBuffer> _workerHooks = new ArrayList<>();
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
maybeAddCheckpointSpout();
for(String boltId: _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
bolt = maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = getComponentCommon(boltId, bolt);
try{
maybeAddCheckpointInputs(common);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
}catch(RuntimeException wrapperCause){
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
throw new IllegalStateException(
"Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
"which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
"should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
}
throw wrapperCause;
}
}
for(String spoutId: _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
try{
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
}catch(RuntimeException wrapperCause){
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
throw new IllegalStateException(
"Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
"which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
"should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
}
throw wrapperCause;
}
}
StormTopology stormTopology = new StormTopology(spoutSpecs,
boltSpecs,
new HashMap<String, StateSpoutSpec>());
stormTopology.set_worker_hooks(_workerHooks);
return stormTopology;
}
/**
* Define a new bolt in this topology with parallelism of just one thread.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
/**
* Define a new bolt in this topology. This defines a basic bolt, which is a
* simpler to use but more restricted kind of bolt. Basic bolts are intended
* for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a basic bolt, which is a
* simpler to use but more restricted kind of bolt. Basic bolts are intended
* for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
/**
* Define a new bolt in this topology. This defines a windowed bolt, intended
* for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
* is triggered for each window interval with the list of current events in the window.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the windowed bolt
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a windowed bolt, intended
* for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
* is triggered for each window interval with the list of current events in the window.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the windowed bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
return setBolt(id, new WindowedBoltExecutor(bolt), parallelism_hint);
}
/**
* Define a new bolt in this topology. This defines a stateful bolt, that requires its
* state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
* is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
* with its previously saved state.
* <p>
* The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
* are expected to anchor the tuples while emitting and ack the input tuples once its processed.
* </p>
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a stateful bolt, that requires its
* state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
* is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
* with its previously saved state.
* <p>
* The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
* are expected to anchor the tuples while emitting and ack the input tuples once its processed.
* </p>
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelism_hint);
}
/**
* Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful
* windowing operations. The {@link IStatefulWindowedBolt#execute(TupleWindow)} method is triggered
* for each window interval with the list of current events in the window. During initialization of
* this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved state.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful windowed bolt
* @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful
* windowing operations. The {@link IStatefulWindowedBolt#execute(TupleWindow)} method is triggered
* for each window interval with the list of current events in the window. During initialization of
* this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved state.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful windowed bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
* @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(new StatefulWindowedBoltExecutor<T>(bolt)), parallelism_hint);
}
/**
* Define a new spout in this topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
return setSpout(id, spout, null);
}
/**
* Define a new spout in this topology with the specified parallelism. If the spout declares
* itself as non-distributed, the parallelism_hint will be ignored and only one task
* will be allocated to this component.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
* @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, spout, parallelism_hint);
_spouts.put(id, spout);
return new SpoutGetter(id);
}
public void setStateSpout(String id, IRichStateSpout stateSpout) throws IllegalArgumentException {
setStateSpout(id, stateSpout, null);
}
public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
// TODO: finish
}
/**
* Add a new worker lifecycle hook
*
* @param workerHook the lifecycle hook to add
*/
public void addWorkerHook(IWorkerHook workerHook) {
if(null == workerHook) {
throw new IllegalArgumentException("WorkerHook must not be null.");
}
_workerHooks.add(ByteBuffer.wrap(Utils.javaSerialize(workerHook)));
}
private void validateUnusedId(String id) {
if(_bolts.containsKey(id)) {
throw new IllegalArgumentException("Bolt has already been declared for id " + id);
}
if(_spouts.containsKey(id)) {
throw new IllegalArgumentException("Spout has already been declared for id " + id);
}
if(_stateSpouts.containsKey(id)) {
throw new IllegalArgumentException("State spout has already been declared for id " + id);
}
}
/**
* If the topology has at least one stateful bolt
* add a {@link CheckpointSpout} component to the topology.
*/
private void maybeAddCheckpointSpout() {
if (hasStatefulBolt) {
setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
}
}
private void maybeAddCheckpointInputs(ComponentCommon common) {
if (hasStatefulBolt) {
addCheckPointInputs(common);
}
}
/**
* If the topology has at least one stateful bolt all the non-stateful bolts
* are wrapped in {@link CheckpointTupleForwarder} so that the checkpoint
* tuples can flow through the topology.
*/
private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
bolt = new CheckpointTupleForwarder(bolt);
}
return bolt;
}
/**
* For bolts that has incoming streams from spouts (the root bolts),
* add checkpoint stream from checkpoint spout to its input. For other bolts,
* add checkpoint stream from the previous bolt to its input.
*/
private void addCheckPointInputs(ComponentCommon component) {
Set<GlobalStreamId> checkPointInputs = new HashSet<>();
for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
String sourceId = inputStream.get_componentId();
if (_spouts.containsKey(sourceId)) {
checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
} else {
checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
}
}
for (GlobalStreamId streamId : checkPointInputs) {
component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
}
}
private ComponentCommon getComponentCommon(String id, IComponent component) {
ComponentCommon ret = new ComponentCommon(_commons.get(id));
OutputFieldsGetter getter = new OutputFieldsGetter();
component.declareOutputFields(getter);
ret.set_streams(getter.getFieldsDeclaration());
return ret;
}
private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null) {
int dop = parallelism.intValue();
if(dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
}
Map conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
String _id;
public ConfigGetter(String id) {
_id = id;
}
@Override
public T addConfigurations(Map<String, Object> conf) {
if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
String currConf = _commons.get(_id).get_json_conf();
_commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
return (T) this;
}
}
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
public SpoutGetter(String id) {
super(id);
}
}
protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
private String _boltId;
public BoltGetter(String boltId) {
super(boltId);
_boltId = boltId;
}
public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
}
public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {
return grouping(componentId, streamId, Grouping.fields(fields.toList()));
}
public BoltDeclarer globalGrouping(String componentId) {
return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer globalGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.fields(new ArrayList<String>()));
}
public BoltDeclarer shuffleGrouping(String componentId) {
return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
}
public BoltDeclarer localOrShuffleGrouping(String componentId) {
return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct()));
}
public BoltDeclarer noneGrouping(String componentId) {
return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer noneGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.none(new NullStruct()));
}
public BoltDeclarer allGrouping(String componentId) {
return allGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer allGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.all(new NullStruct()));
}
public BoltDeclarer directGrouping(String componentId) {
return directGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer directGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.direct(new NullStruct()));
}
private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
return this;
}
@Override
public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
return customGrouping(componentId, new PartialKeyGrouping(fields));
}
@Override
public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
}
@Override
public BoltDeclarer customGrouping(String componentId, CustomStreamGrouping grouping) {
return customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping);
}
@Override
public BoltDeclarer customGrouping(String componentId, String streamId, CustomStreamGrouping grouping) {
return grouping(componentId, streamId, Grouping.custom_serialized(Utils.javaSerialize(grouping)));
}
@Override
public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
return grouping(id.get_componentId(), id.get_streamId(), grouping);
}
}
private static Map parseJson(String json) {
if (json==null) {
return new HashMap();
} else {
try {
return (Map) JSONValue.parseWithException(json);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
private static String mergeIntoJson(Map into, Map newMap) {
Map res = new HashMap(into);
if(newMap!=null) res.putAll(newMap);
return JSONValue.toJSONString(res);
}
}