Skip to content

Commit

Permalink
ARTEMIS-4396 make address/queue internal prop durable
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and gemmellr committed Aug 31, 2023
1 parent 424ed61 commit cd8a2e5
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ public static String getDefaultHapolicyBackupStrategy() {

public static final boolean DEFAULT_ENABLED = true;

public static final boolean DEFAULT_INTERNAL = false;

public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true;

public static final boolean DEFAULT_CREATED_QUEUE_AUTO_DELETE = false;
Expand Down Expand Up @@ -1573,6 +1575,10 @@ public static boolean getDefaultEnabled() {
return DEFAULT_ENABLED;
}

public static boolean getDefaultInternal() {
return DEFAULT_INTERNAL;
}

public static boolean getDefaultQueueAutoDelete(boolean autoCreated) {
return autoCreated ? getDefaultQueueAutoDelete() : getDefaultCreatedQueueAutoDelete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ public interface AddressBindingInfo {

SimpleString getName();

boolean getAutoCreated();
boolean isAutoCreated();

EnumSet<RoutingType> getRoutingTypes();

AddressStatusEncoding getAddressStatusEncoding();

boolean isInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,6 @@ public interface QueueBindingInfo {
long getAutoDeleteMessageCount();

long getRingSize();

boolean isInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ private void internalQueueBinding(boolean update, final long tx, final Binding b

SimpleString filterString = filter == null ? null : filter.getFilterString();

PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize(), queue.isInternalQueue());

try (ArtemisCloseable lock = closeableReadLock()) {
if (update) {
Expand Down Expand Up @@ -1506,7 +1506,7 @@ public void deleteAddressStatus(long recordID) throws Exception {

@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated());
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated(), addressInfo.isInternal());

try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = idGenerator.generateID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.EnumSet;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
Expand All @@ -29,14 +30,17 @@

public class PersistentAddressBindingEncoding implements EncodingSupport, AddressBindingInfo {

public long id;
private long id;

public SimpleString name;
private SimpleString name;

public boolean autoCreated;
public AddressStatusEncoding addressStatusEncoding;
private boolean autoCreated;

public EnumSet<RoutingType> routingTypes;
private AddressStatusEncoding addressStatusEncoding;

private EnumSet<RoutingType> routingTypes;

private boolean internal;

public PersistentAddressBindingEncoding() {
routingTypes = EnumSet.noneOf(RoutingType.class);
Expand All @@ -54,19 +58,22 @@ public String toString() {
sb.deleteCharAt(sb.length() - 1);
}
sb.append("}");
sb.append(", autoCreated=" + autoCreated + "]");
sb.append(", autoCreated=" + autoCreated);
sb.append(", internal=" + internal + "]");
return sb.toString();
}

public PersistentAddressBindingEncoding(final SimpleString name,
final EnumSet<RoutingType> routingTypes,
final boolean autoCreated) {
final boolean autoCreated,
final boolean internal) {
checkNotNull(name);
checkNotNull(routingTypes);

this.name = name;
this.routingTypes = routingTypes;
this.autoCreated = autoCreated;
this.internal = internal;
}

@Override
Expand All @@ -84,7 +91,7 @@ public SimpleString getName() {
}

@Override
public boolean getAutoCreated() {
public boolean isAutoCreated() {
return autoCreated;
}

Expand All @@ -102,6 +109,11 @@ public void setAddressStatusEncoding(AddressStatusEncoding addressStatusEncoding
this.addressStatusEncoding = addressStatusEncoding;
}

@Override
public boolean isInternal() {
return internal;
}

@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
Expand All @@ -110,6 +122,12 @@ public void decode(final ActiveMQBuffer buffer) {
routingTypes.add(RoutingType.getType(buffer.readByte()));
}
autoCreated = buffer.readBoolean();

if (buffer.readableBytes() > 0) {
internal = buffer.readBoolean();
} else {
internal = ActiveMQDefaultConfiguration.getDefaultInternal();
}
}

@Override
Expand All @@ -120,13 +138,15 @@ public void encode(final ActiveMQBuffer buffer) {
buffer.writeByte(d.getType());
}
buffer.writeBoolean(autoCreated);
buffer.writeBoolean(internal);
}

@Override
public int getEncodeSize() {
return SimpleString.sizeofString(name) +
DataConstants.SIZE_INT +
(DataConstants.SIZE_BYTE * routingTypes.size()) +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,110 +28,90 @@

public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {

public long id;
private long id;

public SimpleString name;
private SimpleString name;

public SimpleString address;
private SimpleString address;

public SimpleString filterString;
private SimpleString filterString;

public boolean autoCreated;
private boolean autoCreated;

public SimpleString user;
private SimpleString user;

public List<QueueStatusEncoding> queueStatusEncodings;
private List<QueueStatusEncoding> queueStatusEncodings;

public int maxConsumers;
private int maxConsumers;

public boolean purgeOnNoConsumers;
private boolean purgeOnNoConsumers;

public boolean enabled;
private boolean enabled;

public boolean exclusive;
private boolean exclusive;

public boolean lastValue;
private boolean lastValue;

public SimpleString lastValueKey;
private SimpleString lastValueKey;

public boolean nonDestructive;
private boolean nonDestructive;

public int consumersBeforeDispatch;
private int consumersBeforeDispatch;

public long delayBeforeDispatch;
private long delayBeforeDispatch;

public byte routingType;
private byte routingType;

public boolean configurationManaged;
private boolean configurationManaged;

public boolean groupRebalance;
private boolean groupRebalance;

public boolean groupRebalancePauseDispatch;
private boolean groupRebalancePauseDispatch;

public int groupBuckets;
private int groupBuckets;

public SimpleString groupFirstKey;
private SimpleString groupFirstKey;

public boolean autoDelete;
private boolean autoDelete;

public long autoDeleteDelay;
private long autoDeleteDelay;

public long autoDeleteMessageCount;
private long autoDeleteMessageCount;

public long ringSize;
private long ringSize;

private boolean internal;

public PersistentQueueBindingEncoding() {
}

@Override
public String toString() {
return "PersistentQueueBindingEncoding [id=" + id +
", name=" +
name +
", address=" +
address +
", filterString=" +
filterString +
", user=" +
user +
", autoCreated=" +
autoCreated +
", maxConsumers=" +
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
", enabled=" +
enabled +
", exclusive=" +
exclusive +
", lastValue=" +
lastValue +
", lastValueKey=" +
lastValueKey +
", nonDestructive=" +
nonDestructive +
", consumersBeforeDispatch=" +
consumersBeforeDispatch +
", delayBeforeDispatch=" +
delayBeforeDispatch +
", routingType=" +
routingType +
", configurationManaged=" +
configurationManaged +
", groupRebalance=" +
groupRebalance +
", groupRebalancePauseDispatch=" +
groupRebalancePauseDispatch +
", groupBuckets=" +
groupBuckets +
", groupFirstKey=" +
groupFirstKey +
", autoDelete=" +
autoDelete +
", autoDeleteDelay=" +
autoDeleteDelay +
", autoDeleteMessageCount=" +
autoDeleteMessageCount +
", name=" + name +
", address=" + address +
", filterString=" + filterString +
", user=" + user +
", autoCreated=" + autoCreated +
", maxConsumers=" + maxConsumers +
", purgeOnNoConsumers=" + purgeOnNoConsumers +
", enabled=" + enabled +
", exclusive=" + exclusive +
", lastValue=" + lastValue +
", lastValueKey=" + lastValueKey +
", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
", routingType=" + routingType +
", configurationManaged=" + configurationManaged +
", groupRebalance=" + groupRebalance +
", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch +
", groupBuckets=" + groupBuckets +
", groupFirstKey=" + groupFirstKey +
", autoDelete=" + autoDelete +
", autoDeleteDelay=" + autoDeleteDelay +
", autoDeleteMessageCount=" + autoDeleteMessageCount +
", internal=" + internal +
"]";
}

Expand All @@ -158,7 +138,8 @@ public PersistentQueueBindingEncoding(final SimpleString name,
final long autoDeleteMessageCount,
final byte routingType,
final boolean configurationManaged,
final long ringSize) {
final long ringSize,
final boolean internal) {
this.name = name;
this.address = address;
this.filterString = filterString;
Expand All @@ -183,6 +164,7 @@ public PersistentQueueBindingEncoding(final SimpleString name,
this.routingType = routingType;
this.configurationManaged = configurationManaged;
this.ringSize = ringSize;
this.internal = internal;
}

@Override
Expand Down Expand Up @@ -387,6 +369,11 @@ public long getRingSize() {
return ringSize;
}

@Override
public boolean isInternal() {
return internal;
}

@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
Expand Down Expand Up @@ -499,6 +486,12 @@ public void decode(final ActiveMQBuffer buffer) {
} else {
groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
}

if (buffer.readableBytes() > 0) {
internal = buffer.readBoolean();
} else {
internal = ActiveMQDefaultConfiguration.getDefaultInternal();
}
}

@Override
Expand Down Expand Up @@ -527,6 +520,7 @@ public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(ringSize);
buffer.writeBoolean(enabled);
buffer.writeBoolean(groupRebalancePauseDispatch);
buffer.writeBoolean(internal);
}

@Override
Expand All @@ -552,6 +546,7 @@ public int getEncodeSize() {
SimpleString.sizeofNullableString(groupFirstKey) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void initAddresses(List<AddressBindingInfo> addressBindingInfos) throws E
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
addressInfo.setId(addressBindingInfo.getId());
addressInfo.setAutoCreated(addressBindingInfo.getAutoCreated());
addressInfo.setAutoCreated(addressBindingInfo.isAutoCreated());
if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {
addressInfo.setStorageManager(storageManager);
addressInfo.setPostOffice(postOffice);
Expand Down

0 comments on commit cd8a2e5

Please sign in to comment.