Skip to content

Commit

Permalink
This closes #1971
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 22, 2018
2 parents 7a36e84 + a9d84a7 commit d8f22a3
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 3 deletions.
Expand Up @@ -58,6 +58,10 @@ public interface QueueBindingInfo {

void setExclusive(boolean exclusive);

boolean isLastValue();

void setLastValue(boolean lastValue);

byte getRoutingType();

void setRoutingType(byte routingType);
Expand Down
Expand Up @@ -1275,7 +1275,7 @@ private void internalQueueBinding(boolean update, final long tx, final Binding b

SimpleString filterString = filter == null ? null : filter.getFilterString();

PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.getRoutingType().getType());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType());

readLock();
try {
Expand Down
Expand Up @@ -48,6 +48,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin

public boolean exclusive;

public boolean lastValue;

public byte routingType;

public PersistentQueueBindingEncoding() {
Expand All @@ -70,8 +72,10 @@ public String toString() {
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
", exclusive=" +
exclusive +
", exclusive=" +
exclusive +
", lastValue=" +
lastValue +
", routingType=" +
routingType +
"]";
Expand All @@ -85,6 +89,7 @@ public PersistentQueueBindingEncoding(final SimpleString name,
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean exclusive,
final boolean lastValue,
final byte routingType) {
this.name = name;
this.address = address;
Expand All @@ -94,6 +99,7 @@ public PersistentQueueBindingEncoding(final SimpleString name,
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.lastValue = lastValue;
this.routingType = routingType;
}

Expand Down Expand Up @@ -179,6 +185,16 @@ public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}

@Override
public boolean isLastValue() {
return lastValue;
}

@Override
public void setLastValue(boolean lastValue) {
this.lastValue = lastValue;
}

@Override
public byte getRoutingType() {
return routingType;
Expand Down Expand Up @@ -225,6 +241,11 @@ public void decode(final ActiveMQBuffer buffer) {
} else {
exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
}
if (buffer.readableBytes() > 0) {
lastValue = buffer.readBoolean();
} else {
lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
}
}

@Override
Expand All @@ -238,6 +259,7 @@ public void encode(final ActiveMQBuffer buffer) {
buffer.writeBoolean(purgeOnNoConsumers);
buffer.writeByte(routingType);
buffer.writeBoolean(exclusive);
buffer.writeBoolean(lastValue);
}

@Override
Expand All @@ -248,6 +270,7 @@ public int getEncodeSize() {
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}

Expand Down
Expand Up @@ -152,6 +152,7 @@ public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
.purgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers())
.exclusive(queueBindingInfo.isExclusive())
.lastValue(queueBindingInfo.isLastValue())
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
Expand Down
Expand Up @@ -61,6 +61,51 @@ public void testQueueConfigPurgeOnNoConsumerAndRestart() throws Exception {
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
}

@Test
public void testQueueConfigLastValueAndRestart() throws Exception {
ActiveMQServer server = createServer(true);

server.start();

SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");

server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, false, true, true);

QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding1.getQueue().isLastValue());

server.stop();

server.start();

QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isLastValue());
}

@Test
public void testQueueConfigExclusiveAndRestart() throws Exception {
ActiveMQServer server = createServer(true);

server.start();

SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");

server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, true);

QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding1.getQueue().isExclusive());

server.stop();

server.start();

QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isExclusive());
}


@Test
public void testQueueConfigUserAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
Expand Down

0 comments on commit d8f22a3

Please sign in to comment.