Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,8 @@ public static void destroyCatching(Location loc) {
* Apps will be stopped+destroyed+unmanaged concurrently, waiting for all to complete.
*/
public static void destroyAll(final ManagementContext mgmt) {
final int MAX_THREADS = 100;

if (mgmt instanceof NonDeploymentManagementContext) {
// log here because it is easy for tests to destroyAll(app.getMgmtContext())
// which will *not* destroy the mgmt context if the app has been stopped!
Expand All @@ -889,7 +891,7 @@ public static void destroyAll(final ManagementContext mgmt) {
}
if (!mgmt.isRunning()) return;

ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS));
List<ListenableFuture<?>> futures = Lists.newArrayList();
final AtomicReference<Exception> error = Atomics.newReference();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
import org.apache.brooklyn.core.internal.BrooklynProperties;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.test.performance.PerformanceMeasurer;
import org.apache.brooklyn.test.performance.PerformanceTestDescriptor;
import org.apache.brooklyn.test.performance.PerformanceTestResult;
import org.apache.brooklyn.util.internal.DoubleSystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

import com.google.common.base.Stopwatch;
Expand All @@ -50,7 +47,7 @@
* We are also not running the tests for long enough to check if object creation is going to kill
* performance in the long-term, etc.
*/
public class AbstractPerformanceTest {
public class AbstractPerformanceTest extends BrooklynAppUnitTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(AbstractPerformanceTest.class);

Expand All @@ -68,21 +65,30 @@ public class AbstractPerformanceTest {

protected static final long TIMEOUT_MS = 10*1000;

protected TestApplication app;
protected SimulatedLocation loc;
protected ManagementContext mgmt;

@BeforeMethod(alwaysRun=true)
@Override
public void setUp() throws Exception {
super.setUp();
for (int i = 0; i < 5; i++) System.gc();
loc = new SimulatedLocation();
app = ApplicationBuilder.newManagedApp(TestApplication.class);
mgmt = app.getManagementContext();
loc = app.newSimulatedLocation();
}

@Override
protected BrooklynProperties getBrooklynProperties() {
if (useLiveManagementContext()) {
return BrooklynProperties.Factory.newDefault();
} else {
return BrooklynProperties.Factory.newEmpty();
}
}

@AfterMethod(alwaysRun=true)
public void tearDown() throws Exception {
if (app != null) Entities.destroyAll(app.getManagementContext());
/**
* For overriding; controls whether to load ~/.brooklyn/brooklyn.properties
*/
protected boolean useLiveManagementContext() {
return false;
}

protected PerformanceTestResult measure(PerformanceTestDescriptor options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class BlobStorePersistencePerformanceTest extends AbstractPerformanceTest
JcloudsBlobStoreBasedObjectStore objectStore;
StoreObjectAccessor blobstoreAccessor;

protected boolean useLiveManagementContext() {
return true;
}

@BeforeMethod(alwaysRun=true)
@Override
public void setUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.software.base.test.qa.performance;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.brooklyn.api.entity.Application;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.LocationSpec;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.internal.BrooklynProperties;
import org.apache.brooklyn.core.mgmt.internal.BrooklynGarbageCollector;
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.core.test.qa.performance.AbstractPerformanceTest;
import org.apache.brooklyn.entity.group.DynamicCluster;
import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
import org.apache.brooklyn.entity.stock.BasicStartable;
import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.performance.PerformanceTestDescriptor;
import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
* These tests are work-in-progress - they are currently more useful for investigating
* performance limits (and the behaviour at that limit) than for regression/automated tests.
*
* For example:
* <ol>
* <li>Tweak the {@link #NUM_ITERATIONS} and {@link #NUM_CONCURRENT_JOBS} values
* <li>Set {@code -Xms} and {@code -Xmx}
* <li>Run the desired test method
* <li>Examine the logs (as the test runs, if you want),
* e.g. {@code grep -E "iteration=|CPU fraction|brooklyn gc .after" brooklyn.debug.log}
* <li>Examine things like the thread and memory usage,
* e.g. {@code TEST_PID=ps aux | grep [t]estng | awk '{print $2}'; jmap -histo:live ${JAVA_PID}; jstack-active ${JAVA_PID}}
* </ol>
*
* Over time, we should establish a base-line for scalability and performance at scale, and use
* these for regression testing.
*/
public class ScalabilityPerformanceTest extends AbstractPerformanceTest {

private static final Logger LOG = LoggerFactory.getLogger(ScalabilityPerformanceTest.class);

// Adds up to 2000 apps in each test (200*10)
private static final int NUM_ITERATIONS = 200;
private static final int NUM_CONCURRENT_JOBS = 10;

ListeningExecutorService executor;

@BeforeMethod(alwaysRun=true)
@Override
public void setUp() throws Exception {
super.setUp();
executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
}

@AfterMethod(alwaysRun=true, timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS)
@Override
public void tearDown() throws Exception {
if (executor != null) executor.shutdownNow();
super.tearDown();
}

@Override
protected BrooklynProperties getBrooklynProperties() {
BrooklynProperties result = super.getBrooklynProperties();
result.put(BrooklynGarbageCollector.GC_PERIOD, Duration.FIVE_SECONDS);
return result;
}

@Test(groups={"Integration", "Acceptance"})
public void testManyEmptyApps() {
int numIterations = NUM_ITERATIONS;
double minRatePerSec = 2 * PERFORMANCE_EXPECTATION;
final AtomicInteger counter = new AtomicInteger();

app.start(ImmutableList.of(loc));

measure(PerformanceTestDescriptor.create()
.summary("ScalabilityPerformanceTest.testManyEmptyApps")
.iterations(numIterations)
.minAcceptablePerSecond(minRatePerSec)
.numConcurrentJobs(NUM_CONCURRENT_JOBS)
.abortIfIterationLongerThan(Duration.seconds(5))
.postWarmup(new Runnable() {
@Override
public void run() {
destroyApps(mgmt.getApplications());
}})
.job(new Runnable() {
@Override
public void run() {
newEmptyApp(counter.incrementAndGet());
}}));
}

@Test(groups={"Integration", "Acceptance"})
public void testManyBasicClusterApps() {
int numIterations = NUM_ITERATIONS;
double minRatePerSec = 1 * PERFORMANCE_EXPECTATION;
final AtomicInteger counter = new AtomicInteger();

app.start(ImmutableList.of(loc));

measure(PerformanceTestDescriptor.create()
.summary("ScalabilityPerformanceTest.testManyBasicClusterApps")
.iterations(numIterations)
.minAcceptablePerSecond(minRatePerSec)
.numConcurrentJobs(NUM_CONCURRENT_JOBS)
.abortIfIterationLongerThan(Duration.seconds(5))
.postWarmup(new Runnable() {
@Override
public void run() {
destroyApps(mgmt.getApplications());
}})
.job(new Runnable() {
@Override
public void run() {
newClusterApp(counter.incrementAndGet());
}}));
}

@Test(groups={"Integration", "Acceptance"})
public void testManySshApps() {
int numIterations = NUM_ITERATIONS;
double minRatePerSec = 1 * PERFORMANCE_EXPECTATION;
final AtomicInteger counter = new AtomicInteger();

app.start(ImmutableList.of(loc));

measure(PerformanceTestDescriptor.create()
.summary("ScalabilityPerformanceTest.testManySshApps")
.iterations(numIterations)
.minAcceptablePerSecond(minRatePerSec)
.numConcurrentJobs(NUM_CONCURRENT_JOBS)
.abortIfIterationLongerThan(Duration.seconds(5))
.postWarmup(new Runnable() {
@Override
public void run() {
destroyApps(mgmt.getApplications());
}})
.job(new Runnable() {
@Override
public void run() {
newVanillaSoftwareProcessApp(counter.incrementAndGet());
}}));
}

private TestApplication newEmptyApp(int suffix) {
TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class)
.displayName("app-"+suffix)));
app.start(ImmutableList.of(app.newLocalhostProvisioningLocation()));
return app;
}

private TestApplication newClusterApp(int suffix) {
TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class)
.displayName("app-"+suffix)
.child(EntitySpec.create(DynamicCluster.class)
.configure(DynamicCluster.INITIAL_SIZE, 1)
.configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(BasicStartable.class)))));
app.start(ImmutableList.of(app.newLocalhostProvisioningLocation()));
return app;
}

