Skip to content
Permalink
Browse files
This closes #1171
  • Loading branch information
ahgittin committed Jan 25, 2016
2 parents 80f7f62 + e77f62a commit ac0636cdfa3a8f21fcc2cbaed3fc82c216dff258
Showing 7 changed files with 71 additions and 23 deletions.
@@ -67,7 +67,6 @@
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
@@ -565,7 +564,15 @@ public boolean cancel(TaskCancellationMode mode) {
int subtasksReallyCancelled=0;

if (task instanceof HasTaskChildren) {
for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
// cancel tasks in reverse order --
// it should be the case that if child1 is cancelled,
// a parentTask should NOT call a subsequent child2,
// but just in case, we cancel child2 first
// NB: DST and others may apply their own recursive cancel behaviour
MutableList<Task<?>> childrenReversed = MutableList.copyOf( ((HasTaskChildren)task).getChildren() );
Collections.reverse(childrenReversed);

for (Task<?> child: childrenReversed) {
if (log.isTraceEnabled()) {
log.trace("Cancelling "+child+" on recursive cancellation of "+task);
}
@@ -298,7 +298,7 @@ public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
public synchronized boolean cancel(TaskCancellationMode mode) {
if (isDone()) return false;
if (log.isTraceEnabled()) {
log.trace("BT cancelling "+this+" mode "+mode);
log.trace("BT cancelling "+this+" mode "+mode+", from thread "+Thread.currentThread());
}
cancelled = true;
doCancel(mode);
@@ -139,6 +139,7 @@ public String call() {
Thread.sleep(duration.toMillisecondsRoundingUp());
}
} catch (InterruptedException e) {
log.info("releasing semaphore on interruption after saying "+message);
cancellations.release();
throw Exceptions.propagate(e);
}
@@ -159,7 +160,8 @@ public Task<String> sayTask(String message) {
}

public Task<String> sayTask(String message, Duration duration, String message2) {
return Tasks.<String>builder().displayName("say:"+message).body(sayCallable(message, duration, message2)).build();
return Tasks.<String>builder().displayName("say:"+message+(duration!=null ? ":wait("+duration+")" : "")+(message2!=null ? ":"+message2 : ""))
.body(sayCallable(message, duration, message2)).build();
}

public <T> Task<T> submitting(final Task<T> task) {
@@ -193,33 +195,52 @@ public void testCancelled() throws Exception {
sayTask("2a", Duration.THIRTY_SECONDS, "2b"),
sayTask("3"));
ec.submit(t);


// wait for 2 to start, saying "2a", and the first interruptible block is when it waits for its 30s
waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT);
Assert.assertEquals(messages, Arrays.asList("1", "2a"));
Time.sleep(Duration.millis(50));

// now cancel, and make sure we get the right behaviour
t.cancel(true);
Assert.assertTrue(t.isDone());
// 2 should get cancelled, and invoke the cancellation semaphore
// 2 should get cancelled, and invoke the cancellation semaphore, but not say 2b
// 3 should get cancelled and not run at all
Assert.assertEquals(messages, Arrays.asList("1", "2a"));

// Need to ensure that 2 has been started; race where we might cancel it before its run method
// is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause.
// cancel(..) currently cancels everything in the tree in the calling thread
// so we could even assert task3.isCancelled() now
// but not sure we will guarantee that for subtasks, so weaker assertion
// that it is eventually cancelled, and that it for sure never starts

// message list is still 1, 2a
Assert.assertEquals(messages, Arrays.asList("1", "2a"));
// And 2 when cancelled should release the semaphore
log.info("testCancelled waiting on semaphore; permits left is "+cancellations.availablePermits());
Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS));
log.info("testCancelled acquired semaphore; permits left is "+cancellations.availablePermits());

Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator();
// 1 completed fine
Assert.assertEquals(ci.next().get(), "1");
// 2 is cancelled -- cancelled flag should always be set *before* the interrupt sent
// (and that released the semaphore above, even if thread is not completed, so this should be set)
Task<?> task2 = ci.next();
Assert.assertTrue(task2.isBegun());
Assert.assertTrue(task2.isDone());
Assert.assertTrue(task2.isCancelled());

Task<?> task3 = ci.next();
// 3 is being cancelled in the thread which cancelled 2, and should either be
// *before* 2 was cancelled or *not run* because the parent was cancelled
// so we shouldn't need to wait ... but if we did:
// Asserts.eventually(Suppliers.ofInstance(task3), TaskPredicates.isDone());
Assert.assertTrue(task3.isDone());
Assert.assertTrue(task3.isCancelled());
Assert.assertFalse(task3.isBegun());
Assert.assertTrue(task2.isDone());
Assert.assertTrue(task2.isCancelled());

// but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
// messages unchanged
Assert.assertEquals(messages, Arrays.asList("1", "2a"));
// no further mutexes should be available (ie 3 should not run)
// TODO for some reason this was observed to fail on the jenkins box (2016-01)
// but i can't see why; have added logging in case it happens again
Assert.assertEquals(cancellations.availablePermits(), 0);
}

