From 75e4a027d874266b302b84c5507ca148c5be7e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Mon, 6 Nov 2017 18:01:52 +0100 Subject: [PATCH] STORM:2803: Fix leaking threads from Nimbus/TimeCacheMap, slightly refactor Time to use more final fields, replaced uses of deprecated classes/methods and added a few tests. --- .travis.yml | 1 - .../tools/NthLastModifiedTimeTrackerTest.java | 70 ++++++------- .../auth/ShellBasedGroupsMapping.java | 57 ++++++++--- .../org/apache/storm/utils/RotatingMap.java | 2 +- .../storm/utils/ShellCommandRunner.java | 63 ++++++++++++ .../storm/utils/ShellCommandRunnerImpl.java | 46 +++++++++ .../org/apache/storm/utils/ShellUtils.java | 52 ++-------- .../src/jvm/org/apache/storm/utils/Time.java | 87 ++++++++-------- .../auth/ShellBasedGroupsMappingTest.java | 99 +++++++++++++++++++ .../apache/storm/utils/TopologySpoutLag.java | 2 +- .../apache/storm/daemon/nimbus/Nimbus.java | 3 + .../storm/daemon/supervisor/SlotTest.java | 20 ++-- .../utils/ArtifactoryConfigLoaderTest.java | 7 +- 13 files changed, 353 insertions(+), 156 deletions(-) create mode 100644 storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunner.java create mode 100644 storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunnerImpl.java create mode 100644 storm-client/test/jvm/org/apache/storm/security/auth/ShellBasedGroupsMappingTest.java diff --git a/.travis.yml b/.travis.yml index 5e3dab2c615..128b5e045e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,7 +33,6 @@ before_install: install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd` script: - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES -sudo: true cache: directories: - "$HOME/.m2/repository" diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java index a28ea3818a5..2c071686cf3 100644 --- a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java @@ -23,6 +23,8 @@ import static org.fest.assertions.api.Assertions.assertThat; +import org.apache.storm.utils.Time.SimulatedTime; + public class NthLastModifiedTimeTrackerTest { private static final int ANY_NUM_TIMES_TO_TRACK = 3; @@ -55,19 +57,17 @@ public Object[][] whenNotYetMarkedAsModifiedData() { @Test(dataProvider = "whenNotYetMarkedAsModifiedData") public void shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int secondsToAdvance) { - // given - Time.startSimulating(); - NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK); - - // when - advanceSimulatedTimeBy(secondsToAdvance); - int seconds = tracker.secondsSinceOldestModification(); + // given + try (SimulatedTime t = new SimulatedTime()) { + NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK); - // then - assertThat(seconds).isEqualTo(secondsToAdvance); + // when + Time.advanceTimeSecs(secondsToAdvance); + int seconds = tracker.secondsSinceOldestModification(); - // cleanup - Time.stopSimulating(); + // then + assertThat(seconds).isEqualTo(secondsToAdvance); + } } @DataProvider @@ -94,32 +94,26 @@ public Object[][] simulatedTrackerIterations() { { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } }; } - @Test(dataProvider = "simulatedTrackerIterations") - public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack, - int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) { - // given - Time.startSimulating(); - NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack); - - int[] modifiedTimes = new int[expLastModifiedTimes.length]; - - // when - int i = 0; - for (int secondsToAdvance : secondsToAdvancePerIteration) { - advanceSimulatedTimeBy(secondsToAdvance); - tracker.markAsModified(); - modifiedTimes[i] = tracker.secondsSinceOldestModification(); - i++; + @Test(dataProvider = "simulatedTrackerIterations") + public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack, + int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) { + // given + try (SimulatedTime t = new SimulatedTime()) { + NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack); + + int[] modifiedTimes = new int[expLastModifiedTimes.length]; + + // when + int i = 0; + for (int secondsToAdvance : secondsToAdvancePerIteration) { + Time.advanceTimeSecs(secondsToAdvance); + tracker.markAsModified(); + modifiedTimes[i] = tracker.secondsSinceOldestModification(); + i++; + } + + // then + assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes); + } } - - // then - assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes); - - // cleanup - Time.stopSimulating(); - } - - private void advanceSimulatedTimeBy(int seconds) { - Time.advanceTime(seconds * MILLIS_IN_SEC); - } } diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java b/storm-client/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java index 3ee57542218..d1a7596ddb6 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java @@ -18,16 +18,20 @@ package org.apache.storm.security.auth; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Set; import java.util.HashSet; import java.util.Map; -import java.util.StringTokenizer; +import java.util.concurrent.TimeUnit; import org.apache.storm.Config; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.ShellCommandRunner; +import org.apache.storm.utils.ShellCommandRunnerImpl; import org.apache.storm.utils.ShellUtils; -import org.apache.storm.utils.TimeCacheMap; import org.apache.storm.utils.ShellUtils.ExitCodeException; +import org.apache.storm.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +40,21 @@ public class ShellBasedGroupsMapping implements IGroupMappingServiceProvider { public static final Logger LOG = LoggerFactory.getLogger(ShellBasedGroupsMapping.class); - public TimeCacheMap> cachedGroups; + public RotatingMap> cachedGroups; + + private final ShellCommandRunner shellCommandRunner; + + private long timeoutMs; + private volatile long lastRotationMs; + + public ShellBasedGroupsMapping() { + this(new ShellCommandRunnerImpl()); + } + + @VisibleForTesting + ShellBasedGroupsMapping(ShellCommandRunner shellCommandRunner) { + this.shellCommandRunner = shellCommandRunner; + } /** * Invoked once immediately after construction @@ -44,10 +62,11 @@ public class ShellBasedGroupsMapping implements */ @Override public void prepare(Map topoConf) { - int timeout = ObjectReader.getInt(topoConf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS)); - cachedGroups = new TimeCacheMap<>(timeout); + timeoutMs = TimeUnit.SECONDS.toMillis(ObjectReader.getInt(topoConf.get(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS))); + lastRotationMs = Time.currentTimeMillis(); + cachedGroups = new RotatingMap<>(2); } - + /** * Returns list of groups for a user * @@ -57,6 +76,7 @@ public void prepare(Map topoConf) { @Override public Set getGroups(String user) throws IOException { synchronized(this) { + rotateIfNeeded(); if (cachedGroups.containsKey(user)) { return cachedGroups.get(user); } @@ -69,6 +89,19 @@ public Set getGroups(String user) throws IOException { } return groups; } + + private void rotateIfNeeded() { + long nowMs = Time.currentTimeMillis(); + if (nowMs >= lastRotationMs + timeoutMs) { + //Rotate once per timeout period that has passed since last time this was called. + //This is necessary since this method may be called at arbitrary intervals. + int rotationsToDo = (int)((nowMs - lastRotationMs) / timeoutMs); + for (int i = 0; i < rotationsToDo; i++) { + cachedGroups.rotate(); + } + lastRotationMs = nowMs; + } + } /** * Get the current user's group list from Unix by running the command 'groups' @@ -77,21 +110,19 @@ public Set getGroups(String user) throws IOException { * @return the groups set that the user belongs to * @throws IOException if encounter any error when running the command */ - private static Set getUnixGroups(final String user) throws IOException { + private Set getUnixGroups(final String user) throws IOException { String result; try { - result = ShellUtils.execCommand(ShellUtils.getGroupsForUserCommand(user)); + result = shellCommandRunner.execCommand(ShellUtils.getGroupsForUserCommand(user)); } catch (ExitCodeException e) { // if we didn't get the group - just return empty list; - LOG.debug("unable to get groups for user " + user + ".ShellUtils command failed with exit code "+ e.getExitCode()); + LOG.debug("Unable to get groups for user " + user + ". ShellUtils command failed with exit code "+ e.getExitCode()); return new HashSet<>(); } - StringTokenizer tokenizer = - new StringTokenizer(result, ShellUtils.TOKEN_SEPARATOR_REGEX); Set groups = new HashSet<>(); - while (tokenizer.hasMoreTokens()) { - groups.add(tokenizer.nextToken()); + for (String group : result.split(shellCommandRunner.getTokenSeparatorRegex())) { + groups.add(group); } return groups; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java index dfd6bdfbd20..89dedd47396 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java +++ b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java @@ -67,7 +67,7 @@ public RotatingMap(ExpiredCallback callback) { public RotatingMap(int numBuckets) { this(numBuckets, null); - } + } public Map rotate() { Map dead = _buckets.removeLast(); diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunner.java b/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunner.java new file mode 100644 index 00000000000..97185696897 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunner.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import java.io.IOException; +import java.util.Map; + +/** + * Contains convenience functions for running shell commands for cases that are too simple to need a full {@link ShellUtils} implementation. + */ +public interface ShellCommandRunner { + + /** + * Method to execute a shell command. + * Covers most of the simple cases without requiring the user to implement + * the {@link ShellUtils} interface. + * @param cmd shell command to execute. + * @return the output of the executed command. + */ + String execCommand(String ... cmd) throws IOException; + + /** + * Method to execute a shell command. + * Covers most of the simple cases without requiring the user to implement + * the {@link ShellUtils} interface. + * @param env the map of environment key=value + * @param cmd shell command to execute. + * @param timeout time in milliseconds after which script should be marked timeout + * @return the output of the executed command. + */ + + String execCommand(Map env, String[] cmd, + long timeout) throws IOException; + + /** + * Method to execute a shell command. + * Covers most of the simple cases without requiring the user to implement + * the {@link ShellUtils} interface. + * @param env the map of environment key=value + * @param cmd shell command to execute. + * @return the output of the executed command. + */ + String execCommand(Map env, String ... cmd) + throws IOException; + + /** Token separator regex used to parse Shell tool outputs */ + String getTokenSeparatorRegex(); + +} diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunnerImpl.java b/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunnerImpl.java new file mode 100644 index 00000000000..fd9d466676f --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/utils/ShellCommandRunnerImpl.java @@ -0,0 +1,46 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import java.io.IOException; +import java.util.Map; + +public class ShellCommandRunnerImpl implements ShellCommandRunner { + + @Override + public String execCommand(String... cmd) throws IOException { + return execCommand(null, cmd, 0L); + } + + @Override + public String execCommand(Map env, String[] cmd, long timeout) throws IOException { + ShellUtils.ShellCommandExecutor exec = new ShellUtils.ShellCommandExecutor(cmd, null, env, + timeout); + exec.execute(); + return exec.getOutput(); + } + + @Override + public String execCommand(Map env, String... cmd) throws IOException { + return execCommand(env, cmd, 0L); + } + + @Override + public String getTokenSeparatorRegex() { + return ShellUtils.TOKEN_SEPARATOR_REGEX; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java index ef869b0551b..a772787699a 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java @@ -135,8 +135,10 @@ protected void setWorkingDirectory(File dir) { /** a Unix command to get the current user's groups list */ public static String[] getGroupsCommand() { - return (WINDOWS)? new String[]{"cmd", "/c", "groups"} - : new String[]{"bash", "-c", "groups"}; + if (WINDOWS) { + throw new UnsupportedOperationException("Getting user groups is not supported on Windows"); + } + return new String[]{"bash", "-c", "groups"}; } /** @@ -146,6 +148,9 @@ public static String[] getGroupsCommand() { * i.e. the user's primary group will be included twice. */ public static String[] getGroupsForUserCommand(final String user) { + if (WINDOWS) { + throw new UnsupportedOperationException("Getting user groups is not supported on Windows"); + } //'groups username' command return is non-consistent across different unixes return new String [] {"bash", "-c", "id -gn " + user + "&& id -Gn " + user}; @@ -431,49 +436,6 @@ private void setTimedOut() { this.timedOut.set(true); } - - /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the Shell interface. - * @param cmd shell command to execute. - * @return the output of the executed command. - */ - public static String execCommand(String ... cmd) throws IOException { - return execCommand(null, cmd, 0L); - } - - /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the Shell interface. - * @param env the map of environment key=value - * @param cmd shell command to execute. - * @param timeout time in milliseconds after which script should be marked timeout - * @return the output of the executed command.o - */ - - public static String execCommand(Map env, String[] cmd, - long timeout) throws IOException { - ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, - timeout); - exec.execute(); - return exec.getOutput(); - } - - /** - * Static method to execute a shell command. - * Covers most of the simple cases without requiring the user to implement - * the Shell interface. - * @param env the map of environment key=value - * @param cmd shell command to execute. - * @return the output of the executed command. - */ - public static String execCommand(Map env, String ... cmd) - throws IOException { - return execCommand(env, cmd, 0L); - } - /** * Timer which is used to timeout scripts spawned off by shell. */ diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java index 0401829ad0e..142c4324525 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Time.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java @@ -31,11 +31,11 @@ */ public class Time { private static final Logger LOG = LoggerFactory.getLogger(Time.class); - private static AtomicBoolean simulating = new AtomicBoolean(false); - private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0); - private static volatile Map threadSleepTimesNanos; - private static final Object sleepTimesLock = new Object(); - private static AtomicLong simulatedCurrTimeNanos; + private static final AtomicBoolean SIMULATING = new AtomicBoolean(false); + private static final AtomicLong AUTO_ADVANCE_NANOS_ON_SLEEP = new AtomicLong(0); + private static final Map THREAD_SLEEP_TIMES_NANOS = new ConcurrentHashMap<>(); + private static final Object SLEEP_TIMES_LOCK = new Object(); + private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0); public static class SimulatedTime implements AutoCloseable { @@ -44,12 +44,14 @@ public SimulatedTime() { } public SimulatedTime(Number advanceTimeMs) { - synchronized(Time.sleepTimesLock) { - Time.simulating.set(true); - Time.simulatedCurrTimeNanos = new AtomicLong(0); - Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + synchronized(Time.SLEEP_TIMES_LOCK) { + Time.SIMULATING.set(true); + Time.SIMULATED_CURR_TIME_NANOS.set(0); + Time.THREAD_SLEEP_TIMES_NANOS.clear(); if (advanceTimeMs != null) { - Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue())); + Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(millisToNanos(advanceTimeMs.longValue())); + } else { + Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0); } LOG.warn("AutoCloseable Simulated Time Starting..."); } @@ -57,10 +59,8 @@ public SimulatedTime(Number advanceTimeMs) { @Override public void close() { - synchronized(Time.sleepTimesLock) { - Time.simulating.set(false); - Time.autoAdvanceNanosOnSleep.set(0); - Time.threadSleepTimesNanos = null; + synchronized(Time.SLEEP_TIMES_LOCK) { + Time.SIMULATING.set(false); LOG.warn("AutoCloseable Simulated Time Ending..."); } } @@ -68,30 +68,29 @@ public void close() { @Deprecated public static void startSimulating() { - synchronized(Time.sleepTimesLock) { - Time.simulating.set(true); - Time.simulatedCurrTimeNanos = new AtomicLong(0); - Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + synchronized(Time.SLEEP_TIMES_LOCK) { + Time.SIMULATING.set(true); + Time.SIMULATED_CURR_TIME_NANOS.set(0); + Time.THREAD_SLEEP_TIMES_NANOS.clear(); + Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0); LOG.warn("Simulated Time Starting..."); } } @Deprecated public static void stopSimulating() { - synchronized(Time.sleepTimesLock) { - Time.simulating.set(false); - Time.autoAdvanceNanosOnSleep.set(0); - Time.threadSleepTimesNanos = null; + synchronized(Time.SLEEP_TIMES_LOCK) { + Time.SIMULATING.set(false); LOG.warn("Simulated Time Ending..."); } } public static boolean isSimulating() { - return simulating.get(); + return SIMULATING.get(); } public static void sleepUntil(long targetTimeMs) throws InterruptedException { - if(simulating.get()) { + if(SIMULATING.get()) { simulatedSleepUntilNanos(millisToNanos(targetTimeMs)); } else { long sleepTimeMs = targetTimeMs - currentTimeMillis(); @@ -102,7 +101,7 @@ public static void sleepUntil(long targetTimeMs) throws InterruptedException { } public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException { - if(simulating.get()) { + if(SIMULATING.get()) { simulatedSleepUntilNanos(targetTimeNanos); } else { long sleepTimeNanos = targetTimeNanos-nanoTime(); @@ -116,30 +115,30 @@ public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedExcep private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException { try { - synchronized (sleepTimesLock) { - if (threadSleepTimesNanos == null) { + synchronized (SLEEP_TIMES_LOCK) { + if (THREAD_SLEEP_TIMES_NANOS == null) { LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); throw new InterruptedException(); } - threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos)); + THREAD_SLEEP_TIMES_NANOS.put(Thread.currentThread(), new AtomicLong(targetTimeNanos)); } - while (simulatedCurrTimeNanos.get() < targetTimeNanos) { - synchronized (sleepTimesLock) { - if (threadSleepTimesNanos == null) { + while (SIMULATED_CURR_TIME_NANOS.get() < targetTimeNanos) { + synchronized (SLEEP_TIMES_LOCK) { + if (THREAD_SLEEP_TIMES_NANOS == null) { LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); throw new InterruptedException(); } } - long autoAdvance = autoAdvanceNanosOnSleep.get(); + long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get(); if (autoAdvance > 0) { advanceTimeNanos(autoAdvance); } Thread.sleep(10); } } finally { - synchronized (sleepTimesLock) { - if (simulating.get() && threadSleepTimesNanos != null) { - threadSleepTimesNanos.remove(Thread.currentThread()); + synchronized (SLEEP_TIMES_LOCK) { + if (SIMULATING.get() && THREAD_SLEEP_TIMES_NANOS != null) { + THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread()); } } } @@ -160,16 +159,16 @@ public static void sleepSecs (long secs) throws InterruptedException { } public static long nanoTime() { - if (simulating.get()) { - return simulatedCurrTimeNanos.get(); + if (SIMULATING.get()) { + return SIMULATED_CURR_TIME_NANOS.get(); } else { return System.nanoTime(); } } public static long currentTimeMillis() { - if(simulating.get()) { - return nanosToMillis(simulatedCurrTimeNanos.get()); + if(SIMULATING.get()) { + return nanosToMillis(SIMULATED_CURR_TIME_NANOS.get()); } else { return System.currentTimeMillis(); } @@ -208,13 +207,13 @@ public static void advanceTime(long ms) { } public static void advanceTimeNanos(long nanos) { - if (!simulating.get()) { + if (!SIMULATING.get()) { throw new IllegalStateException("Cannot simulate time unless in simulation mode"); } if (nanos < 0) { throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); } - long newTime = simulatedCurrTimeNanos.addAndGet(nanos); + long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos); LOG.debug("Advanced simulated time to {}", newTime); } @@ -223,12 +222,12 @@ public static void advanceTimeSecs(long secs) { } public static boolean isThreadWaiting(Thread t) { - if(!simulating.get()) { + if(!SIMULATING.get()) { throw new IllegalStateException("Must be in simulation mode"); } AtomicLong time; - synchronized(sleepTimesLock) { - time = threadSleepTimesNanos.get(t); + synchronized(SLEEP_TIMES_LOCK) { + time = THREAD_SLEEP_TIMES_NANOS.get(t); } return !t.isAlive() || time!=null && nanoTime() < time.longValue(); } diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ShellBasedGroupsMappingTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ShellBasedGroupsMappingTest.java new file mode 100644 index 00000000000..62912621651 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/security/auth/ShellBasedGroupsMappingTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.storm.Config; +import org.apache.storm.utils.ShellCommandRunner; +import org.apache.storm.utils.ShellUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; + +public class ShellBasedGroupsMappingTest { + + private static final String TEST_TWO_GROUPS = "group1 group2"; + private static final String TEST_NO_GROUPS = ""; + private static final String TEST_USER_1 = "TestUserOne"; + private static final String GROUP_SEPARATOR_REGEX = "\\s"; + private static final int CACHE_EXPIRATION_SECS = 10; + + private ShellCommandRunner mockShell; + private ShellBasedGroupsMapping groupsMapping; + private Map topoConf; + + @Before + public void setUp() { + mockShell = mock(ShellCommandRunner.class); + groupsMapping = new ShellBasedGroupsMapping(mockShell); + topoConf = new HashMap<>(); + topoConf.put(Config.STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS, 10); + when(mockShell.getTokenSeparatorRegex()).thenReturn(GROUP_SEPARATOR_REGEX); + } + + @Test + public void testCanGetGroups() throws Exception { + try (SimulatedTime t = new SimulatedTime()) { + groupsMapping.prepare(topoConf); + when(mockShell.execCommand(ShellUtils.getGroupsForUserCommand(TEST_USER_1))).thenReturn(TEST_TWO_GROUPS); + + Set groups = groupsMapping.getGroups(TEST_USER_1); + + assertThat(groups, containsInAnyOrder(TEST_TWO_GROUPS.split(GROUP_SEPARATOR_REGEX))); + } + } + + @Test + public void testWillCacheGroups() throws Exception { + try(SimulatedTime t = new SimulatedTime()) { + groupsMapping.prepare(topoConf); + when(mockShell.execCommand(ShellUtils.getGroupsForUserCommand(TEST_USER_1))).thenReturn(TEST_TWO_GROUPS, TEST_NO_GROUPS); + + Set firstGroups = groupsMapping.getGroups(TEST_USER_1); + Set secondGroups = groupsMapping.getGroups(TEST_USER_1); + + assertThat(firstGroups, is(secondGroups)); + } + } + + @Test + public void testWillExpireCache() throws Exception { + try(SimulatedTime t = new SimulatedTime()) { + groupsMapping.prepare(topoConf); + when(mockShell.execCommand(ShellUtils.getGroupsForUserCommand(TEST_USER_1))).thenReturn(TEST_TWO_GROUPS, TEST_NO_GROUPS); + + Set firstGroups = groupsMapping.getGroups(TEST_USER_1); + Time.advanceTimeSecs(CACHE_EXPIRATION_SECS * 2); + Set secondGroups = groupsMapping.getGroups(TEST_USER_1); + + assertThat(firstGroups, not(secondGroups)); + assertThat(secondGroups, contains(TEST_NO_GROUPS)); + } + } + +} diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java index 07573c9de4c..057726161ce 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java @@ -162,7 +162,7 @@ private static Map getLagResultForKafka (String spoutId, SpoutSp // if commands contains one or more null value, spout is compiled with lower version of storm-kafka / storm-kafka-client if (!commands.contains(null)) { - String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0])); + String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0])); try { result = (Map) JSONValue.parseWithException(resultFromMonitor); diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index d9020e2adb5..a9bb43e7997 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -4132,6 +4132,9 @@ public void shutdown() { stormClusterState.disconnect(); downloaders.cleanup(); uploaders.cleanup(); + blobDownloaders.cleanup(); + blobUploaders.cleanup(); + blobListers.cleanup(); blobStore.shutdown(); leaderElector.close(); ITopologyActionNotifierPlugin actionNotifier = nimbusTopologyActionNotifier; diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java index 4d5691b6cc8..f627bc7561b 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java @@ -17,6 +17,9 @@ */ package org.apache.storm.daemon.supervisor; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -519,7 +522,6 @@ public void testResourcesChanged() throws Exception { LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()); when(cContainer.readHeartbeat()).thenReturn(chb); when(cContainer.areAllProcessesDead()).thenReturn(false, true); - AsyncLocalizer localizer = mock(AsyncLocalizer.class); Container nContainer = mock(Container.class); LocalState state = mock(LocalState.class); @@ -528,7 +530,8 @@ public void testResourcesChanged() throws Exception { when(nContainer.readHeartbeat()).thenReturn(chb, chb); ISupervisor iSuper = mock(ISupervisor.class); - StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + long heartbeatTimeoutMs = 5000; + StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000, containerLauncher, "localhost", port, iSuper, state, cb); Set changing = new HashSet<>(); @@ -539,7 +542,7 @@ public void testResourcesChanged() throws Exception { changing.add(new Slot.BlobChanging(cAssignment, stormJar, stormJarLatch)); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withChangingBlobs(changing); - + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state); verify(iSuper).killedWorker(port); @@ -550,7 +553,7 @@ public void testResourcesChanged() throws Exception { assertEquals(changing, nextState.changingBlobs); assertTrue(nextState.pendingChangingBlobs.isEmpty()); assertNull(nextState.pendingChangingBlobsAssignment); - assertTrue(Time.currentTimeMillis() > 1000); + assertThat(Time.currentTimeMillis(), greaterThan(1000L)); nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state); @@ -560,12 +563,12 @@ public void testResourcesChanged() throws Exception { assertEquals(changing, nextState.changingBlobs); assertTrue(nextState.pendingChangingBlobs.isEmpty()); assertNull(nextState.pendingChangingBlobsAssignment); - assertTrue(Time.currentTimeMillis() > 2000); + assertThat(Time.currentTimeMillis(), greaterThan(2000L)); nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.WAITING_FOR_BLOB_UPDATE, nextState.state); verify(cContainer).cleanUp(); - assertTrue(Time.currentTimeMillis() > 2000); + assertThat(Time.currentTimeMillis(), greaterThan(2000L)); nextState = Slot.stateMachineStep(nextState, staticState); verify(stormJarLatchFuture).get(anyLong(), any()); @@ -575,8 +578,9 @@ public void testResourcesChanged() throws Exception { assertTrue(nextState.pendingChangingBlobs.isEmpty()); assertSame(cAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); - assertTrue(Time.currentTimeMillis() > 2000); - + assertThat(Time.currentTimeMillis(), greaterThan(2000L)); + assertThat(Time.currentTimeMillis(), lessThan(heartbeatTimeoutMs)); + nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); assertNull(nextState.pendingChangingBlobsAssignment); diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java index 009f479a117..f026f1d6a76 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java @@ -33,6 +33,7 @@ import java.nio.file.Files; import java.util.HashMap; import java.util.Map; +import org.apache.storm.utils.Time.SimulatedTime; public class ArtifactoryConfigLoaderTest { @@ -125,9 +126,7 @@ public void testArtifactUpdate() { conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/test/dir"); conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); - Time.startSimulating(); - - try { + try (SimulatedTime t = new SimulatedTime()) { ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf); loaderMock.setData("Anything", "/location/of/test/dir", @@ -186,8 +185,6 @@ public void testArtifactUpdate() { Assert.assertEquals(2, ret2.get("two")); Assert.assertEquals(3, ret2.get("three")); Assert.assertEquals(4, ret2.get("four")); - } finally { - Time.stopSimulating(); } }