Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-3370: Make StormMetricRegistry non-static, and supply it as a d… #2989

Merged
merged 1 commit into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.storm.perf;

import java.util.concurrent.locks.LockSupport;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.MutableLong;
Expand All @@ -45,8 +46,9 @@ public static void main(String[] args) throws Exception {

private static void ackingProducerSimulation() {
WaitStrategyPark ws = new WaitStrategyPark(100);
JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", 1000, 1000);
JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", 1000, 1000);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);
JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);

final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
final Acker acker = new Acker(ackQ, spoutQ);
Expand All @@ -56,8 +58,9 @@ private static void ackingProducerSimulation() {

private static void producerFwdConsumer(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000);
JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);

final Producer prod = new Producer(q1);
final Forwarder fwd = new Forwarder(q1, q2);
Expand All @@ -68,7 +71,8 @@ private static void producerFwdConsumer(int prodBatchSz) {


private static void oneProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000);
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
new StormMetricRegistry());

final Producer prod1 = new Producer(q1);
final Consumer cons1 = new Consumer(q1);
Expand All @@ -77,7 +81,8 @@ private static void oneProducer1Consumer(int prodBatchSz) {
}

private static void twoProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000);
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
new StormMetricRegistry());

final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
Expand All @@ -87,7 +92,8 @@ private static void twoProducer1Consumer(int prodBatchSz) {
}

private static void threeProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000);
JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
new StormMetricRegistry());

final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
Expand All @@ -100,8 +106,9 @@ private static void threeProducer1Consumer(int prodBatchSz) {

private static void oneProducer2Consumers(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000);
JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);

final Producer2 prod1 = new Producer2(q1, q2);
final Consumer cons1 = new Consumer(q1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ private TopologyContext createTopologyContext(Map<String, Object> topoConf) {
Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
taskToComponent.put(7, "Xcom");
return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null,
null, null);
null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,7 @@ static class MockTopologyContext extends TopologyContext {
private final int componentId;

public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
// StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>>
// componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String
// pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String,
// Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>>
// registeredMetrics, Atom openOrPrepareWasCalled
super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this.componentId = componentId;
}

Expand Down
34 changes: 18 additions & 16 deletions storm-client/src/jvm/org/apache/storm/daemon/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.metrics2.TaskMetrics;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.stats.CommonStats;
Expand All @@ -57,20 +58,20 @@ public class Task {

private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private final TaskMetrics taskMetrics;
private Executor executor;
private WorkerState workerData;
private TopologyContext systemTopologyContext;
private TopologyContext userTopologyContext;
private WorkerTopologyContext workerTopologyContext;
private Integer taskId;
private String componentId;
private Object taskObject; // Spout/Bolt object
private Map<String, Object> topoConf;
private BooleanSupplier emitSampler;
private CommonStats executorStats;
private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
private boolean debug;
private final Executor executor;
private final WorkerState workerData;
private final TopologyContext systemTopologyContext;
private final TopologyContext userTopologyContext;
private final WorkerTopologyContext workerTopologyContext;
private final Integer taskId;
private final String componentId;
private final Object taskObject; // Spout/Bolt object
private final Map<String, Object> topoConf;
private final BooleanSupplier emitSampler;
private final CommonStats executorStats;
private final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
private final HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
private final boolean debug;

public Task(Executor executor, Integer taskId) throws IOException {
this.taskId = taskId;
Expand All @@ -88,7 +89,7 @@ public Task(Executor executor, Integer taskId) throws IOException {
this.taskObject = mkTaskObject();
this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
this.addTaskHooks();
this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId);
this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId, workerData.getMetricRegistry());
}

private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(
Expand Down Expand Up @@ -249,7 +250,8 @@ private TopologyContext mkTopologyContext(StormTopology topology) throws IOExcep
workerData.getUserSharedResources(),
executor.getSharedExecutorData(),
executor.getIntervalToTaskToMetricToRegistry(),
executor.getOpenOrPrepareWasCalled());
executor.getOpenOrPrepareWasCalled(),
workerData.getMetricRegistry());
}

private Object mkTaskObject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private final int port;
private final String workerId;
private final LogConfigManager logConfigManager;

private final StormMetricRegistry metricRegistry;

private WorkerState workerState;
private AtomicReference<List<IRunningExecutor>> executorsAtom;
Expand Down Expand Up @@ -112,6 +112,7 @@ public Worker(Map<String, Object> conf, IContext context, String topologyId, Str
this.port = port;
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
this.metricRegistry = new StormMetricRegistry();
}

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -152,7 +153,7 @@ public void start() throws Exception {
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);

StormMetricRegistry.start(conf, DaemonType.WORKER);
metricRegistry.start(conf, DaemonType.WORKER);

Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
Map<String, String> initCreds = new HashMap<>();
Expand All @@ -172,7 +173,7 @@ private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateS
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
topologyConf, stateStorage, stormClusterState, autoCreds);
topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);

// Heartbeat here so that worker process dies if this fails
// it's important that worker heartbeat to supervisor ASAP so that supervisor knows
Expand Down Expand Up @@ -484,7 +485,7 @@ public void shutdown() {

workerState.closeResources();

StormMetricRegistry.stop();
metricRegistry.stop();

LOG.info("Trigger any worker shutdown hooks");
workerState.runWorkerShutdownHooks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
Expand All @@ -59,6 +60,7 @@
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.serialization.ITupleSerializer;
Expand Down Expand Up @@ -144,11 +146,14 @@ public class WorkerState {
private final AtomicLong nextLoadUpdate = new AtomicLong(0);
private final boolean trySerializeLocal;
private final Collection<IAutoCredentials> autoCredentials;
private final StormMetricRegistry metricRegistry;

public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException,
IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials,
StormMetricRegistry metricRegistry) throws IOException,
InvalidTopologyException {
this.metricRegistry = metricRegistry;
this.autoCredentials = autoCredentials;
this.conf = conf;
this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
Expand Down Expand Up @@ -673,8 +678,8 @@ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyC
for (List<Long> executor : executors) {
int port = this.getPort();
receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort()));
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry));

}
return receiveQueueMap;
Expand Down Expand Up @@ -754,6 +759,10 @@ public JCQueue getTransferQueue() {
return workerTransfer.getTransferQueue();
}

public StormMetricRegistry getMetricRegistry() {
return metricRegistry;
}

public interface ILocalTransferCallback {
void transfer(ArrayList<AddressedTuple> tupleBatch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class WorkerTransfer implements JCQueue.Consumer {
static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);

private final TransferDrainer drainer;
private WorkerState workerState;
private final WorkerState workerState;

private IWaitStrategy backPressureWaitStrategy;
private final IWaitStrategy backPressureWaitStrategy;

private JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker)

private AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP.
private final AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP.

public WorkerTransfer(WorkerState workerState, Map<String, Object> topologyConf, int maxTaskIdInTopo) {
this.workerState = workerState;
Expand All @@ -66,7 +66,8 @@ public WorkerTransfer(WorkerState workerState, Map<String, Object> topologyConf,
}

this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort());
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort(),
workerState.getMetricRegistry());
}

public JCQueue getTransferQueue() {
Expand Down
Loading