Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public interface AddressQuery {
* queue, <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();

/**
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
* topic, <code>false</code> else.
*/
boolean isAutoCreateJmsTopics();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,9 @@ void addAddressSettings(@Parameter(desc = "an address match", name = "addressMat
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception;
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;

void removeAddressSettings(String addressMatch) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ public final class AddressSettingsInfo {

private final boolean autoDeleteJmsQueues;

private final boolean autoCreateJmsTopics;

private final boolean autoDeleteJmsTopics;

// Static --------------------------------------------------------

public static AddressSettingsInfo from(final String jsonString) throws Exception {
JSONObject object = new JSONObject(jsonString);
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"));
return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"), object.getBoolean("autoCreateJmsTopics"), object.getBoolean("autoDeleteJmsTopics"));
}

// Constructors --------------------------------------------------
Expand All @@ -83,7 +87,9 @@ public AddressSettingsInfo(String addressFullMessagePolicy,
long slowConsumerCheckPeriod,
String slowConsumerPolicy,
boolean autoCreateJmsQueues,
boolean autoDeleteJmsQueues) {
boolean autoDeleteJmsQueues,
boolean autoCreateJmsTopics,
boolean autoDeleteJmsTopics) {
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
this.pageSizeBytes = pageSizeBytes;
Expand All @@ -102,6 +108,8 @@ public AddressSettingsInfo(String addressFullMessagePolicy,
this.slowConsumerPolicy = slowConsumerPolicy;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
this.autoCreateJmsTopics = autoCreateJmsTopics;
this.autoDeleteJmsTopics = autoDeleteJmsTopics;
}

// Public --------------------------------------------------------
Expand Down Expand Up @@ -181,5 +189,13 @@ public boolean isAutoCreateJmsQueues() {
public boolean isAutoDeleteJmsQueues() {
return autoDeleteJmsQueues;
}

public boolean isAutoCreateJmsTopics() {
return autoCreateJmsTopics;
}

public boolean isAutoDeleteJmsTopics() {
return autoDeleteJmsTopics;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {

private final boolean autoCreateJmsQueues;

private final boolean autoCreateJmsTopics;

public AddressQueryImpl(final boolean exists,
final List<SimpleString> queueNames,
final boolean autoCreateJmsQueues) {
final boolean autoCreateJmsQueues,
final boolean autoCreateJmsTopics) {
this.exists = exists;
this.queueNames = new ArrayList<>(queueNames);
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoCreateJmsTopics = autoCreateJmsTopics;
}

@Override
Expand All @@ -52,4 +56,9 @@ public boolean isExists() {
public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
}

@Override
public boolean isAutoCreateJmsTopics() {
return autoCreateJmsTopics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,18 @@
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
Expand Down Expand Up @@ -283,9 +283,9 @@ public int getServerVersion() {

@Override
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);

return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues());
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ public boolean supports(final byte packetType) {
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V2:
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V3:
return version >= 127;
default:
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
Expand Down Expand Up @@ -110,6 +111,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
Expand Down Expand Up @@ -257,6 +259,10 @@ public Packet decode(byte packetType) {
packet = new SessionBindingQueryResponseMessage_V2();
break;
}
case SESS_BINDINGQUERY_RESP_V3: {
packet = new SessionBindingQueryResponseMessage_V3();
break;
}
case SESS_XA_START: {
packet = new SessionXAStartMessage();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public class PacketImpl implements Packet {

public static final byte REPLICATION_RESPONSE_V2 = -9;

public static final byte SESS_BINDINGQUERY_RESP_V3 = -10;

// Static --------------------------------------------------------

public PacketImpl(final byte type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public SessionBindingQueryResponseMessage() {
super(SESS_BINDINGQUERY_RESP);
}

public SessionBindingQueryResponseMessage(byte v2) {
super(v2);
public SessionBindingQueryResponseMessage(byte v) {
super(v);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage {

private boolean autoCreateJmsQueues;
protected boolean autoCreateJmsQueues;

public SessionBindingQueryResponseMessage_V2(final boolean exists,
final List<SimpleString> queueNames,
Expand All @@ -41,6 +41,10 @@ public SessionBindingQueryResponseMessage_V2() {
super(SESS_BINDINGQUERY_RESP_V2);
}

public SessionBindingQueryResponseMessage_V2(byte v) {
super(v);
}

public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;

import java.util.List;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;

public class SessionBindingQueryResponseMessage_V3 extends SessionBindingQueryResponseMessage_V2 {

private boolean autoCreateJmsTopics;

public SessionBindingQueryResponseMessage_V3(final boolean exists,
final List<SimpleString> queueNames,
final boolean autoCreateJmsQueues,
final boolean autoCreateJmsTopics) {
super(SESS_BINDINGQUERY_RESP_V3);

this.exists = exists;

this.queueNames = queueNames;

this.autoCreateJmsQueues = autoCreateJmsQueues;

this.autoCreateJmsTopics = autoCreateJmsTopics;
}

public SessionBindingQueryResponseMessage_V3() {
super(SESS_BINDINGQUERY_RESP_V3);
}

public boolean isAutoCreateJmsTopics() {
return autoCreateJmsTopics;
}

@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(autoCreateJmsTopics);
}

@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
autoCreateJmsTopics = buffer.readBoolean();
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreateJmsTopics ? 1231 : 1237);
return result;
}

@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", exists=" + exists);
buff.append(", queueNames=" + queueNames);
buff.append(", autoCreateJmsQueues=" + autoCreateJmsQueues);
buff.append(", autoCreateJmsTopics=" + autoCreateJmsTopics);
buff.append("]");
return buff.toString();
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionBindingQueryResponseMessage_V3))
return false;
SessionBindingQueryResponseMessage_V3 other = (SessionBindingQueryResponseMessage_V3) obj;
if (autoCreateJmsTopics != other.autoCreateJmsTopics)
return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126
activemq.version.compatibleVersionList=121,122,123,124,125,126,127
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private void doSendx(ActiveMQDestination destination,

// if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
// as that's a more efficient path for such operation
if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
if (!query.isExists() && ((address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !query.isAutoCreateJmsQueues()) || (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !query.isAutoCreateJmsTopics()))) {
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public MessageProducer createProducer(final Destination destination) throws JMSE
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());

if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
if (!response.isExists() && ((jbd.getAddress().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !response.isAutoCreateJmsQueues()) || (jbd.getAddress().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !response.isAutoCreateJmsTopics()))) {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}

Expand Down Expand Up @@ -659,7 +659,7 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
else {
AddressQuery response = session.addressQuery(dest.getSimpleAddress());

if (!response.isExists()) {
if (!response.isExists() && !response.isAutoCreateJmsTopics()) {
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
}

Expand Down Expand Up @@ -1106,7 +1106,7 @@ private ActiveMQTopic lookupTopic(final String topicName, final boolean isTempor

AddressQuery query = session.addressQuery(topic.getSimpleAddress());

if (!query.isExists()) {
if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
return null;
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ boolean createQueue(boolean storeConfig,
*/
boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception;

/**
*
* @param storeConfig
* @param topicName
* @param autoCreated
* @param bindings
* @return
* @throws Exception
*/
boolean createTopic(boolean storeConfig, String topicName, boolean autoCreated, String... bindings) throws Exception;

/**
* Remove the topic from the Binding Registry or BindingRegistry.
* Calling this method does <em>not</em> destroy the destination.
Expand Down
Loading