From ed0548e8b61a8ea67007d906953406a264b15c99 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 24 Jan 2018 14:47:01 -0600 Subject: [PATCH 1/3] STORM-2910: Override local nimbus client by default --- .../main/java/org/apache/storm/LocalCluster.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 502f4541c56..20a46a3ca0f 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -368,6 +368,7 @@ public IBolt makeAckerBoltImpl() { private final String trackId; private final StormCommonInstaller commonInstaller; private final SimulatedTime time; + private final NimbusClient.LocalOverride nimbusOverride; /** * Create a default LocalCluster. @@ -476,6 +477,13 @@ private LocalCluster(Builder builder) throws Exception { } catch (Exception e) { //Ignore any exceptions we might be doing a test for authentication } + if (thriftServer == null) { + //We don't want to override the client if there is a thrift server up and running, or we would not test any + // Of the actual thrift code + this.nimbusOverride = new NimbusClient.LocalOverride(this); + } else { + this.nimbusOverride = null; + } success = true; } finally { if (!success) { @@ -658,6 +666,9 @@ public int getThriftServerPort() { @Override public synchronized void close() throws Exception { + if (nimbusOverride != null) { + nimbusOverride.close(); + } if (nimbus != null) { nimbus.shutdown(); } @@ -1098,6 +1109,7 @@ public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationE /** * Run c with a local mode cluster overriding the NimbusClient and DRPCClient calls. + * NOTE local mode override happens by default now unless netty is turned on for the local cluster. 
* @param c the callable to run in this mode * @param ttlSec the number of seconds to let the cluster run after c has completed * @return the result of calling C @@ -1106,7 +1118,6 @@ public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationE public static T withLocalModeOverride(Callable c, long ttlSec) throws Exception { LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n"); try (LocalCluster local = new LocalCluster(); - NimbusClient.LocalOverride nimbusOverride = new NimbusClient.LocalOverride(local); LocalDRPC drpc = new LocalDRPC(); DRPCClient.LocalOverride drpcOverride = new DRPCClient.LocalOverride(drpc)) { From 48c2fda867aa1d1123f6ba9c9f623ae80c9df280 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 25 Jan 2018 09:49:20 -0600 Subject: [PATCH 2/3] STORM-2910: have metrics reported in the background --- .../storm/daemon/supervisor/Container.java | 18 +++--- .../daemon/supervisor/OnlyLatestExecutor.java | 55 +++++++++++++++++++ .../daemon/supervisor/ReadClusterState.java | 6 +- .../apache/storm/daemon/supervisor/Slot.java | 33 ++++++----- .../storm/daemon/supervisor/Supervisor.java | 16 +++++- .../storm/daemon/supervisor/SlotTest.java | 15 +++-- 6 files changed, 112 insertions(+), 31 deletions(-) create mode 100644 storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java index f45ce25ec69..a06e44c3dee 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java @@ -50,6 +50,7 @@ import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.yaml.snakeyaml.Yaml; @@ -709,7 +710,7 @@ public String getWorkerId() { /** * Send worker metrics to Nimbus. */ - void processMetrics() { + void processMetrics(OnlyLatestExecutor exec) { try { if (_usedMemory.get(_port) != null) { // Make sure we don't process too frequently. @@ -725,20 +726,23 @@ void processMetrics() { long timestamp = System.currentTimeMillis(); double value = _usedMemory.get(_port).memory; WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID, - INVALID_EXECUTOR_ID, INVALID_STREAM_ID); + INVALID_EXECUTOR_ID, INVALID_STREAM_ID); WorkerMetricList metricList = new WorkerMetricList(); metricList.add_to_metrics(workerMetric); WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, hostname, metricList); - try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) { - client.getClient().processWorkerMetrics(metrics); - } - - this.lastMetricProcessTime = currentTimeMsec; + exec.execute(_port, () -> { + try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) { + client.getClient().processWorkerMetrics(metrics); + } catch (Exception e) { + LOG.error("Failed to process metrics", e); + } + }); } } catch (Exception e) { LOG.error("Failed to process metrics", e); + } finally { this.lastMetricProcessTime = System.currentTimeMillis(); } } diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java new file mode 100644 index 00000000000..bd73766b7d3 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +/** + * This allows you to submit a Runnable with a key. If the previous submission for that key has not yet run, + * it will be replaced with the latest one. + */ +public class OnlyLatestExecutor { + private final Executor exec; + private final ConcurrentMap latest; + + public OnlyLatestExecutor(Executor exec) { + this.exec = exec; + latest = new ConcurrentHashMap<>(); + } + + /** + * Run something in the future, replacing any earlier submission for the same key that has not started running yet. + * @param key what to use to dedupe things. + * @param r what you want to run. + */ + public void execute(final K key, Runnable r) { + Runnable old = latest.put(key, r); + if (old == null) { + //It was not there before so we need to run it. 
+ exec.execute(() -> { + Runnable run = latest.remove(key); + if (run != null) { + run.run();; + } + }); + } + } +} diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java index 884efcb0e16..5d8bb336115 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -65,7 +65,8 @@ public class ReadClusterState implements Runnable, AutoCloseable { private final String host; private final LocalState localState; private final AtomicReference> cachedAssignments; - + private final OnlyLatestExecutor metricsExec; + public ReadClusterState(Supervisor supervisor) throws Exception { this.superConf = supervisor.getConf(); this.stormClusterState = supervisor.getStormClusterState(); @@ -77,6 +78,7 @@ public ReadClusterState(Supervisor supervisor) throws Exception { this.host = supervisor.getHostName(); this.localState = supervisor.getLocalState(); this.cachedAssignments = supervisor.getCurrAssignment(); + this.metricsExec = new OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor()); this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext()); @@ -108,7 +110,7 @@ public ReadClusterState(Supervisor supervisor) throws Exception { private Slot mkSlot(int port) throws Exception { return new Slot(localizer, superConf, launcher, host, port, - localState, stormClusterState, iSuper, cachedAssignments); + localState, stormClusterState, iSuper, cachedAssignments, metricsExec); } @Override diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index fe30c935fe2..670029114cf 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -95,12 +95,14 @@ static class StaticState { public final ISupervisor iSupervisor; public final LocalState localState; public final BlobChangingCallback changingCallback; - + public final OnlyLatestExecutor metricsExec; + StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, - long killSleepMs, long monitorFreqMs, - ContainerLauncher containerLauncher, String host, int port, - ISupervisor iSupervisor, LocalState localState, - BlobChangingCallback changingCallback) { + long killSleepMs, long monitorFreqMs, + ContainerLauncher containerLauncher, String host, int port, + ISupervisor iSupervisor, LocalState localState, + BlobChangingCallback changingCallback, + OnlyLatestExecutor metricsExec) { this.localizer = localizer; this.hbTimeoutMs = hbTimeoutMs; this.firstHbTimeoutMs = firstHbTimeoutMs; @@ -112,6 +114,7 @@ static class StaticState { this.iSupervisor = iSupervisor; this.localState = localState; this.changingCallback = changingCallback; + this.metricsExec = metricsExec; } } @@ -937,7 +940,7 @@ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticS dynamicState = dynamicState.withProfileActions(mod, modPending); } - dynamicState.container.processMetrics(); + dynamicState.container.processMetrics(staticState.metricsExec); Time.sleep(staticState.monitorFreqMs); return dynamicState; @@ -971,14 +974,17 @@ static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticSta private volatile boolean done = false; private volatile DynamicState dynamicState; private final AtomicReference> cachedCurrentAssignments; - + private final OnlyLatestExecutor metricsExec; + public Slot(AsyncLocalizer localizer, Map conf, - ContainerLauncher containerLauncher, String host, - int port, LocalState localState, - IStormClusterState clusterState, - ISupervisor iSupervisor, - AtomicReference> cachedCurrentAssignments) throws Exception { + 
ContainerLauncher containerLauncher, String host, + int port, LocalState localState, + IStormClusterState clusterState, + ISupervisor iSupervisor, + AtomicReference> cachedCurrentAssignments, + OnlyLatestExecutor metricsExec) throws Exception { super("SLOT_"+port); + this.metricsExec = metricsExec; this.cachedCurrentAssignments = cachedCurrentAssignments; this.clusterState = clusterState; @@ -1024,7 +1030,8 @@ public Slot(AsyncLocalizer localizer, Map conf, port, iSupervisor, localState, - this); + this, + metricsExec); this.newAssignment.set(dynamicState.newAssignment); if (MachineState.RUNNING == dynamicState.state) { //We are running so we should recover the blobs. diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index ee5a55c7b20..147a8aa369f 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; @@ -74,6 +76,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable { private final AtomicReference> currAssignment; private final StormTimer heartbeatTimer; private final StormTimer eventTimer; + //Right now this is only used for sending metrics to nimbus, + // but we may want to combine it with the heartbeatTimer at some point + // to really make this work well. 
+ private final ExecutorService heartbeatExecutor; private final AsyncLocalizer asyncLocalizer; private EventManager eventManager; private ReadClusterState readState; @@ -89,6 +95,7 @@ public Supervisor(Map conf, IContext sharedContext, ISupervisor this.upTime = Utils.makeUptimeComputer(); this.stormVersion = VersionInfo.getVersion(); this.sharedContext = sharedContext; + this.heartbeatExecutor = Executors.newFixedThreadPool(1); iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf)); @@ -125,7 +132,14 @@ public Supervisor(Map conf, IContext sharedContext, ISupervisor this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); } - + + /** + * Get the executor service that is supposed to be used for heart-beats. + */ + public ExecutorService getHeartbeatExecutor() { + return heartbeatExecutor; + } + public String getId() { return supervisorId; } diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java index f627bc7561b..08a52663cba 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java @@ -17,7 +17,6 @@ */ package org.apache.storm.daemon.supervisor; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.*; @@ -150,7 +149,7 @@ public void testEmptyToEmpty() throws Exception { ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000, - containerLauncher, "localhost", 8080, iSuper, state, cb); + containerLauncher, "localhost", 8080, iSuper, state, cb, null); DynamicState dynamicState = new DynamicState(null, null, null); DynamicState nextState = 
Slot.handleEmpty(dynamicState, staticState); assertEquals(MachineState.EMPTY, nextState.state); @@ -182,7 +181,7 @@ public void testLaunchContainerFromEmpty() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); DynamicState dynamicState = new DynamicState(null, null, null) .withNewAssignment(newAssignment); @@ -251,7 +250,7 @@ public void testRelaunch() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); DynamicState dynamicState = new DynamicState(assignment, container, assignment); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); @@ -312,7 +311,7 @@ public void testReschedule() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); @@ -393,7 +392,7 @@ public void testRunningToEmpty() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null); DynamicState nextState = 
Slot.stateMachineStep(dynamicState, staticState); @@ -454,7 +453,7 @@ public void testRunWithProfileActions() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); Set profileActions = new HashSet<>(); ProfileRequest request = new ProfileRequest(); request.set_action(ProfileAction.JPROFILE_STOP); @@ -532,7 +531,7 @@ public void testResourcesChanged() throws Exception { ISupervisor iSuper = mock(ISupervisor.class); long heartbeatTimeoutMs = 5000; StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb); + containerLauncher, "localhost", port, iSuper, state, cb, null); Set changing = new HashSet<>(); LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class); From 77be31bc4533cb9437242261ff89d1a750eef5c4 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 31 Jan 2018 10:17:43 -0600 Subject: [PATCH 3/3] Addressed review comments --- .../storm/daemon/supervisor/OnlyLatestExecutor.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java index bd73766b7d3..7dc9b0bb319 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java @@ -21,12 +21,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This allows you to submit a Runnable with a key. 
If the previous submission for that key has not yet run, * it will be replaced with the latest one. */ public class OnlyLatestExecutor { + private static final Logger LOG = LoggerFactory.getLogger(OnlyLatestExecutor.class); private final Executor exec; private final ConcurrentMap latest; @@ -47,9 +50,11 @@ public void execute(final K key, Runnable r) { exec.execute(() -> { Runnable run = latest.remove(key); if (run != null) { - run.run();; + run.run(); } }); - } + } else { + LOG.debug("Replacing runnable for {} - {}", key, r); + } } }