private TestApplication newVanillaSoftwareProcessApp(int suffix) {
Location loc = mgmt.getLocationManager().createLocation(LocationSpec.create(FixedListMachineProvisioningLocation.class)
.configure(FixedListMachineProvisioningLocation.MACHINE_SPECS, ImmutableList.of(
LocationSpec.create(SshMachineLocation.class)
.configure("address", "1.2.3.4")
.configure("sshToolClass", RecordingSshTool.class.getName()))));

TestApplication app = mgmt.getEntityManager().createEntity(EntitySpec.create(EntitySpec.create(TestApplication.class)
.displayName("app-"+suffix)
.child(EntitySpec.create(VanillaSoftwareProcess.class)
.configure(VanillaSoftwareProcess.INSTALL_COMMAND, "myInstall")
.configure(VanillaSoftwareProcess.LAUNCH_COMMAND, "myLaunch")
.configure(VanillaSoftwareProcess.CHECK_RUNNING_COMMAND, "myCheckRunning")
.configure(VanillaSoftwareProcess.STOP_COMMAND, "myStop")
.configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(BasicStartable.class)))));
app.start(ImmutableList.of(loc));
return app;
}

// TODO duplicates part of Entities.destroyAll(ManagementContext).
// But we want to just destroy the apps rather than the management context.
// This is useful after warm-up (before the main test) so we don't have any extra apps around.
private void destroyApps(Iterable<? extends Application> apps) {
final int MAX_THREADS = 100;

ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS));
List<ListenableFuture<?>> futures = Lists.newArrayList();
final AtomicReference<Exception> error = Atomics.newReference();
try {
for (final Application app: apps) {
futures.add(executor.submit(new Runnable() {
@Override
public void run() {
ManagementContext mgmt = app.getManagementContext();
LOG.debug("destroying app "+app+" (managed? "+Entities.isManaged(app)+"; mgmt is "+mgmt+")");
try {
Entities.destroy(app);
LOG.debug("destroyed app "+app+"; mgmt now "+mgmt);
} catch (Exception e) {
LOG.warn("problems destroying app "+app+" (mgmt now "+mgmt+", will rethrow at least one exception): "+e);
error.compareAndSet(null, e);
}
}}));
}
Futures.allAsList(futures).get();

if (error.get() != null) throw Exceptions.propagate(error.get());
} catch (Exception e) {
if (!mgmt.isRunning()) {
LOG.debug("Destroying apps gave an error, but mgmt context was concurrently stopped so not really a problem; swallowing (unless fatal): "+e);
Exceptions.propagateIfFatal(e);
} else {
throw Exceptions.propagate(e);
}
} finally {
executor.shutdownNow();
}
}
}
Loading