Skip to content

Commit

Permalink
STORM-2525: Fix flaky integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
srdo committed May 20, 2017
1 parent 64e29f3 commit ebd3288
Show file tree
Hide file tree
Showing 25 changed files with 185 additions and 195 deletions.
8 changes: 4 additions & 4 deletions integration-test/config/Vagrantfile
Expand Up @@ -19,7 +19,7 @@
require 'uri'
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"
STORM_BOX_TYPE = "hashicorp/precise64"
STORM_BOX_TYPE = "ubuntu/xenial64"
STORM_ZIP = Dir.glob("../../storm-dist/binary/**/*.zip")
if(STORM_ZIP.length != 1)
raise "Expected one storm-binary found: " + STORM_ZIP.join(",") + ". Did you run : cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
Expand Down Expand Up @@ -53,16 +53,16 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
}
end

config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm"
config.vm.synced_folder "~/.m2", "/home/vagrant/.m2"
config.vm.synced_folder "../../", "/home/ubuntu/build/vagrant/storm"
config.vm.synced_folder "~/.m2", "/home/ubuntu/.m2"

config.vm.define "node1" do |node1|
node1.vm.provider "virtualbox" do |v|
v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
end
node1.vm.network "private_network", ip: "192.168.50.3"
node1.vm.hostname = "node1"
node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/ubuntu/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
#node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts"
end

Expand Down
101 changes: 0 additions & 101 deletions integration-test/config/cluster.xml

This file was deleted.

1 change: 0 additions & 1 deletion integration-test/config/install-storm.sh
Expand Up @@ -31,7 +31,6 @@ chown -R storm:storm /etc/storm

rm /usr/share/storm/conf/storm.yaml
cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/
cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/
ln -s /usr/share/storm/conf/storm.yaml /etc/storm/conf/storm.yaml