@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanNotificationInfo;
@@ -47,6 +46,7 @@
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.entity.java.UsesJmx;
import org.apache.brooklyn.feed.jmx.JmxHelper;
import org.apache.brooklyn.test.NetworkingTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -68,8 +68,10 @@ public class JmxService {
private String url;

public JmxService() throws Exception {
this("localhost", 28000 + (int)Math.floor(new Random().nextDouble() * 1000));
logger.warn("use of deprecated default host and port in JmxService");
this("localhost", NetworkingTestUtils.randomPortAround(28000));

// TODO why this message if the constructor is not actually deprecated, and it seems useful?
//logger.warn("use of deprecated default host and port in JmxService");
}

/**
@@ -151,10 +153,12 @@ public GeneralisedDynamicMBean registerMBean(String name) throws InstanceAlready
* @throws MBeanRegistrationException
* @throws InstanceAlreadyExistsException
*/
@SuppressWarnings({ "rawtypes" })
public GeneralisedDynamicMBean registerMBean(Map initialAttributes, String name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException, NullPointerException {
return registerMBean(initialAttributes, ImmutableMap.of(), name);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public GeneralisedDynamicMBean registerMBean(Map initialAttributes, Map operations, String name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException, NullPointerException {
GeneralisedDynamicMBean mbean = new GeneralisedDynamicMBean(initialAttributes, operations);
server.registerMBean(mbean, new ObjectName(name));
@@ -65,6 +65,7 @@
import org.apache.brooklyn.entity.software.base.test.jmx.JmxService;
import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.NetworkingTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -114,7 +115,13 @@ public static class TestEntityWithJmx extends TestEntityImpl {
@Override public void init() {
sensors().set(Attributes.HOSTNAME, "localhost");
sensors().set(UsesJmx.JMX_PORT,
LocalhostMachineProvisioningLocation.obtainPort(PortRanges.fromString("40123+")));
LocalhostMachineProvisioningLocation.obtainPort(PortRanges.fromString(
// just doing "40123+" was not enough to avoid collisions (on 40125),
// observed on jenkins, not sure why but
// maybe something else had a UDP connection we weren't detected,
// or the static lock our localhost uses was being bypassed;
// this should improve things (2016-01)
NetworkingTestUtils.randomPortAround(40000)+"+")));
// only supports no-agent, at the moment
config().set(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.NONE);
sensors().set(UsesJmx.RMI_REGISTRY_PORT, -1); // -1 means to use the JMX_PORT only
@@ -22,13 +22,13 @@

import java.util.Map;

import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.net.Networking;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;

public class NetworkingTestUtils {
@@ -65,4 +65,14 @@ public static void assertPortsAvailable(final Map<String, Integer> ports) {
assertTrue(Networking.isPortAvailable(entry.getValue()), errmsg);
}
}

/** Returns a port not in use somewhere around the seed;
* this is not a foolproof way to prevent collisions,
* but strikes a good balance of traceability (different callers will use different ranges)
* and collision avoidance, esp when combined with <code>Localhost...obtain(thisResult+"+");</code>.
*/
@Beta
public static int randomPortAround(int seed) {
return Networking.nextAvailablePort( seed + (int)Math.floor(Math.random() * 1000) );
}
}
@@ -18,6 +18,8 @@
*/
package org.apache.brooklyn.util.net;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -26,7 +28,6 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Enumeration;
@@ -48,8 +49,6 @@
import com.google.common.net.HostAndPort;
import com.google.common.primitives.UnsignedBytes;

import static com.google.common.base.Preconditions.checkArgument;

public class Networking {

private static final Logger log = LoggerFactory.getLogger(Networking.class);

0 comments on commit ac0636c

Please sign in to comment.