Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARTEMIS-550] fix up test to validate CORE and implement FQQN check b… #1820

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SelectorTranslator;

/**
Expand Down Expand Up @@ -698,8 +699,17 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
*/
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateQueues()) {
SimpleString queueNameToUse = dest.getSimpleAddress();
SimpleString addressToUse = queueNameToUse;
RoutingType routingTypeToUse = RoutingType.ANYCAST;
if (CompositeAddress.isFullyQualified(queueNameToUse.toString())) {
CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueNameToUse.toString());
addressToUse = new SimpleString(compositeAddress.getAddress());
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
routingTypeToUse = RoutingType.MULTICAST;
Copy link
Contributor

@michaelandrepearce michaelandrepearce Jan 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to Virtual Topics support for OpenWire? My concern here is, i request a composite address on a JMS Queue, it should be ANYCAST routing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is, at least the use case in the test. It may be that some FQQN scenarios are not applicable to CORE.
I am not sure how to determine the routingTypeToUse if both the address and subscription queue need to be created.
Is there a way to know this is a fanout consumer queue/subscription queue rather than a simple ANYCAST address.
On the server in the openwire case there is a binding query such the address provides this information but I did not want to go down the route of another round trip.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original test case that I added to, had both artemis and openwire jms clients in the mix, a parameterised test. It may be that the vt use case needs to be constrained to openwire because I think your expectation is correct. I guess in the case that the address already existed I may be able to find the routing type there.
But when both address and queue need to be created core would be out of luck.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if this is related to virtual topics why complicate things for the Core JMS client and introduce such problems it has JMS 2.0 spec as such shared durable subscribers, which is the spec equiv of such feature.

Copy link
Contributor

@graben graben Jan 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, that's not correct vt and shared subscriptions are only similar but not equal. Shared subscriptions still need connections to have a clientid. Therefor those connections are bounded to a special use case. VTs has queue semantic. No "special" connections are needed. Such pooling of connections for almost everything sending/consuming/... is possible. Artemis still leaks such a feature on its core protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at my test case in JIRA ARTEMIS-550 and you'll see that there is still a problem concerning usage of FQQN via core protocol.

Copy link
Contributor

@michaelandrepearce michaelandrepearce Jan 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t see how that’s breaking, it’s been clear virtual topics as it is in activemq5 isn’t going to be ported as is to the Core JMS 2.0 layer, as use cases will be covered either with JMS 2.0 spec or FQQN (note FQQN != virtual topic).

As noted here your use case is covered by jms shared subscriptions.

This PR atm breaks the documented address model to JMS mapping’s that exists. That a JMS queue maps onto Anycast Address and a JMS topic maps onto an Multicast Address, all FQQN does is give a fully qualified queue name, it doesnt alter or define if it should be anycast of multicast, that is taken from if the destination provided is JMSQueue or JMSTopic.

@gtully
The only two ways id see this cleanly working is:

  1. some new optional pre-fix support was added to FQQN naming convention to add the ability to define / override if multicast or anycast (as FQQN is multi protocol this should be protocol / api agnostic), thus not affecting current but allowing a JMSQueue to create a multicast address via FQQN. Though still i think we have to be honest here, that if the use case is covered with JMS 2.0 is there a real need to complicate things more at risk of having more corner cases that can break.

  2. The other is to create a new packet request and response and server side service to match the address and get the address settings, and then set to the default routing type, if its a fqqn.
    This would be a new packet, this would need to be back/forwards compatible e.g. the client should not make this service call if it connects to an older broker, and simply keep the existing default of ANYCAST.
    I guess also this should behave the same for either JMS Topic of JMS Queue's that if FQQN then it does the same.

What can't happen is to break the default and released behaviours (not without a major version bump anyhow).

I understand ensuring the support of Virtual on openwire where JMS 1.1 so JMS Shared Subscriber doesn't exits, but thats all managed within the openwire protocol manager essentially mapping those concepts all over to core server side.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelandrepearce I shared some use cases and scenarios regarding 5.x virtual topics on 550. JMS 2.0 SharedDurable subscriptions is only one use case that virtual topics help with. However, there are several that JMS 2.0 SDS won’t cover that 5.x virtual topics do. Additionally, other brokers have these features, so I believe they are “messaging patterns” vs “ActiveMQ 5.x-only” thing.

Check it out, and let me know if you want to hop on irc or a chat to go review it.

At any rate, I think Artemis would benefit from supporting virtual topics (or whatever new name is more suitable)

Copy link
Contributor

@michaelandrepearce michaelandrepearce Jan 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattrpav im on IRC. fyi im not saying all use cases are solved by SDC, but in this case it was. We should be looking to map use cases / functionality over, not lifting and shifting virt topics as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelandrepearce I agree with you, I will close out this PR.
The discussion is the valuable bit.
For this use case, SDS is the answer.
For the more general CORE FQQN scenario, using a prefix with a FQQN would be the other approach. It probably should support the prefix notation via the acceptor.
multicast://VirtualTopic.Orders::Consumer.A
This says, create me a sub queue called Consumer.A`` that is bound using multicastrouting to addressVirtualTopic.Orders```.

}
try {
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers());
session.createQueue(addressToUse, routingTypeToUse, queueNameToUse, null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers());
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -78,6 +80,12 @@ public FQQNOpenWireTest(String factoryType) {
}
}

@Override
protected void extraServerConfig(Configuration serverConfig) {
// allow auto creation of advisory dests
serverConfig.getAddressesSettings().put("ActiveMQ.Advisory.#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));
}

@Test
//there isn't much use of FQQN for topics
//however we can test query functionality
Expand Down Expand Up @@ -325,9 +333,7 @@ public void testVirtualTopicFQQNAutoCreateQueue() throws Exception {
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);

try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn = factory.createConnection();
exConn.start();

Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down Expand Up @@ -372,9 +378,7 @@ public void testVirtualTopicFQQNAutoCreateQAndAddress() throws Exception {
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn = factory.createConnection();
exConn.start();

Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down Expand Up @@ -419,9 +423,7 @@ public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn = factory.createConnection();
exConn.start();

Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down Expand Up @@ -469,9 +471,7 @@ public void testVirtualTopicFQQNAutoCreateQWithExistingAddressWithAnyCastDefault
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.ANYCAST);

try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn = factory.createConnection();
exConn.start();

Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down