Skip to content

Commit

Permalink
This closes #1596
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Oct 18, 2017
2 parents 02af1f4 + fdcae9d commit b7125d5
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 17 deletions.
Expand Up @@ -90,7 +90,7 @@ public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorServ
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this(scheduledExecutorService, executor, checkPeriod, checkPeriod, timeUnit, onDemand);
this(scheduledExecutorService, executor, -1, checkPeriod, timeUnit, onDemand);
}

/**
Expand Down Expand Up @@ -144,7 +144,7 @@ public synchronized void start() {
this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);

if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay, period, timeUnit);
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
} else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
Expand Down
Expand Up @@ -78,6 +78,25 @@ public void run() {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}

@Test
public void testVerifyInitialDelayChanged() {
final long initialDelay = 10;
final long period = 100;
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, initialDelay, period, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {

}
};
local.start();
final long newInitialDelay = 1000;
//the parameters are valid?
assert initialDelay != newInitialDelay && newInitialDelay != period;
local.setInitialDelay(newInitialDelay);
local.stop();
Assert.assertEquals("the initial dalay can't change", newInitialDelay, local.getInitialDelay());
}

@Test
public void testAccumulationOwnPool() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
Expand Down Expand Up @@ -187,15 +206,4 @@ public void run() {
local.stop();
}
}

@Test
public void testVerifyDefaultInitialDelay() throws InterruptedException {
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {

}
};
Assert.assertEquals("The initial delay must be defaulted to the period", local.getPeriod(), local.getInitialDelay());
}
}
Expand Up @@ -244,6 +244,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {

private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);

private volatile boolean caused = false;

private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());

private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
Expand Down Expand Up @@ -593,7 +595,6 @@ public void addHead(final MessageReference ref, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
flushDeliveriesInTransit();
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
Expand All @@ -613,7 +614,6 @@ public void addHead(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
flushDeliveriesInTransit();
for (MessageReference ref : refs) {
addHead(ref, scheduling);
}
Expand Down Expand Up @@ -717,6 +717,10 @@ protected boolean scheduleIfPossible(MessageReference ref) {
private boolean flushDeliveriesInTransit() {
try {

if (!deliveriesInTransit.await(100, TimeUnit.MILLISECONDS)) {
caused = true;
System.err.println("There are currently " + deliveriesInTransit.getCount() + " credits");
}
if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
return true;
} else {
Expand Down Expand Up @@ -835,8 +839,6 @@ public void addConsumer(final Consumer consumer) throws Exception {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}

flushDeliveriesInTransit();

consumersChanged = true;

if (!consumer.supportsDirectDelivery()) {
Expand Down
@@ -0,0 +1,140 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.apache.activemq.artemis.tests.integration.amqp;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ExtremeCancelsTest extends JMSClientTestSupport {

private SimpleString anycastAddress = new SimpleString("theQueue");


@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}

private boolean isAMQP;

public ExtremeCancelsTest(boolean isAMQP) {
this.isAMQP = isAMQP;
}


@Parameterized.Parameters(name = "{index}: isAMQP={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}


@Test(timeout = 120000)
public void testLotsOfCloseOpenConsumer() throws Exception {

server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastAddress, null, true, false);

AtomicInteger errors = new AtomicInteger(0);
AtomicBoolean runnning = new AtomicBoolean(true);
Runnable runnable = new Runnable() {
@Override
public void run() {

try {
ConnectionFactory factory = createCF();

Connection connection = factory.createConnection();
Session session = connection.createSession();
connection.start();
Queue queue = session.createQueue(anycastAddress.toString());

while (runnning.get()) {
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage)consumer.receive(100);
if (message != null) {
consumer.close();
}
}


connection.close();

} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};

Thread[] consumers = new Thread[10];

for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Thread(runnable);
consumers[i].start();
}

ConnectionFactory factory = createCF();

Connection connection = factory.createConnection();
Session session = connection.createSession();
Queue queue = session.createQueue(anycastAddress.toString());
MessageProducer producer = session.createProducer(queue);


final int NUMBER_OF_MESSAGES = 500;


for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("Hello guys " + i));
}

runnning.set(false);


for (Thread c : consumers) {
c.join();
}

Assert.assertEquals(0, errors.get());
}

private ConnectionFactory createCF() {
if (isAMQP) {
return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
} else {
return new ActiveMQConnectionFactory("tcp://localhost:5672");
}
}

}

0 comments on commit b7125d5

Please sign in to comment.