General cleanup of the generics.
ddebree committed Aug 13, 2015
1 parent 9e80903 commit 9e4c3df
Showing 8 changed files with 49 additions and 50 deletions.
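
The hunks below apply a few recurring cleanups: raw types such as ITridentSpout and RotatingMap gain explicit type parameters, repeated type arguments on constructor calls are replaced with the Java 7 diamond operator, and the redundant public modifier is dropped from members of nested interfaces. A minimal before/after sketch of these patterns (the type and field names in this sketch are illustrative only, not taken from the Storm sources):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Members of an interface are implicitly public, so the modifier can be dropped,
// as done for ITridentSpout.BatchCoordinator and ITridentSpout.Emitter below.
interface CleanupSketch {
    // Before: public interface Handler<X> { void handle(X value); }
    interface Handler<X> {
        void handle(X value);
    }
}

class CleanupSketchFields {
    // Before: raw type, or type arguments repeated on the constructor:
    //   Map counts = new HashMap();
    //   Map<String, Integer> counts = new HashMap<String, Integer>();
    // After: parameterized declaration plus the diamond operator (Java 7+).
    Map<String, Integer> counts = new HashMap<>();
    List<String> names = new ArrayList<>();
}
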
4 changes: 2 additions & 2 deletions storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
@@ -26,7 +26,7 @@


public interface ITridentSpout<T> extends Serializable {
- public interface BatchCoordinator<X> {
+ interface BatchCoordinator<X> {
/**
* Create metadata for this particular transaction id which has never
* been emitted before. The metadata should contain whatever is necessary
@@ -55,7 +55,7 @@ public interface BatchCoordinator<X> {
void close();
}

- public interface Emitter<X> {
+ interface Emitter<X> {
/**
* Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata
* was created by the Coordinator in the initializeTranaction method. This method must always emit
@@ -35,7 +35,7 @@


public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
- IOpaquePartitionedTridentSpout _spout;
+ IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;

public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
IOpaquePartitionedTridentSpout.Coordinator _coordinator;
@@ -75,10 +75,10 @@ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
}

public class Emitter implements ICommitterTridentSpout.Emitter {
- IOpaquePartitionedTridentSpout.Emitter _emitter;
+ IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
TransactionalState _state;
- TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<Long, Map<String, Object>>();
- Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+ TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
+ Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
int _index;
int _numTasks;

@@ -97,7 +97,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
- List<ISpoutPartition> myPartitions = new ArrayList();
+ List<ISpoutPartition> myPartitions = new ArrayList<>();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
@@ -108,15 +108,15 @@
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
- Map<String, Object> metas = new HashMap<String, Object>();
+ Map<String, Object> metas = new HashMap<>();
_cachedMetas.put(tx.getTransactionId(), metas);

Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
Map<String, Object> prevCached;
if(entry!=null) {
prevCached = entry.getValue();
} else {
- prevCached = new HashMap<String, Object>();
+ prevCached = new HashMap<>();
}

for(String id: _partitionStates.keySet()) {
@@ -147,8 +147,8 @@ public void commit(TransactionAttempt attempt) {
// another attempt of the batch to commit, the batch phase must have succeeded in between.
// hence, all tasks for the prior commit must have finished committing (whether successfully or not)
if(_changedMeta && _index==0) {
- Set<String> validIds = new HashSet<String>();
- for(ISpoutPartition p: (List<ISpoutPartition>) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
+ Set<String> validIds = new HashSet<>();
+ for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
validIds.add(p.getId());
}
for(String existingPartition: _state.list("")) {
@@ -174,7 +174,7 @@ public void close() {
}
}

- public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout spout) {
+ public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
_spout = spout;
}

@@ -30,25 +30,25 @@


public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
- IPartitionedTridentSpout _spout;
+ IPartitionedTridentSpout<Integer, ISpoutPartition, Object> _spout;

- public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) {
+ public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Integer, ISpoutPartition, Object> spout) {
_spout = spout;
}

- public IPartitionedTridentSpout getPartitionedSpout() {
+ public IPartitionedTridentSpout<Integer, ISpoutPartition, Object> getPartitionedSpout() {
return _spout;
}

- class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
- private IPartitionedTridentSpout.Coordinator _coordinator;
+ class Coordinator implements ITridentSpout.BatchCoordinator<Integer> {
+ private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;

public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}

@Override
- public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
+ public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
if(currMetadata!=null) {
return currMetadata;
} else {
@@ -82,10 +82,10 @@ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
}
}

- class Emitter implements ITridentSpout.Emitter<Object> {
- private IPartitionedTridentSpout.Emitter _emitter;
+ class Emitter implements ITridentSpout.Emitter<Integer> {
+ private IPartitionedTridentSpout.Emitter<Integer, ISpoutPartition, Object> _emitter;
private TransactionalState _state;
- private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+ private Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
private int _index;
private int _numTasks;

@@ -100,12 +100,12 @@ public Emitter(String txStateId, Map conf, TopologyContext context) {


@Override
- public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
+ public void emitBatch(final TransactionAttempt tx, final Integer coordinatorMeta,
final TridentCollector collector) {
if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
- List<ISpoutPartition> myPartitions = new ArrayList();
+ List<ISpoutPartition> myPartitions = new ArrayList<>();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
@@ -150,12 +150,12 @@ public void close() {
}

@Override
- public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+ public ITridentSpout.BatchCoordinator<Integer> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new Coordinator(conf, context);
}

@Override
- public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
+ public ITridentSpout.Emitter<Integer> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new Emitter(txStateId, conf, context);
}

@@ -78,7 +78,7 @@ public RichSpoutEmitter(Map conf, TopologyContext context) {
if(batchSize==null) batchSize = 1000;
_maxBatchSize = batchSize.intValue();
_collector = new CaptureCollector();
- idsMap = new RotatingMap(3);
+ idsMap = new RotatingMap<>(3);
rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}

@@ -174,7 +174,7 @@ static class CaptureCollector implements ISpoutOutputCollector {
public long pendingCount;
public void reset(TridentCollector c) {
_collector = c;
- ids = new ArrayList<Object>();
+ ids = new ArrayList<>();
}

@Override
@@ -56,7 +56,7 @@ public RichSpoutBatchTriggerer(IRichSpout delegate, String streamName, String ba
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
- _outputTasks = new ArrayList<Integer>();
+ _outputTasks = new ArrayList<>();
for(String component: Utils.get(context.getThisTargets(),
_coordStream,
new HashMap<String, Grouping>()).keySet()) {
@@ -119,20 +119,20 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = _delegate.getComponentConfiguration();
- if(conf==null) conf = new HashMap();
- else conf = new HashMap(conf);
+ if(conf==null) conf = new HashMap<>();
+ else conf = new HashMap<>(conf);
Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class);
return conf;
}

static class FinishCondition {
- Set<Long> vals = new HashSet<Long>();
+ Set<Long> vals = new HashSet<>();
Object msgId;
}

- Map<Long, Long> _msgIdToBatchId = new HashMap();
+ Map<Long, Long> _msgIdToBatchId = new HashMap<>();

- Map<Long, FinishCondition> _finishConditions = new HashMap();
+ Map<Long, FinishCondition> _finishConditions = new HashMap<>();

class StreamOverrideCollector implements ISpoutOutputCollector {

@@ -149,7 +149,7 @@ public List<Integer> emit(String ignore, List<Object> values, Object msgId) {
FinishCondition finish = new FinishCondition();
finish.msgId = msgId;
List<Integer> tasks = _collector.emit(_stream, new ConsList(batchId, values));
- Set<Integer> outTasksSet = new HashSet<Integer>(tasks);
+ Set<Integer> outTasksSet = new HashSet<>(tasks);
for(Integer t: _outputTasks) {
int count = 0;
if(outTasksSet.contains(t)) {
@@ -38,14 +38,14 @@ public class TridentSpoutCoordinator implements IBasicBolt {
public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
private static final String META_DIR = "meta";

- ITridentSpout _spout;
- ITridentSpout.BatchCoordinator _coord;
+ ITridentSpout<Object> _spout;
+ ITridentSpout.BatchCoordinator<Object> _coord;
RotatingTransactionalState _state;
TransactionalState _underlyingState;
String _id;


- public TridentSpoutCoordinator(String id, ITridentSpout spout) {
+ public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
_spout = spout;
_id = id;
}
10 changes: 5 additions & 5 deletions storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -42,14 +42,14 @@ public class TridentSpoutExecutor implements ITridentBatchBolt {
public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);

AddIdCollector _collector;
- ITridentSpout _spout;
- ITridentSpout.Emitter _emitter;
+ ITridentSpout<Object> _spout;
+ ITridentSpout.Emitter<Object> _emitter;
String _streamName;
String _txStateId;

- TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<Long, TransactionAttempt>();
+ TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();

- public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) {
+ public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
_txStateId = txStateId;
_spout = spout;
_streamName = streamName;
@@ -91,7 +91,7 @@ public void cleanup() {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- List<String> fields = new ArrayList(_spout.getOutputFields().toList());
+ List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
fields.add(0, ID_FIELD);
declarer.declareStream(_streamName, new Fields(fields));
}
@@ -18,7 +18,6 @@
package storm.trident.topology;

import backtype.storm.Config;
- import backtype.storm.Constants;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.coordination.BatchOutputCollectorImpl;
import backtype.storm.generated.GlobalStreamId;
@@ -106,7 +105,7 @@ public String toString() {
long _messageTimeoutMs;
long _lastRotate;

- RotatingMap _batches;
+ RotatingMap<Object, TrackedBatch> _batches;

// map from batchgroupid to coordspec
public TridentBoltExecutor(ITridentBatchBolt bolt, Map<GlobalStreamId, String> batchGroupIds, Map<String, CoordSpec> coordinationSpecs) {
@@ -122,7 +121,7 @@ public static class TrackedBatch {
int reportedTasks = 0;
int expectedTupleCount = 0;
int receivedTuples = 0;
- Map<Integer, Integer> taskEmittedTuples = new HashMap();
+ Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
boolean failed = false;
boolean receivedCommit;
Tuple delayedAck = null;
@@ -143,7 +142,7 @@ public String toString() {
public class CoordinatedOutputCollector implements IOutputCollector {
IOutputCollector _delegate;

- TrackedBatch _currBatch = null;;
+ TrackedBatch _currBatch = null;

public void setCurrBatch(TrackedBatch batch) {
_currBatch = batch;
@@ -197,15 +196,15 @@ private void updateTaskCounts(List<Integer> tasks) {
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
_lastRotate = System.currentTimeMillis();
- _batches = new RotatingMap(2);
+ _batches = new RotatingMap<>(2);
_context = context;
_collector = collector;
_coordCollector = new CoordinatedOutputCollector(collector);
_coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));

_coordConditions = (Map) context.getExecutorData("__coordConditions");
if(_coordConditions==null) {
- _coordConditions = new HashMap();
+ _coordConditions = new HashMap<>();
for(String batchGroup: _coordSpecs.keySet()) {
CoordSpec spec = _coordSpecs.get(batchGroup);
CoordCondition cond = new CoordCondition();
@@ -219,7 +218,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector
cond.expectedTaskReports+=context.getComponentTasks(comp).size();
}
}
- cond.targetTasks = new HashSet<Integer>();
+ cond.targetTasks = new HashSet<>();
for(String component: Utils.get(context.getThisTargets(),
COORD_STREAM(batchGroup),
new HashMap<String, Grouping>()).keySet()) {
@@ -399,7 +398,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = _bolt.getComponentConfiguration();
- if(ret==null) ret = new HashMap();
+ if(ret==null) ret = new HashMap<>();
ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
// TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
return ret;
