Skip to content

Commit

Permalink
STORM-2153: get rid of ConcurrentHashMap via preinitializing all poss…
Browse files Browse the repository at this point in the history
…ible metrics
  • Loading branch information
HeartSaVioR committed Jan 12, 2018
1 parent 7947a07 commit 16bfb84
Showing 1 changed file with 59 additions and 34 deletions.
93 changes: 59 additions & 34 deletions storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,69 +18,94 @@
package org.apache.storm.metrics2;

import com.codahale.metrics.Counter;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.Set;

public class TaskMetrics {
private static final String METRIC_NAME_ACKED = "acked";
private static final String METRIC_NAME_FAILED = "failed";
private static final String METRIC_NAME_EMITTED = "emitted";
private static final String METRIC_NAME_TRANSFERRED = "transferred";

private ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
private ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
private ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
private ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
private Map<String, Counter> ackedByStream;
private Map<String, Counter> failedByStream;
private Map<String, Counter> emittedByStream;
private Map<String, Counter> transferredByStream;

private String topologyId;
private String componentId;
private Integer taskId;
private Integer workerPort;

public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){
this.topologyId = context.getStormId();
public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskId) {
this.componentId = componentId;
this.taskId = taskid;
this.workerPort = context.getThisWorkerPort();

String topologyId = context.getStormId();
Integer workerPort = context.getThisWorkerPort();

preInitializeMetricMap(context, topologyId, componentId, taskId, workerPort);
}

public Counter getAcked(String streamId) {
Counter c = this.ackedByStream.get(streamId);
if (c == null) {
c = StormMetricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
this.ackedByStream.put(streamId, c);
}
return c;
return getCounter(streamId, this.ackedByStream);
}

public Counter getFailed(String streamId) {
Counter c = this.failedByStream.get(streamId);
if (c == null) {
c = StormMetricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
this.failedByStream.put(streamId, c);
}
return c;
return getCounter(streamId, this.failedByStream);
}

public Counter getEmitted(String streamId) {
Counter c = this.emittedByStream.get(streamId);
if (c == null) {
c = StormMetricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
this.emittedByStream.put(streamId, c);
}
return c;
return getCounter(streamId, this.emittedByStream);
}

public Counter getTransferred(String streamId) {
Counter c = this.transferredByStream.get(streamId);
return getCounter(streamId, this.transferredByStream);
}

private Counter getCounter(String streamId, Map<String, Counter> counterByStream) {
Counter c = counterByStream.get(streamId);
if (c == null) {
c = StormMetricRegistry.counter(METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
this.transferredByStream.put(streamId, c);
throw new RuntimeException("Unknown stream ID provided: " + streamId + " for component " + componentId);
}
return c;
}

private void preInitializeMetricMap(WorkerTopologyContext context, String topologyId, String componentId,
Integer taskId, Integer workerPort) {
Set<String> inputAndOutputStreams = new HashSet<>();
inputAndOutputStreams.add(Utils.DEFAULT_STREAM_ID);

// input streams
Map<GlobalStreamId, Grouping> sources = context.getSources(componentId);
for (GlobalStreamId globalStreamId : sources.keySet()) {
inputAndOutputStreams.add(globalStreamId.get_streamId());
}

// output streams
Set<String> outputStreams = context.getComponentStreams(componentId);
inputAndOutputStreams.addAll(outputStreams);

this.ackedByStream = preInitializeStreamToCounter(METRIC_NAME_ACKED, topologyId, componentId, taskId, workerPort,
inputAndOutputStreams);
this.failedByStream = preInitializeStreamToCounter(METRIC_NAME_FAILED, topologyId, componentId, taskId, workerPort,
inputAndOutputStreams);
this.emittedByStream = preInitializeStreamToCounter(METRIC_NAME_EMITTED, topologyId, componentId, taskId, workerPort,
inputAndOutputStreams);
this.transferredByStream = preInitializeStreamToCounter(METRIC_NAME_TRANSFERRED, topologyId, componentId, taskId, workerPort,
inputAndOutputStreams);
}

private Map<String, Counter> preInitializeStreamToCounter(String metricName, String topologyId, String componentId, Integer taskId,
Integer workerPort, Set<String> streams) {
Map<String, Counter> streamToCounter = new HashMap<>();
for (String stream : streams) {
streamToCounter.put(stream, StormMetricRegistry.counter(metricName, topologyId, componentId, taskId, workerPort, stream));
}
return streamToCounter;
}

}

0 comments on commit 16bfb84

Please sign in to comment.