mkdir /var/log/storm
Expand Down
3 changes: 2 additions & 1 deletion integration-test/config/install-zookeeper.sh
Expand Up @@ -14,7 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5*
# $1 is the Zookeeper version to install
apt-get --yes install zookeeper=$1 zookeeperd=$1
service zookeeper stop
echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg
service zookeeper start
2 changes: 1 addition & 1 deletion integration-test/pom.xml
Expand Up @@ -78,7 +78,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand Down
10 changes: 6 additions & 4 deletions integration-test/run-it.sh
Expand Up @@ -32,28 +32,30 @@ function list_storm_processes() {

list_storm_processes || true
# increasing swap space so we can run lots of workers
sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
sudo dd if=/dev/zero of=/swapfile.img bs=4096 count=1M
sudo mkswap /swapfile.img
sudo swapon /swapfile.img

if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
if [[ "${USER}" == "ubuntu" ]]; then # install oracle jdk8
sudo apt-get update
sudo apt-get -y install python-software-properties
sudo apt-add-repository -y ppa:webupd8team/java
sudo apt-get update
echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections
sudo apt-get install -y oracle-java8-installer
sudo apt-get -y install maven
sudo apt-get install unzip
java -version
mvn --version
export MAVEN_OPTS="-Xmx3000m"
zookeeper_version=3.4.8*
else
( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci
(cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed"
if [[ "${USER}" == "travis" ]]; then
( cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn clean package -Dgpg.skip=true )
fi
(( $(find "${STORM_SRC_DIR}/storm-dist/binary" -iname 'apache-storm*.zip' | wc -l) == 1 )) || die "expected exactly one zip file, did you run: cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
zookeeper_version=3.3.5*
fi

storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip')
Expand All @@ -64,7 +66,7 @@ echo "Using storm version:" ${STORM_VERSION}
# setup storm cluster
list_storm_processes || true
sudo bash "${SCRIPT_DIR}/config/common.sh"
sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh"
sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh" "$zookeeper_version"
sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "$storm_binary_zip"
export JAVA_HOME="${JAVA_HOME}"
env
Expand Down
Expand Up @@ -114,7 +114,9 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC

@Override
public void nextTuple() {
TimeUtil.sleepMilliSec(rng.nextInt(800));
//Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
//Sleep a bit between emits
TimeUtil.sleepMilliSec(rng.nextInt(100));
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
Expand Down
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* Computes sliding window sum
Expand Down Expand Up @@ -74,7 +73,6 @@ public StormTopology newTopology() {
builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
builder.setBolt(getBoltName(),
new VerificationBolt()
.withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)),
1)
.shuffleGrouping(getSpoutName());
Expand Down
Expand Up @@ -111,7 +111,9 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC

@Override
public void nextTuple() {
TimeUtil.sleepMilliSec(rng.nextInt(800));
//Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
//Sleep a bit between emits
TimeUtil.sleepMilliSec(rng.nextInt(100));
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
Expand Down
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* Computes sliding window sum
Expand Down Expand Up @@ -72,7 +71,6 @@ public StormTopology newTopology() {
builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
builder.setBolt(getBoltName(),
new VerificationBolt()
.withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1)
.shuffleGrouping(getSpoutName());
return builder.createTopology();
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.storm.st.utils;

import java.nio.charset.StandardCharsets;
import org.apache.commons.lang.StringUtils;

public class StringDecorator {
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.storm.st.tests.window;

import java.io.IOException;
import org.apache.storm.st.helper.AbstractTest;
import org.apache.storm.st.wrapper.LogData;
import org.apache.storm.st.wrapper.TopoWrap;
Expand Down Expand Up @@ -79,21 +80,25 @@ public void testWindowCount(int windowSize, int slideSize) throws Exception {
runAndVerifyCount(windowSize, slideSize, testable, topo);
}

static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException {
static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws IOException, TException, MalformedURLException {
topo.submitSuccessfully();
final int minSpoutEmits = 1000 + windowSize;
final int minBoltEmits = 5;
//Sliding windows should produce one window every slideSize tuples
//Wait for the spout to emit at least enough tuples to get minBoltEmits windows and at least one full window
final int minSpoutEmits = Math.max(windowSize, minBoltEmits * slideSize);

String boltName = testable.getBoltName();
String spoutName = testable.getSpoutName();
topo.waitForProgress(minSpoutEmits, spoutName, 180);
topo.waitForProgress(minBoltEmits, boltName, 180);
//Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
topo.assertProgress(minSpoutEmits, spoutName, 180);
topo.assertProgress(minBoltEmits, boltName, 180);
List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName);
log.info(boltUrls.toString());
final List<LogData> allBoltData = topo.getLogData(boltName);
final List<LogData> allSpoutData = topo.getLogData(spoutName);
Assert.assertTrue(allBoltData.size() >= minBoltEmits,
"Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
final int numberOfWindows = allBoltData.size() - windowSize / slideSize;
final int numberOfWindows = allBoltData.size();
for(int i = 0; i < numberOfWindows; ++i ) {
log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows);
final int toIndex = (i + 1) * slideSize;
Expand Down Expand Up @@ -143,28 +148,29 @@ public void testTimeWindow(int windowSec, int slideSec) throws Exception {
runAndVerifyTime(windowSec, slideSec, testable, topo);
}

static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException {
static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws IOException, TException, java.net.MalformedURLException {
topo.submitSuccessfully();
final int minSpoutEmits = 1000 + windowSec;
final int minSpoutEmits = 100;
final int minBoltEmits = 5;
String boltName = testable.getBoltName();
String spoutName = testable.getSpoutName();
topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS);
final List<LogData> allBoltLog = topo.getLogData(boltName);
final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS);
Assert.assertTrue(allBoltLog.size() >= minBoltEmits,
"Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog);
final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
final int numberOfWindows = allBoltLog.size() - windowSec / slideSec;
//Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
topo.assertProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
topo.assertProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
final List<TimeData> allSpoutDataDeserialized = topo.getLogData(spoutName, TimeData.CLS);
final List<LogData> allBoltData = topo.getLogData(boltName);
final List<TimeDataWindow> allBoltDataDeserialized = topo.deserializeLogData(allBoltData, TimeDataWindow.CLS);
Assert.assertTrue(allBoltData.size() >= minBoltEmits,
"Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutDataDeserialized.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
final int numberOfWindows = allBoltData.size();
for(int i = 0; i < numberOfWindows; ++i ) {
final DateTime toDate = firstEndTime.plusSeconds(i * slideSec);
final DateTime fromDate = toDate.minusSeconds(windowSec);
log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows);
final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate);
final LogData oneBoltLog = allBoltLog.get(i);
final TimeDataWindow actualWindow = allBoltData.get(i);
final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutDataDeserialized,fromDate, toDate);
final LogData oneBoltLog = allBoltData.get(i);
final TimeDataWindow actualWindow = allBoltDataDeserialized.get(i);
log.info("Actual window: " + actualWindow.getDescription());
log.info("Computed window: " + computedWindow.getDescription());
for (TimeData oneLog : computedWindow) {
Expand Down

0 comments on commit ebd3288

Please sign in to comment.