diff --git a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java index 2637b54f382..bfef7455c64 100644 --- a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java +++ b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java @@ -26,7 +26,7 @@ public interface ITridentSpout extends Serializable { - public interface BatchCoordinator { + interface BatchCoordinator { /** * Create metadata for this particular transaction id which has never * been emitted before. The metadata should contain whatever is necessary @@ -55,7 +55,7 @@ public interface BatchCoordinator { void close(); } - public interface Emitter { + interface Emitter { /** * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata * was created by the Coordinator in the initializeTranaction method. This method must always emit diff --git a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java index d1b3fe82c85..9cd98eeab8c 100644 --- a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java @@ -35,7 +35,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout { - IOpaquePartitionedTridentSpout _spout; + IOpaquePartitionedTridentSpout _spout; public class Coordinator implements ITridentSpout.BatchCoordinator { IOpaquePartitionedTridentSpout.Coordinator _coordinator; @@ -75,10 +75,10 @@ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { } public class Emitter implements ICommitterTridentSpout.Emitter { - IOpaquePartitionedTridentSpout.Emitter _emitter; + IOpaquePartitionedTridentSpout.Emitter _emitter; TransactionalState _state; - TreeMap> _cachedMetas = new TreeMap>(); - Map _partitionStates = new HashMap(); + TreeMap> _cachedMetas = new TreeMap<>(); + Map _partitionStates = new HashMap<>(); int _index; int _numTasks; @@ -97,7 +97,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { List partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); - List myPartitions = new ArrayList(); + List myPartitions = new ArrayList<>(); for(int i=_index; i < partitions.size(); i+=_numTasks) { ISpoutPartition p = partitions.get(i); String id = p.getId(); @@ -108,7 +108,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl _savedCoordinatorMeta = coordinatorMeta; _changedMeta = true; } - Map metas = new HashMap(); + Map metas = new HashMap<>(); _cachedMetas.put(tx.getTransactionId(), metas); Entry> entry = _cachedMetas.lowerEntry(tx.getTransactionId()); @@ -116,7 +116,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl if(entry!=null) { prevCached = entry.getValue(); } else { - prevCached = new HashMap(); + prevCached = new HashMap<>(); } for(String id: _partitionStates.keySet()) { @@ -147,8 +147,8 @@ public void commit(TransactionAttempt attempt) { // another attempt of the batch to commit, the batch phase must have succeeded in between. // hence, all tasks for the prior commit must have finished committing (whether successfully or not) if(_changedMeta && _index==0) { - Set validIds = new HashSet(); - for(ISpoutPartition p: (List) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) { + Set validIds = new HashSet<>(); + for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) { validIds.add(p.getId()); } for(String existingPartition: _state.list("")) { @@ -174,7 +174,7 @@ public void close() { } } - public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout spout) { + public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout spout) { _spout = spout; } diff --git a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java index f96efca0f2c..a11afdaf8ac 100644 --- a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java @@ -30,25 +30,25 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout { - IPartitionedTridentSpout _spout; + IPartitionedTridentSpout _spout; - public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) { + public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) { _spout = spout; } - public IPartitionedTridentSpout getPartitionedSpout() { + public IPartitionedTridentSpout getPartitionedSpout() { return _spout; } - class Coordinator implements ITridentSpout.BatchCoordinator { - private IPartitionedTridentSpout.Coordinator _coordinator; + class Coordinator implements ITridentSpout.BatchCoordinator { + private IPartitionedTridentSpout.Coordinator _coordinator; public Coordinator(Map conf, TopologyContext context) { _coordinator = _spout.getCoordinator(conf, context); } @Override - public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) { + public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) { if(currMetadata!=null) { return currMetadata; } else { @@ -82,10 +82,10 @@ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { } } - class Emitter implements ITridentSpout.Emitter { - private IPartitionedTridentSpout.Emitter _emitter; + class Emitter implements ITridentSpout.Emitter { + private IPartitionedTridentSpout.Emitter _emitter; private TransactionalState _state; - private Map _partitionStates = new HashMap(); + private Map _partitionStates = new HashMap<>(); private int _index; private int _numTasks; @@ -100,12 +100,12 @@ public Emitter(String txStateId, Map conf, TopologyContext context) { @Override - public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta, + public void emitBatch(final TransactionAttempt tx, final Integer coordinatorMeta, final TridentCollector collector) { if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { List partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); - List myPartitions = new ArrayList(); + List myPartitions = new ArrayList<>(); for(int i=_index; i < partitions.size(); i+=_numTasks) { ISpoutPartition p = partitions.get(i); String id = p.getId(); @@ -150,12 +150,12 @@ public void close() { } @Override - public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) { + public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) { return new Coordinator(conf, context); } @Override - public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) { + public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) { return new Emitter(txStateId, conf, context); } diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java index ab9fd4b5ddc..8e414c7a5a0 100644 --- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java +++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java @@ -78,7 +78,7 @@ public RichSpoutEmitter(Map conf, TopologyContext context) { if(batchSize==null) batchSize = 1000; _maxBatchSize = batchSize.intValue(); _collector = new CaptureCollector(); - idsMap = new RotatingMap(3); + idsMap = new RotatingMap<>(3); rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); } @@ -174,7 +174,7 @@ static class CaptureCollector implements ISpoutOutputCollector { public long pendingCount; public void reset(TridentCollector c) { _collector = c; - ids = new ArrayList(); + ids = new ArrayList<>(); } @Override diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java index 0380728f2c4..1e554d8ebb1 100644 --- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java +++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java @@ -56,7 +56,7 @@ public RichSpoutBatchTriggerer(IRichSpout delegate, String streamName, String ba @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector))); - _outputTasks = new ArrayList(); + _outputTasks = new ArrayList<>(); for(String component: Utils.get(context.getThisTargets(), _coordStream, new HashMap()).keySet()) { @@ -119,20 +119,20 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public Map getComponentConfiguration() { Map conf = _delegate.getComponentConfiguration(); - if(conf==null) conf = new HashMap(); - else conf = new HashMap(conf); + if(conf==null) conf = new HashMap<>(); + else conf = new HashMap<>(conf); Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class); return conf; } static class FinishCondition { - Set vals = new HashSet(); + Set vals = new HashSet<>(); Object msgId; } - Map _msgIdToBatchId = new HashMap(); + Map _msgIdToBatchId = new HashMap<>(); - Map _finishConditions = new HashMap(); + Map _finishConditions = new HashMap<>(); class StreamOverrideCollector implements ISpoutOutputCollector { @@ -149,7 +149,7 @@ public List emit(String ignore, List values, Object msgId) { FinishCondition finish = new FinishCondition(); finish.msgId = msgId; List tasks = _collector.emit(_stream, new ConsList(batchId, values)); - Set outTasksSet = new HashSet(tasks); + Set outTasksSet = new HashSet<>(tasks); for(Integer t: _outputTasks) { int count = 0; if(outTasksSet.contains(t)) { diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java index a936e19013c..d9acb5f4e07 100644 --- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java +++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java @@ -38,14 +38,14 @@ public class TridentSpoutCoordinator implements IBasicBolt { public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class); private static final String META_DIR = "meta"; - ITridentSpout _spout; - ITridentSpout.BatchCoordinator _coord; + ITridentSpout _spout; + ITridentSpout.BatchCoordinator _coord; RotatingTransactionalState _state; TransactionalState _underlyingState; String _id; - public TridentSpoutCoordinator(String id, ITridentSpout spout) { + public TridentSpoutCoordinator(String id, ITridentSpout spout) { _spout = spout; _id = id; } diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java index 22b304a6532..6431426d040 100644 --- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java +++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java @@ -42,14 +42,14 @@ public class TridentSpoutExecutor implements ITridentBatchBolt { public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class); AddIdCollector _collector; - ITridentSpout _spout; - ITridentSpout.Emitter _emitter; + ITridentSpout _spout; + ITridentSpout.Emitter _emitter; String _streamName; String _txStateId; - TreeMap _activeBatches = new TreeMap(); + TreeMap _activeBatches = new TreeMap<>(); - public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) { + public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) { _txStateId = txStateId; _spout = spout; _streamName = streamName; @@ -91,7 +91,7 @@ public void cleanup() { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - List fields = new ArrayList(_spout.getOutputFields().toList()); + List fields = new ArrayList<>(_spout.getOutputFields().toList()); fields.add(0, ID_FIELD); declarer.declareStream(_streamName, new Fields(fields)); } diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index a23e5559e56..5266e1c2fc6 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -18,7 +18,6 @@ package storm.trident.topology; import backtype.storm.Config; -import backtype.storm.Constants; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.coordination.BatchOutputCollectorImpl; import backtype.storm.generated.GlobalStreamId; @@ -106,7 +105,7 @@ public String toString() { long _messageTimeoutMs; long _lastRotate; - RotatingMap _batches; + RotatingMap _batches; // map from batchgroupid to coordspec public TridentBoltExecutor(ITridentBatchBolt bolt, Map batchGroupIds, Map coordinationSpecs) { @@ -122,7 +121,7 @@ public static class TrackedBatch { int reportedTasks = 0; int expectedTupleCount = 0; int receivedTuples = 0; - Map taskEmittedTuples = new HashMap(); + Map taskEmittedTuples = new HashMap<>(); boolean failed = false; boolean receivedCommit; Tuple delayedAck = null; @@ -143,7 +142,7 @@ public String toString() { public class CoordinatedOutputCollector implements IOutputCollector { IOutputCollector _delegate; - TrackedBatch _currBatch = null;; + TrackedBatch _currBatch = null; public void setCurrBatch(TrackedBatch batch) { _currBatch = batch; @@ -197,7 +196,7 @@ private void updateTaskCounts(List tasks) { public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L; _lastRotate = System.currentTimeMillis(); - _batches = new RotatingMap(2); + _batches = new RotatingMap<>(2); _context = context; _collector = collector; _coordCollector = new CoordinatedOutputCollector(collector); @@ -205,7 +204,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector _coordConditions = (Map) context.getExecutorData("__coordConditions"); if(_coordConditions==null) { - _coordConditions = new HashMap(); + _coordConditions = new HashMap<>(); for(String batchGroup: _coordSpecs.keySet()) { CoordSpec spec = _coordSpecs.get(batchGroup); CoordCondition cond = new CoordCondition(); @@ -219,7 +218,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector cond.expectedTaskReports+=context.getComponentTasks(comp).size(); } } - cond.targetTasks = new HashSet(); + cond.targetTasks = new HashSet<>(); for(String component: Utils.get(context.getThisTargets(), COORD_STREAM(batchGroup), new HashMap()).keySet()) { @@ -399,7 +398,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public Map getComponentConfiguration() { Map ret = _bolt.getComponentConfiguration(); - if(ret==null) ret = new HashMap(); + if(ret==null) ret = new HashMap<>(); ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization return ret;