From bfc66f7f352690605ffeb03b6146a13e969f1bfd Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 11:20:35 +0100 Subject: [PATCH 1/6] Test tear-down: limit number of threads in Entities.destroyAll In stress/scale tests that create 1000s of apps, trying to stop them all concurrently with a thread per app causes an OutOfMemoryError due to too many threads. --- .../main/java/org/apache/brooklyn/core/entity/Entities.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java index acad9d268a..46a9ceafbb 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java @@ -879,6 +879,8 @@ public static void destroyCatching(Location loc) { * Apps will be stopped+destroyed+unmanaged concurrently, waiting for all to complete. */ public static void destroyAll(final ManagementContext mgmt) { + final int MAX_THREADS = 100; + if (mgmt instanceof NonDeploymentManagementContext) { // log here because it is easy for tests to destroyAll(app.getMgmtContext()) // which will *not* destroy the mgmt context if the app has been stopped! 
@@ -889,7 +891,7 @@ public static void destroyAll(final ManagementContext mgmt) { } if (!mgmt.isRunning()) return; - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); List> futures = Lists.newArrayList(); final AtomicReference error = Atomics.newReference(); try { From ffd799eba6f5768383f1fe651515d39450a2f942 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 11:21:25 +0100 Subject: [PATCH 2/6] AbstractPerformanceTest to extends BrooklynAppUnitTestSupport --- .../performance/AbstractPerformanceTest.java | 34 +++++++++++-------- .../BlobStorePersistencePerformanceTest.java | 4 +++ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/AbstractPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/AbstractPerformanceTest.java index d6b77d36fa..ab6978d595 100644 --- a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/AbstractPerformanceTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/AbstractPerformanceTest.java @@ -23,18 +23,15 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.factory.ApplicationBuilder; +import org.apache.brooklyn.core.internal.BrooklynProperties; import org.apache.brooklyn.core.location.SimulatedLocation; -import org.apache.brooklyn.core.test.entity.TestApplication; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.test.performance.PerformanceMeasurer; import org.apache.brooklyn.test.performance.PerformanceTestDescriptor; import org.apache.brooklyn.test.performance.PerformanceTestResult; import 
org.apache.brooklyn.util.internal.DoubleSystemProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import com.google.common.base.Stopwatch; @@ -50,7 +47,7 @@ * We are also not running the tests for long enough to check if object creation is going to kill * performance in the long-term, etc. */ -public class AbstractPerformanceTest { +public class AbstractPerformanceTest extends BrooklynAppUnitTestSupport { private static final Logger LOG = LoggerFactory.getLogger(AbstractPerformanceTest.class); @@ -68,21 +65,30 @@ public class AbstractPerformanceTest { protected static final long TIMEOUT_MS = 10*1000; - protected TestApplication app; protected SimulatedLocation loc; - protected ManagementContext mgmt; @BeforeMethod(alwaysRun=true) + @Override public void setUp() throws Exception { + super.setUp(); for (int i = 0; i < 5; i++) System.gc(); - loc = new SimulatedLocation(); - app = ApplicationBuilder.newManagedApp(TestApplication.class); - mgmt = app.getManagementContext(); + loc = app.newSimulatedLocation(); + } + + @Override + protected BrooklynProperties getBrooklynProperties() { + if (useLiveManagementContext()) { + return BrooklynProperties.Factory.newDefault(); + } else { + return BrooklynProperties.Factory.newEmpty(); + } } - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (app != null) Entities.destroyAll(app.getManagementContext()); + /** + * For overriding; controls whether to load ~/.brooklyn/brooklyn.properties + */ + protected boolean useLiveManagementContext() { + return false; } protected PerformanceTestResult measure(PerformanceTestDescriptor options) { diff --git a/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/BlobStorePersistencePerformanceTest.java b/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/BlobStorePersistencePerformanceTest.java index 
2238385188..70234d1feb 100644 --- a/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/BlobStorePersistencePerformanceTest.java +++ b/locations/jclouds/src/test/java/org/apache/brooklyn/core/mgmt/persist/jclouds/BlobStorePersistencePerformanceTest.java @@ -42,6 +42,10 @@ public class BlobStorePersistencePerformanceTest extends AbstractPerformanceTest JcloudsBlobStoreBasedObjectStore objectStore; StoreObjectAccessor blobstoreAccessor; + protected boolean useLiveManagementContext() { + return true; + } + @BeforeMethod(alwaysRun=true) @Override public void setUp() throws Exception { From 3f36a298e71027bdb3f92b6ba0425a8c98020c7a Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 11:21:57 +0100 Subject: [PATCH 3/6] Performance tests: support numConcurrentJobs --- .../test/performance/PerformanceMeasurer.java | 34 +++++++++++++++++-- .../PerformanceTestDescriptor.java | 31 ++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java index b214c3b8c4..6aef945570 100644 --- a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java @@ -20,8 +20,11 @@ import static org.testng.Assert.fail; +import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -34,6 +37,10 @@ import com.google.common.annotations.Beta; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import 
com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * For running simplistic performance tests, to measure the number of operations per second. @@ -89,12 +96,17 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { sampleCpuFuture = PerformanceTestUtils.sampleProcessCpuTime(options.sampleCpuInterval, options.summary, cpuSampleFractions); } + int numConcurrentJobs = options.numConcurrentJobs; + ListeningExecutorService executorService = null; + if (numConcurrentJobs > 1) { + executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numConcurrentJobs)); + } try { long preCpuTime = PerformanceTestUtils.getProcessCpuTime(); Stopwatch watch = Stopwatch.createStarted(); while ((options.duration != null) ? options.duration.isLongerThan(watch) : counter < options.iterations) { - if (warmupWatch.elapsed(TimeUnit.MILLISECONDS) >= nextLogTime) { + if (watch.elapsed(TimeUnit.MILLISECONDS) >= nextLogTime) { LOG.info(options.summary+" iteration="+counter+" at "+Time.makeTimeStringRounded(watch)); nextLogTime += options.logInterval.toMilliseconds(); } @@ -106,7 +118,11 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { } long before = watch.elapsed(TimeUnit.NANOSECONDS); - options.job.run(); + if (numConcurrentJobs > 1) { + runConcurrentAndBlock(executorService, options.job, numConcurrentJobs); + } else { + options.job.run(); + } if (options.histogram) { histogram.add(watch.elapsed(TimeUnit.NANOSECONDS) - before, TimeUnit.NANOSECONDS); } @@ -162,10 +178,24 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { return result; + } catch (InterruptedException | ExecutionException e) { + throw Exceptions.propagate(e); } finally { + if (executorService != null) { + executorService.shutdownNow(); + } if (sampleCpuFuture != null) { sampleCpuFuture.cancel(true); } } } + + protected static void 
runConcurrentAndBlock(ListeningExecutorService executor, Runnable job, int numConcurrentJobs) throws InterruptedException, ExecutionException { + List<ListenableFuture<?>> futures = new ArrayList<ListenableFuture<?>>(numConcurrentJobs); + for (int i = 0; i < numConcurrentJobs; i++) { + ListenableFuture<?> future = executor.submit(job); + futures.add(future); + } + Futures.allAsList(futures).get(); /* block once, only after every job has been submitted, so the jobs actually overlap */ + } } diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java index a274fc7fe1..f75763ff7d 100644 --- a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java @@ -22,9 +22,13 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static com.google.common.base.Preconditions.checkArgument; + import java.io.File; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; import org.apache.commons.io.FileUtils; @@ -52,6 +56,7 @@ public class PerformanceTestDescriptor { public Integer warmupIterations; public Duration duration; public Integer iterations; + public int numConcurrentJobs = 1; public Runnable job; public Runnable preJob; public Runnable postJob; @@ -112,7 +117,17 @@ public PerformanceTestDescriptor iterations(int val) { if (sealed) throw new IllegalStateException("Should not modify after sealed (e.g. after use)"); this.iterations = val; return this; } - + + /** + * The number of concurrent jobs to execute. If used with {@link #preJob(Runnable)} or {@link #postJob(Runnable)}, + * then those pre/post hooks will be called before/after the concurrent jobs are all done. 
+ */ + public PerformanceTestDescriptor numConcurrentJobs(int val) { + if (sealed) throw new IllegalStateException("Should not modify after sealed (e.g. after use)"); + checkArgument(val >= 1, "val (%s) must be one or more", val); + this.numConcurrentJobs = val; return this; + } + /** * The job to be repeatedly executed. */ @@ -121,6 +136,20 @@ public PerformanceTestDescriptor job(Runnable val) { this.job = val; return this; } + /** + * See {@link #job(Runnable)} + */ + public PerformanceTestDescriptor job(Callable val) { + return job(new Runnable() { + public void run() { + try { + val.call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + }}); + } + /** * To be run each time before the job (pausing the timer while this is run). */ From 9f758bf5b5678c5480299b97a564e46264a6148c Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 13:01:21 +0100 Subject: [PATCH 4/6] Performance test: add options.postWarmup --- .../brooklyn/test/performance/PerformanceMeasurer.java | 2 ++ .../test/performance/PerformanceTestDescriptor.java | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java index 6aef945570..28a70439a3 100644 --- a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java @@ -86,6 +86,8 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { } warmupWatch.stop(); + if (options.postWarmup != null) options.postWarmup.run(); + // Run the actual test (for the given duration / iterations); then wait for completionLatch (if supplied). nextLogTime = (options.logInterval == null) ? 
Long.MAX_VALUE : options.logInterval.toMilliseconds(); int counter = 0; diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java index f75763ff7d..54715ad022 100644 --- a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java @@ -60,6 +60,7 @@ public class PerformanceTestDescriptor { public Runnable job; public Runnable preJob; public Runnable postJob; + public Runnable postWarmup; public CountDownLatch completionLatch; public Duration completionLatchTimeout = Duration.FIVE_MINUTES; public Double minAcceptablePerSecond; @@ -166,6 +167,14 @@ public PerformanceTestDescriptor postJob(Runnable val) { this.postJob = val; return this; } + /** + * To be run once, after the warmup. + */ + public PerformanceTestDescriptor postWarmup(Runnable val) { + if (sealed) throw new IllegalStateException("Should not modify after sealed (e.g. after use)"); + this.postWarmup = val; return this; + } + /** * If non-null, the performance test will wait for this latch before stopping the timer. * This is useful for asynchronous work. 
For example, 1000 iterations of the job might From 06f5aac520c5504fe899710e3cc3a28c09e46cc5 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 13:02:04 +0100 Subject: [PATCH 5/6] Performance test: add abortIfIterationLongerThan (And log partial results if fails) --- .../test/performance/PerformanceMeasurer.java | 145 ++++++++++-------- .../PerformanceTestDescriptor.java | 15 +- 2 files changed, 90 insertions(+), 70 deletions(-) diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java index 28a70439a3..002b1bd2da 100644 --- a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceMeasurer.java @@ -103,85 +103,86 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { if (numConcurrentJobs > 1) { executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numConcurrentJobs)); } + + PerformanceTestResult result = null; try { long preCpuTime = PerformanceTestUtils.getProcessCpuTime(); Stopwatch watch = Stopwatch.createStarted(); - while ((options.duration != null) ? options.duration.isLongerThan(watch) : counter < options.iterations) { - if (watch.elapsed(TimeUnit.MILLISECONDS) >= nextLogTime) { - LOG.info(options.summary+" iteration="+counter+" at "+Time.makeTimeStringRounded(watch)); - nextLogTime += options.logInterval.toMilliseconds(); + try { + while ((options.duration != null) ? 
options.duration.isLongerThan(watch) : counter < options.iterations) { + if (watch.elapsed(TimeUnit.MILLISECONDS) >= nextLogTime) { + LOG.info(options.summary+" iteration="+counter+" at "+Time.makeTimeStringRounded(watch)); + nextLogTime += options.logInterval.toMilliseconds(); + } + + if (options.preJob != null) { + watch.stop(); + options.preJob.run(); + watch.start(); + } + + long before = watch.elapsed(TimeUnit.NANOSECONDS); + if (numConcurrentJobs > 1) { + runConcurrentAndBlock(executorService, options.job, numConcurrentJobs); + } else { + options.job.run(); + } + Duration iterationDuration = Duration.of(watch.elapsed(TimeUnit.NANOSECONDS) - before, TimeUnit.NANOSECONDS); + + if (options.histogram) { + histogram.add(iterationDuration); + } + counter++; + + if (options.postJob != null) { + watch.stop(); + options.postJob.run(); + watch.start(); + } + + if (options.abortIfIterationLongerThan != null && options.abortIfIterationLongerThan.isShorterThan(iterationDuration)) { + fail("Iteration "+(counter-1)+" took "+iterationDuration+", which is longer than max permitted "+options.abortIfIterationLongerThan+" for "+options); + } } - if (options.preJob != null) { - watch.stop(); - options.preJob.run(); - watch.start(); - } - - long before = watch.elapsed(TimeUnit.NANOSECONDS); - if (numConcurrentJobs > 1) { - runConcurrentAndBlock(executorService, options.job, numConcurrentJobs); - } else { - options.job.run(); + if (options.completionLatch != null) { + try { + boolean success = options.completionLatch.await(options.completionLatchTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + if (!success) { + fail("Timeout waiting for completionLatch: test="+options+"; counter="+counter); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } } + } finally { + watch.stop(); + long postCpuTime = PerformanceTestUtils.getProcessCpuTime(); + + // Generate the results + result = new PerformanceTestResult(); + result.warmup = Duration.of(warmupWatch); + 
result.warmupIterations = warmupCounter; + result.duration = Duration.of(watch); + result.iterations = counter; + result.ratePerSecond = (((double)counter) / watch.elapsed(TimeUnit.MILLISECONDS)) * 1000; + result.cpuTotalFraction = (watch.elapsed(TimeUnit.NANOSECONDS) > 0 && preCpuTime >= 0) + ? ((double)postCpuTime-preCpuTime) / watch.elapsed(TimeUnit.NANOSECONDS) + : -1; if (options.histogram) { - histogram.add(watch.elapsed(TimeUnit.NANOSECONDS) - before, TimeUnit.NANOSECONDS); + result.histogram = histogram; } - counter++; - - if (options.postJob != null) { - watch.stop(); - options.postJob.run(); - watch.start(); + if (options.sampleCpuInterval != null) { + result.cpuSampleFractions = cpuSampleFractions; } + result.minAcceptablePerSecond = options.minAcceptablePerSecond; } - if (options.completionLatch != null) { - try { - boolean success = options.completionLatch.await(options.completionLatchTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - if (!success) { - fail("Timeout waiting for completionLatch: test="+options+"; counter="+counter); - } - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - } - watch.stop(); - long postCpuTime = PerformanceTestUtils.getProcessCpuTime(); - - // Generate the results - PerformanceTestResult result = new PerformanceTestResult(); - result.warmup = Duration.of(warmupWatch); - result.warmupIterations = warmupCounter; - result.duration = Duration.of(watch); - result.iterations = counter; - result.ratePerSecond = (((double)counter) / watch.elapsed(TimeUnit.MILLISECONDS)) * 1000; - result.cpuTotalFraction = (watch.elapsed(TimeUnit.NANOSECONDS) > 0 && preCpuTime >= 0) - ? 
((double)postCpuTime-preCpuTime) / watch.elapsed(TimeUnit.NANOSECONDS) - : -1; - if (options.histogram) { - result.histogram = histogram; - } - if (options.sampleCpuInterval != null) { - result.cpuSampleFractions = cpuSampleFractions; - } - result.minAcceptablePerSecond = options.minAcceptablePerSecond; - - // Persist the results - if (options.persister != null) { - options.persister.persist(new Date(), options, result); - } - - // Fail if we didn't meet the minimum performance requirements - if (options.minAcceptablePerSecond != null && options.minAcceptablePerSecond > result.ratePerSecond) { - fail("Performance too low: test="+options+"; result="+result); - } + } catch (Throwable t) { + LOG.warn("Test failed; partial results before failure: test="+options+"; result="+result); + throw Exceptions.propagate(t); - return result; - - } catch (InterruptedException | ExecutionException e) { - throw Exceptions.propagate(e); } finally { if (executorService != null) { executorService.shutdownNow(); @@ -190,6 +191,18 @@ public static PerformanceTestResult run(PerformanceTestDescriptor options) { sampleCpuFuture.cancel(true); } } + + // Persist the results + if (options.persister != null) { + options.persister.persist(new Date(), options, result); + } + + // Fail if we didn't meet the minimum performance requirements + if (options.minAcceptablePerSecond != null && options.minAcceptablePerSecond > result.ratePerSecond) { + fail("Performance too low: test="+options+"; result="+result); + } + + return result; } protected static void runConcurrentAndBlock(ListeningExecutorService executor, Runnable job, int numConcurrentJobs) throws InterruptedException, ExecutionException { diff --git a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java index 54715ad022..68e1599d44 100644 --- 
a/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java +++ b/test-support/src/main/java/org/apache/brooklyn/test/performance/PerformanceTestDescriptor.java @@ -18,12 +18,11 @@ */ package org.apache.brooklyn.test.performance; +import static com.google.common.base.Preconditions.checkArgument; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -import static com.google.common.base.Preconditions.checkArgument; - import java.io.File; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -33,7 +32,7 @@ import org.apache.commons.io.FileUtils; import com.google.common.annotations.Beta; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; /** * For building up a description of what to measure. @@ -57,6 +56,7 @@ public class PerformanceTestDescriptor { public Duration duration; public Integer iterations; public int numConcurrentJobs = 1; + public Duration abortIfIterationLongerThan; public Runnable job; public Runnable preJob; public Runnable postJob; @@ -129,6 +129,11 @@ public PerformanceTestDescriptor numConcurrentJobs(int val) { this.numConcurrentJobs = val; return this; } + public PerformanceTestDescriptor abortIfIterationLongerThan(Duration val) { + if (sealed) throw new IllegalStateException("Should not modify after sealed (e.g. after use)"); + this.abortIfIterationLongerThan = val; return this; + } + /** * The job to be repeatedly executed. 
*/ @@ -249,7 +254,7 @@ public void seal() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .omitNullValues() .add("summary", summary) .add("duration", duration) @@ -259,6 +264,8 @@ public String toString() { .add("job", job) .add("completionLatch", completionLatch) .add("minAcceptablePerSecond", minAcceptablePerSecond) + .add("abortIfIterationLongerThan", abortIfIterationLongerThan) + .add("numConcurrentJobs", numConcurrentJobs) .toString(); } } \ No newline at end of file From dc74f72886d652a672fa35ee2dd6be973b18c549 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Tue, 6 Jun 2017 11:22:20 +0100 Subject: [PATCH 6/6] Add scalability test for many entities --- .../ScalabilityPerformanceTest.java | 262 ++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/qa/performance/ScalabilityPerformanceTest.java diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/qa/performance/ScalabilityPerformanceTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/qa/performance/ScalabilityPerformanceTest.java new file mode 100644 index 0000000000..a3b064e7b7 --- /dev/null +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/qa/performance/ScalabilityPerformanceTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.software.base.test.qa.performance; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.internal.BrooklynProperties; +import org.apache.brooklyn.core.mgmt.internal.BrooklynGarbageCollector; +import org.apache.brooklyn.core.test.entity.TestApplication; +import org.apache.brooklyn.core.test.qa.performance.AbstractPerformanceTest; +import org.apache.brooklyn.entity.group.DynamicCluster; +import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess; +import org.apache.brooklyn.entity.stock.BasicStartable; +import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.test.performance.PerformanceTestDescriptor; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import 
org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Atomics; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * These tests are work-in-progress - they are currently more useful for investigating + * performance limits (and the behaviour at that limit) than for regression/automated tests. + * + * For example: + *
<ol> + *
  <li>Tweak the {@link #NUM_ITERATIONS} and {@link #NUM_CONCURRENT_JOBS} values + *
  <li>Set {@code -Xms} and {@code -Xmx} + *
  <li>Run the desired test method + *
  <li>Examine the logs (as the test runs, if you want), + * e.g. {@code grep -E "iteration=|CPU fraction|brooklyn gc .after" brooklyn.debug.log} + *
  <li>Examine things like the thread and memory usage, + * e.g. {@code TEST_PID=$(ps aux | grep [t]estng | awk '{print $2}'); jmap -histo:live ${TEST_PID}; jstack-active ${TEST_PID}} + * </ol>
+ * + * Over time, we should establish a base-line for scalability and performance at scale, and use + * these for regression testing. + */ +public class ScalabilityPerformanceTest extends AbstractPerformanceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ScalabilityPerformanceTest.class); + + // Adds up to 2000 apps in each test (200*10) + private static final int NUM_ITERATIONS = 200; + private static final int NUM_CONCURRENT_JOBS = 10; + + ListeningExecutorService executor; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + } + + @AfterMethod(alwaysRun=true, timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS) + @Override + public void tearDown() throws Exception { + if (executor != null) executor.shutdownNow(); + super.tearDown(); + } + + @Override + protected BrooklynProperties getBrooklynProperties() { + BrooklynProperties result = super.getBrooklynProperties(); + result.put(BrooklynGarbageCollector.GC_PERIOD, Duration.FIVE_SECONDS); + return result; + } + + @Test(groups={"Integration", "Acceptance"}) + public void testManyEmptyApps() { + int numIterations = NUM_ITERATIONS; + double minRatePerSec = 2 * PERFORMANCE_EXPECTATION; + final AtomicInteger counter = new AtomicInteger(); + + app.start(ImmutableList.of(loc)); + + measure(PerformanceTestDescriptor.create() + .summary("ScalabilityPerformanceTest.testManyEmptyApps") + .iterations(numIterations) + .minAcceptablePerSecond(minRatePerSec) + .numConcurrentJobs(NUM_CONCURRENT_JOBS) + .abortIfIterationLongerThan(Duration.seconds(5)) + .postWarmup(new Runnable() { + @Override + public void run() { + destroyApps(mgmt.getApplications()); + }}) + .job(new Runnable() { + @Override + public void run() { + newEmptyApp(counter.incrementAndGet()); + }})); + } + + @Test(groups={"Integration", "Acceptance"}) + public void testManyBasicClusterApps() { + int numIterations = 
NUM_ITERATIONS; + double minRatePerSec = 1 * PERFORMANCE_EXPECTATION; + final AtomicInteger counter = new AtomicInteger(); + + app.start(ImmutableList.of(loc)); + + measure(PerformanceTestDescriptor.create() + .summary("ScalabilityPerformanceTest.testManyBasicClusterApps") + .iterations(numIterations) + .minAcceptablePerSecond(minRatePerSec) + .numConcurrentJobs(NUM_CONCURRENT_JOBS) + .abortIfIterationLongerThan(Duration.seconds(5)) + .postWarmup(new Runnable() { + @Override + public void run() { + destroyApps(mgmt.getApplications()); + }}) + .job(new Runnable() { + @Override + public void run() { + newClusterApp(counter.incrementAndGet()); + }})); + } + + @Test(groups={"Integration", "Acceptance"}) + public void testManySshApps() { + int numIterations = NUM_ITERATIONS; + double minRatePerSec = 1 * PERFORMANCE_EXPECTATION; + final AtomicInteger counter = new AtomicInteger(); + + app.start(ImmutableList.of(loc)); + + measure(PerformanceTestDescriptor.create() + .summary("ScalabilityPerformanceTest.testManySshApps") + .iterations(numIterations) + .minAcceptablePerSecond(minRatePerSec) + .numConcurrentJobs(NUM_CONCURRENT_JOBS) + .abortIfIterationLongerThan(Duration.seconds(5)) + .postWarmup(new Runnable() { + @Override + public void run() { + destroyApps(mgmt.getApplications()); + }}) + .job(new Runnable() { + @Override + public void run() { + newVanillaSoftwareProcessApp(counter.incrementAndGet()); + }})); + } + + private TestApplication newEmptyApp(int suffix) { + TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class) + .displayName("app-"+suffix))); + app.start(ImmutableList.of(app.newLocalhostProvisioningLocation())); + return app; + } + + private TestApplication newClusterApp(int suffix) { + TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class) + .displayName("app-"+suffix) + .child(EntitySpec.create(DynamicCluster.class) + 
.configure(DynamicCluster.INITIAL_SIZE, 1) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(BasicStartable.class))))); + app.start(ImmutableList.of(app.newLocalhostProvisioningLocation())); + return app; + } + + private TestApplication newVanillaSoftwareProcessApp(int suffix) { + Location loc = mgmt.getLocationManager().createLocation(LocationSpec.create(FixedListMachineProvisioningLocation.class) + .configure(FixedListMachineProvisioningLocation.MACHINE_SPECS, ImmutableList.of( + LocationSpec.create(SshMachineLocation.class) + .configure("address", "1.2.3.4") + .configure("sshToolClass", RecordingSshTool.class.getName())))); + + TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class) + .displayName("app-"+suffix) + .child(EntitySpec.create(VanillaSoftwareProcess.class) + .configure(VanillaSoftwareProcess.INSTALL_COMMAND, "myInstall") + .configure(VanillaSoftwareProcess.LAUNCH_COMMAND, "myLaunch") + .configure(VanillaSoftwareProcess.CHECK_RUNNING_COMMAND, "myCheckRunning") + .configure(VanillaSoftwareProcess.STOP_COMMAND, "myStop") + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(BasicStartable.class))))); + app.start(ImmutableList.of(loc)); + return app; + } + + // TODO duplicates part of Entities.destroyAll(ManagementContext). + // But we want to just destroy the apps rather than the management context. + // This is useful after warm-up (before the main test) so we don't have any extra apps around. 
+ private void destroyApps(Iterable<? extends Application> apps) { + final int MAX_THREADS = 100; + + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); + List<ListenableFuture<?>> futures = Lists.newArrayList(); + final AtomicReference<Exception> error = Atomics.newReference(); /* remembers the first failure from any destroy task */ + try { + for (final Application app: apps) { + futures.add(executor.submit(new Runnable() { + @Override + public void run() { + ManagementContext mgmt = app.getManagementContext(); + LOG.debug("destroying app "+app+" (managed? "+Entities.isManaged(app)+"; mgmt is "+mgmt+")"); + try { + Entities.destroy(app); + LOG.debug("destroyed app "+app+"; mgmt now "+mgmt); + } catch (Exception e) { + LOG.warn("problems destroying app "+app+" (mgmt now "+mgmt+", will rethrow at least one exception): "+e); + error.compareAndSet(null, e); + } + }})); + } + Futures.allAsList(futures).get(); /* wait for every destroy task before inspecting the recorded error */ + + if (error.get() != null) throw Exceptions.propagate(error.get()); + } catch (Exception e) { + if (!mgmt.isRunning()) { + LOG.debug("Destroying apps gave an error, but mgmt context was concurrently stopped so not really a problem; swallowing (unless fatal): "+e); + Exceptions.propagateIfFatal(e); + } else { + throw Exceptions.propagate(e); + } + } finally { + executor.shutdownNow(); + } + } +}