diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile index 740d0b02cc4..def461d7056 100644 --- a/integration-test/config/Vagrantfile +++ b/integration-test/config/Vagrantfile @@ -19,7 +19,7 @@ require 'uri' # Vagrantfile API/syntax version. Don't touch unless you know what you're doing! VAGRANTFILE_API_VERSION = "2" -STORM_BOX_TYPE = "hashicorp/precise64" +STORM_BOX_TYPE = "ubuntu/xenial64" STORM_ZIP = Dir.glob("../../storm-dist/binary/**/*.zip") if(STORM_ZIP.length != 1) raise "Expected one storm-binary found: " + STORM_ZIP.join(",") + ". Did you run : cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true" @@ -53,8 +53,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| } end - config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm" - config.vm.synced_folder "~/.m2", "/home/vagrant/.m2" + config.vm.synced_folder "../../", "/home/ubuntu/build/vagrant/storm" + config.vm.synced_folder "~/.m2", "/home/ubuntu/.m2" config.vm.define "node1" do |node1| node1.vm.provider "virtualbox" do |v| @@ -62,7 +62,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| end node1.vm.network "private_network", ip: "192.168.50.3" node1.vm.hostname = "node1" - node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false + node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/ubuntu/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts" end diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml deleted file mode 100644 index 97968e46234..00000000000 --- a/integration-test/config/cluster.xml +++ /dev/null @@ -1,101 +0,0 @@ - - - - - - - %d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n - %d %-8r %m%n - - - - /var/log/storm/${logfile.name} - - /var/log/storm/${logfile.name}.%i - 1 - 9 - - - - 100MB - - - - ${pattern} - - - - - /var/log/storm/access.log - - /var/log/storm/access.log.%i - 1 - 9 - - - - 100MB - - - - ${pattern} - - - - - /var/log/storm/metrics.log - - metrics.log.%i - 1 - 9 - - - - 2MB - - - - ${patternMetrics} - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh index b08e2bc3cab..8316c753ba0 100644 --- a/integration-test/config/install-storm.sh +++ b/integration-test/config/install-storm.sh @@ -31,7 +31,6 @@ chown -R storm:storm /etc/storm rm /usr/share/storm/conf/storm.yaml cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/ -cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/ ln -s /usr/share/storm/conf/storm.yaml /etc/storm/conf/storm.yaml mkdir /var/log/storm diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh index a81a07c55ee..98253d7b8a6 100644 --- a/integration-test/config/install-zookeeper.sh +++ b/integration-test/config/install-zookeeper.sh @@ -14,7 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5* +# $1 is the Zookeeper version to install +apt-get --yes install zookeeper=$1 zookeeperd=$1 service zookeeper stop echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg service zookeeper start diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 7ec95840242..8197c72a430 100755 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -78,7 +78,7 @@ com.google.code.gson gson - 2.3.1 + 2.8.0 com.google.code.findbugs diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh index 3194a926e00..d57a4d6eb2f 100755 --- a/integration-test/run-it.sh +++ b/integration-test/run-it.sh @@ -32,11 +32,11 @@ function list_storm_processes() { list_storm_processes || true # increasing swap space so we can run lots of workers -sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M +sudo dd if=/dev/zero of=/swapfile.img bs=4096 count=1M sudo mkswap /swapfile.img sudo swapon /swapfile.img -if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8 +if [[ "${USER}" == "ubuntu" ]]; then # install oracle jdk8 sudo apt-get update sudo apt-get -y install python-software-properties sudo apt-add-repository -y ppa:webupd8team/java @@ -44,16 +44,18 @@ if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8 echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections sudo apt-get install -y oracle-java8-installer sudo apt-get -y install maven + sudo apt-get install unzip java -version mvn --version export MAVEN_OPTS="-Xmx3000m" + zookeeper_version=3.4.8* else ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci - (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed" if [[ "${USER}" == "travis" ]]; then ( cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn clean package -Dgpg.skip=true ) fi (( $(find "${STORM_SRC_DIR}/storm-dist/binary" -iname 'apache-storm*.zip' | wc -l) == 1 )) || die "expected exactly one zip file, did you run: cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true" + zookeeper_version=3.3.5* fi storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip') @@ -64,7 +66,7 @@ echo "Using storm version:" ${STORM_VERSION} # setup storm cluster list_storm_processes || true sudo bash "${SCRIPT_DIR}/config/common.sh" -sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh" +sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh" "$zookeeper_version" sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "$storm_binary_zip" export JAVA_HOME="${JAVA_HOME}" env diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java index be00ffca553..7a9163df125 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java @@ -114,7 +114,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputC @Override public void nextTuple() { - TimeUtil.sleepMilliSec(rng.nextInt(800)); + //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it + //Sleep a bit between emits + TimeUtil.sleepMilliSec(rng.nextInt(100)); currentNum++; TimeData data = TimeData.newData(currentNum); final Values tuple = data.getValues(); diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java index 5d1b53beebb..a52ae7c6413 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * Computes sliding window sum @@ -74,7 +73,6 @@ public StormTopology newTopology() { builder.setSpout(getSpoutName(), new IncrementingSpout(), 1); builder.setBolt(getBoltName(), new VerificationBolt() - .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) .withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)), 1) .shuffleGrouping(getSpoutName()); diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java index d7fbbca4141..64d7441a664 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java @@ -111,7 +111,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputC @Override public void nextTuple() { - TimeUtil.sleepMilliSec(rng.nextInt(800)); + //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it + //Sleep a bit between emits + TimeUtil.sleepMilliSec(rng.nextInt(100)); currentNum++; TimeData data = TimeData.newData(currentNum); final Values tuple = data.getValues(); diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java index 110c982519c..05351df74a9 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * Computes sliding window sum @@ -72,7 +71,6 @@ public StormTopology newTopology() { builder.setSpout(getSpoutName(), new IncrementingSpout(), 1); builder.setBolt(getBoltName(), new VerificationBolt() - .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) .withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1) .shuffleGrouping(getSpoutName()); return builder.createTopology(); diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java index 34c2b65592a..690121244f3 100644 --- a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java +++ b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java @@ -17,6 +17,7 @@ package org.apache.storm.st.utils; +import java.nio.charset.StandardCharsets; import org.apache.commons.lang.StringUtils; public class StringDecorator { diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java index 5cef9fa3e6f..3de1a7d45c2 100644 --- a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java @@ -17,6 +17,7 @@ package org.apache.storm.st.tests.window; +import java.io.IOException; import org.apache.storm.st.helper.AbstractTest; import org.apache.storm.st.wrapper.LogData; import org.apache.storm.st.wrapper.TopoWrap; @@ -79,21 +80,25 @@ public void testWindowCount(int windowSize, int slideSize) throws Exception { runAndVerifyCount(windowSize, slideSize, testable, topo); } - static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException { + static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws IOException, TException, MalformedURLException { topo.submitSuccessfully(); - final int minSpoutEmits = 1000 + windowSize; final int minBoltEmits = 5; + //Sliding windows should produce one window every slideSize tuples + //Wait for the spout to emit at least enough tuples to get minBoltEmit windows and at least one full window + final int minSpoutEmits = Math.max(windowSize, minBoltEmits * slideSize); + String boltName = testable.getBoltName(); String spoutName = testable.getSpoutName(); - topo.waitForProgress(minSpoutEmits, spoutName, 180); - topo.waitForProgress(minBoltEmits, boltName, 180); + //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway + topo.assertProgress(minSpoutEmits, spoutName, 180); + topo.assertProgress(minBoltEmits, boltName, 180); List boltUrls = topo.getLogUrls(boltName); log.info(boltUrls.toString()); final List allBoltData = topo.getLogData(boltName); final List allSpoutData = topo.getLogData(spoutName); Assert.assertTrue(allBoltData.size() >= minBoltEmits, "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData); - final int numberOfWindows = allBoltData.size() - windowSize / slideSize; + final int numberOfWindows = allBoltData.size(); for(int i = 0; i < numberOfWindows; ++i ) { log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows); final int toIndex = (i + 1) * slideSize; @@ -143,28 +148,29 @@ public void testTimeWindow(int windowSec, int slideSec) throws Exception { runAndVerifyTime(windowSec, slideSec, testable, topo); } - static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException { + static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws IOException, TException, java.net.MalformedURLException { topo.submitSuccessfully(); - final int minSpoutEmits = 1000 + windowSec; + final int minSpoutEmits = 100; final int minBoltEmits = 5; String boltName = testable.getBoltName(); String spoutName = testable.getSpoutName(); - topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec)); - topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec)); - final List allSpoutData = topo.getLogData(spoutName, TimeData.CLS); - final List allBoltLog = topo.getLogData(boltName); - final List allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS); - Assert.assertTrue(allBoltLog.size() >= minBoltEmits, - "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog); - final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec); - final int numberOfWindows = allBoltLog.size() - windowSec / slideSec; + //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway + topo.assertProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec)); + topo.assertProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec)); + final List allSpoutDataDeserialized = topo.getLogData(spoutName, TimeData.CLS); + final List allBoltData = topo.getLogData(boltName); + final List allBoltDataDeserialized = topo.deserializeLogData(allBoltData, TimeDataWindow.CLS); + Assert.assertTrue(allBoltData.size() >= minBoltEmits, + "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData); + final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutDataDeserialized.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec); + final int numberOfWindows = allBoltData.size(); for(int i = 0; i < numberOfWindows; ++i ) { final DateTime toDate = firstEndTime.plusSeconds(i * slideSec); final DateTime fromDate = toDate.minusSeconds(windowSec); log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows); - final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate); - final LogData oneBoltLog = allBoltLog.get(i); - final TimeDataWindow actualWindow = allBoltData.get(i); + final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutDataDeserialized,fromDate, toDate); + final LogData oneBoltLog = allBoltData.get(i); + final TimeDataWindow actualWindow = allBoltDataDeserialized.get(i); log.info("Actual window: " + actualWindow.getDescription()); log.info("Computed window: " + computedWindow.getDescription()); for (TimeData oneLog : computedWindow) { diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java index ac6d0c7ed2a..af1e1bead77 100644 --- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +66,8 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; public class TopoWrap { private static Logger log = LoggerFactory.getLogger(TopoWrap.class); @@ -97,6 +100,9 @@ private static Map getSubmitConf() { submitConf.put("storm.zookeeper.topology.auth.scheme", "digest"); submitConf.put("topology.workers", 3); submitConf.put("topology.debug", true); + //Set the metrics sample rate to 1 to force update the executor stats every time something happens + //This is necessary because getAllTimeEmittedCount relies on the executor emit stats to be accurate + submitConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1); return submitConf; } @@ -180,7 +186,7 @@ public Long getEmittedCount(@Nonnull ExecutorSummary input, @Nonnull String sinc Map allTime = emitted.get(since); if (allTime == null) return 0L; - return allTime.get("default"); + return allTime.get(Utils.DEFAULT_STREAM_ID); } }); return sum(ackCounts).longValue(); @@ -213,7 +219,7 @@ public void waitForProgress(int minEmits, String componentName, int maxWaitSec) log.info(getInfo().toString()); long emitCount = getAllTimeEmittedCount(componentName); log.info("Count for component " + componentName + " is " + emitCount); - if (emitCount > minEmits) { + if (emitCount >= minEmits) { break; } TimeUtil.sleepSec(10); @@ -280,8 +286,13 @@ public String toString() { } } - public > List getLogData(final String componentId, final FromJson cls) throws TException, MalformedURLException { + public > List getLogData(final String componentId, final FromJson cls) + throws IOException, TException, MalformedURLException { final List logData = getLogData(componentId); + return deserializeLogData(logData, cls); + } + + public > List deserializeLogData(final List logData, final FromJson cls) { final List data = new ArrayList<>( Collections2.transform(logData, new Function() { @Nullable @@ -294,7 +305,7 @@ public T apply(@Nullable LogData input) { return data; } - public List getLogData(final String componentId) throws TException, MalformedURLException { + public List getLogData(final String componentId) throws IOException, TException, MalformedURLException { final String logs = getLogs(componentId); final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}"; Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")"); @@ -318,48 +329,38 @@ public LogData apply(@Nullable String input) { return sortedLogs; } - public String getLogs(final String componentId) throws TException, MalformedURLException { + public String getLogs(final String componentId) throws IOException, TException, MalformedURLException { log.info("Fetching logs for componentId = " + componentId); List exclaim2Urls = getLogUrls(componentId); log.info("Found " + exclaim2Urls.size() + " urls: " + exclaim2Urls.toString()); - Collection urlOuputs = Collections2.transform(exclaim2Urls, new Function() { - @Nullable - @Override - public String apply(@Nullable ExecutorURL executorURL) { - if (executorURL == null || executorURL.getDownloadUrl() == null) { - return ""; - } - String warnMessage = "Couldn't fetch executorURL: " + executorURL; + List urlContents = new ArrayList<>(); + for(ExecutorURL executorUrl : exclaim2Urls) { + if(executorUrl == null || executorUrl.getDownloadUrl() == null) { + continue; + } + log.info("Fetching: " + executorUrl); + URL downloadUrl = executorUrl.downloadUrl; + String urlContent = IOUtils.toString(downloadUrl, StandardCharsets.UTF_8); + urlContents.add(urlContent); + if (urlContent.length() < 500) { + log.info("Fetched: " + urlContent); + } else { + log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes."); + } + if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) { + final String userDir = System.getProperty("user.dir"); + final File target = new File(userDir, "target"); + final File logDir = new File(target, "logs"); + final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]); try { - log.info("Fetching: " + executorURL); - final URL downloadUrl = executorURL.downloadUrl; - final String urlContent = IOUtils.toString(downloadUrl); - if (urlContent.length() < 500) { - log.info("Fetched: " + urlContent); - } else { - log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes."); - } - if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) { - final String userDir = System.getProperty("user.dir"); - final File target = new File(userDir, "target"); - final File logDir = new File(target, "logs"); - final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]); - try { - FileUtils.forceMkdir(logDir); - FileUtils.write(logFile, urlContent); - } catch (Throwable throwable) { - log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable)); - } - } - return urlContent; - } catch (IOException e) { - log.warn(warnMessage); + FileUtils.forceMkdir(logDir); + FileUtils.write(logFile, urlContent, StandardCharsets.UTF_8); + } catch (Throwable throwable) { + log.info("Caught exception: " + ExceptionUtils.getFullStackTrace(throwable)); } - String stars = StringUtils.repeat("*", 30); - return stars + " " + warnMessage + " " + stars; } - }); - return StringUtils.join(urlOuputs, '\n'); + } + return StringUtils.join(urlContents, '\n'); } private Number sum(Collection nums) { diff --git a/pom.xml b/pom.xml index bdcc716d634..f96cfd57a15 100644 --- a/pom.xml +++ b/pom.xml @@ -284,6 +284,7 @@ 4.11 2.5.1 2.1.7 + 1.3 1.11.0 @@ -619,7 +620,7 @@ jersey-container-servlet-core ${jersey.version} - + org.glassfish.jersey.containers jersey-container-jetty-http ${jersey.version} @@ -957,6 +958,24 @@ + + org.mockito + mockito-core + ${mockito.version} + test + + + org.hamcrest + hamcrest-core + ${hamcrest.version} + test + + + org.hamcrest + hamcrest-library + ${hamcrest.version} + test + org.mockito mockito-all @@ -1097,7 +1116,7 @@ com.puppycrawl.tools checkstyle + based on the google_checks.xml from the version of checkstyle you are choosing. --> 7.7 diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 30d7685d6c1..538b7b91013 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -187,7 +187,12 @@ org.mockito - mockito-all + mockito-core + test + + + org.hamcrest + hamcrest-library test diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index 2ada9bf8962..56c329ee369 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -199,8 +199,7 @@ private WindowManager initWindowManager(WindowLifecycleListener li // validate validate(topoConf, windowLengthCount, windowLengthDuration, slidingIntervalCount, slidingIntervalDuration); - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration, - manager); + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration); triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager, evictionPolicy); manager.setEvictionPolicy(evictionPolicy); @@ -251,8 +250,7 @@ private TriggerPolicy getTriggerPolicy(Count slidingIntervalCount, Durati } } - private EvictionPolicy getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration, - WindowManager manager) { + private EvictionPolicy getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) { if (windowLengthCount != null) { if (isTupleTs()) { return new WatermarkCountEvictionPolicy<>(windowLengthCount.value); diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java index 37dcfd90cc4..ee5fdb90e56 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java @@ -38,7 +38,7 @@ public interface EvictionContext { Long getSlidingCount(); /** - * Returns the current count of events in the queue up to the reference tim + * Returns the current count of events in the queue up to the reference time * based on which count based evictions can be performed. * * @return the current count diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java index 774d0a3714b..fa44444040d 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java @@ -27,7 +27,7 @@ public interface EvictionPolicy { /** * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked. */ - enum Action { + public enum Action { /** * expire the event and remove it from the queue */ diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java index 802e6bb4291..570b05779a5 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java @@ -38,7 +38,7 @@ public TimeEvictionPolicy(int windowLength) { * {@inheritDoc} */ @Override - public Action evict(Event event) { + public Action evict(Event event) { long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime(); long diff = now - event.getTimestamp(); if (diff >= windowLength) { diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java index b057afb5057..6b6d9fa586b 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java @@ -114,9 +114,7 @@ public void run() { * set the current timestamp as the reference time for the eviction policy * to evict the events */ - if (evictionPolicy != null) { - evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis())); - } + evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis())); handler.onTrigger(); } catch (Throwable th) { LOG.error("handler.onTrigger failed ", th); diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java index e1df72c39cb..ef81d6ee393 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java @@ -77,7 +77,7 @@ public void run() { try { long waterMarkTs = computeWaterMarkTs(); if (waterMarkTs > lastWaterMarkTs) { - this.windowManager.add(new WaterMarkEvent(waterMarkTs)); + this.windowManager.add(new WaterMarkEvent<>(waterMarkTs)); lastWaterMarkTs = waterMarkTs; } } catch (Throwable th) { diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java index c5d7b496385..0fe6f753beb 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java @@ -19,7 +19,7 @@ /** * An eviction policy that tracks count based on watermark ts and - * evicts events upto the watermark based on a threshold count. + * evicts events up to the watermark based on a threshold count. * * @param the type of event tracked by this policy. */ @@ -32,6 +32,13 @@ public WatermarkCountEvictionPolicy(int count) { @Override public Action evict(Event event) { + if(getContext() == null) { + //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow. + //In this case we should hold on to all the events. When the first watermark is received, the context will be set, + //and the events will be reevaluated for eviction + return Action.STOP; + } + Action action; if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) { action = super.evict(event); diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java index e5ecba4710d..fdb3917583b 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java @@ -57,7 +57,14 @@ public WatermarkTimeEvictionPolicy(int windowLength, int lag) { */ @Override public Action evict(Event event) { - long referenceTime = evictionContext.getReferenceTime() != null ? evictionContext.getReferenceTime() : 0L; + if(evictionContext == null) { + //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow. + //In this case we should hold on to all the events. When the first watermark is received, the context will be set, + //and the events will be reevaluated for eviction + return Action.STOP; + } + + long referenceTime = evictionContext.getReferenceTime(); long diff = referenceTime - event.getTimestamp(); if (diff < -lag) { return Action.STOP; diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java index 8021ba83729..f6cc52159ae 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java @@ -46,6 +46,9 @@ public class WindowManager implements TriggerHandler { /** * Expire old events every EXPIRE_EVENTS_THRESHOLD to * keep the window size in check. + * + * Note that if the eviction policy is based on watermarks, events will not be evicted until a new + * watermark would cause them to be considered expired anyway, regardless of this limit */ public static final int EXPIRE_EVENTS_THRESHOLD = 100; diff --git a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java index 6c170c64b33..178c1bb0e8a 100644 --- a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java +++ b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java @@ -29,9 +29,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.storm.topology.base.BaseWindowedBolt.Count; import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -155,7 +158,48 @@ public void testExpireThreshold() throws Exception { // window should be compacted and events should be expired. assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents); } - + + private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy watermarkEvictionPolicy, int windowLength) throws Exception { + /** + * The watermark eviction policy must not evict tuples until the first watermark has been received. + * The policies can't make a meaningful decision prior to the first watermark, so the safe decision + * is to postpone eviction. + */ + int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD; + windowManager.setEvictionPolicy(watermarkEvictionPolicy); + WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager, + watermarkEvictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + for (int i : seq(1, threshold)) { + windowManager.add(i, i); + } + assertThat("The watermark eviction policies should never evict events before the first watermark is received", listener.onExpiryEvents, is(empty())); + windowManager.add(new WaterMarkEvent<>(threshold)); + // The events should be put in a window when the first watermark is received + assertEquals(seq(1, threshold), listener.onActivationEvents); + //Now add some more events and a new watermark, and check that the previous events are expired + for(int i : seq(threshold+1, threshold*2)) { + windowManager.add(i, i); + } + windowManager.add(new WaterMarkEvent<>(threshold + windowLength+1)); + //All the events should be expired when the next watermark is received + assertThat("All the events should be expired after the second watermark", listener.onExpiryEvents, equalTo(seq(1, threshold))); + } + + @Test + public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception { + int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD; + EvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(windowLength); + testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkCountEvictionPolicy, windowLength); + } + + @Test + public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception { + int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD; + EvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(windowLength); + testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkTimeEvictionPolicy, windowLength); + } @Test public void testTimeBasedWindow() throws Exception {