From 9ef90f8defdce6845b6d01cb03613111972ecabd Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 13 Dec 2017 21:53:19 -0500 Subject: [PATCH 1/2] ARTEMIS-1546 Adding tests to validate compatibility https://issues.apache.org/jira/browse/ARTEMIS-1546 - the dependency scan is changed to allow adding an extra repository - adding groovy so we won't require compilation dependencies (just runtime) without needing reflection (thanks Groovy :) ) - Adding hornetq to the mesh of version tests --- .../maven/ArtemisDependencyScanPlugin.java | 22 + .../send-acknowledgements-fail/pom.xml | 94 ++++ .../send-acknowledgements-fail/readme.html | 140 ++++++ .../example/SendAcknowledgementsExample.java | 151 ++++++ .../src/main/resources/jndi.properties | 20 + pom.xml | 6 + tests/compatibility-tests/pom.xml | 475 ++++++++++++++++++ .../tests/compatibility/GroovyRun.java | 118 +++++ .../resources/clients/artemisClient.groovy | 31 ++ .../resources/clients/hornetqClient.groovy | 33 ++ .../resources/meshTest/sendMessages.groovy | 205 ++++++++ .../sendAckTest/sendAckMessages.groovy | 97 ++++ .../src/main/resources/serial/serial.groovy | 54 ++ .../resources/servers/artemisServer.groovy | 61 +++ .../resources/servers/hornetqServer.groovy | 54 ++ .../artemis/tests/compatibility/MeshTest.java | 79 +++ .../tests/compatibility/SendAckTest.java | 78 +++ .../compatibility/SerializationTest.java | 76 +++ .../compatibility/VersionedBaseTest.java | 179 +++++++ .../tests/compatibility/package-info.java | 23 + tests/pom.xml | 1 + .../artemis/tests/util/SpawnedVMSupport.java | 63 ++- 22 files changed, 2057 insertions(+), 3 deletions(-) create mode 100644 examples/features/standard/send-acknowledgements-fail/pom.xml create mode 100644 examples/features/standard/send-acknowledgements-fail/readme.html create mode 100644 examples/features/standard/send-acknowledgements-fail/src/main/java/org/apache/activemq/artemis/jms/example/SendAcknowledgementsExample.java create mode 100644 examples/features/standard/send-acknowledgements-fail/src/main/resources/jndi.properties create mode 100644 tests/compatibility-tests/pom.xml create mode 100644 tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java create mode 100644 tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy create mode 100644 tests/compatibility-tests/src/main/resources/clients/hornetqClient.groovy create mode 100644 tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy create mode 100644 tests/compatibility-tests/src/main/resources/sendAckTest/sendAckMessages.groovy create mode 100644 tests/compatibility-tests/src/main/resources/serial/serial.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/hornetqServer.groovy create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MeshTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SendAckTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SerializationTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/package-info.java diff --git a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisDependencyScanPlugin.java b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisDependencyScanPlugin.java index e59bc366c8b..4f1d1d8aaf4 100644 --- a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisDependencyScanPlugin.java +++ b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisDependencyScanPlugin.java @@ -19,6 +19,8 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +31,7 @@ import org.apache.maven.plugins.annotations.Mojo; import org.apache.maven.plugins.annotations.Parameter; import org.apache.maven.project.MavenProject; +import org.eclipse.aether.repository.RemoteRepository; @Mojo(name = "dependency-scan", defaultPhase = LifecyclePhase.VERIFY) public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin { @@ -47,6 +50,9 @@ public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin { @Parameter private String[] libList; + @Parameter + private String[] extraRepositories; + @Parameter private String variableName; @@ -64,6 +70,16 @@ protected boolean isIgnore() { @Override protected void doExecute() throws MojoExecutionException, MojoFailureException { + + int repositories = 0; + List listRepo = new ArrayList<>(); + if (extraRepositories != null) { + for (String strRepo: extraRepositories) { + RemoteRepository repo = new RemoteRepository.Builder("repo" + (repositories++), "default", strRepo).build(); + listRepo.add(repo); + remoteRepos.add(repo); + } + } getLog().info("Local " + localRepository); MavenProject project = (MavenProject) getPluginContext().get("project"); @@ -102,7 +118,13 @@ protected void doExecute() throws MojoExecutionException, MojoFailureException { } catch (Throwable e) { getLog().error(e); throw new MojoFailureException(e.getMessage()); + } finally { + for (RemoteRepository repository : listRepo) { + remoteRepos.remove(repository); + } } + + } } diff --git a/examples/features/standard/send-acknowledgements-fail/pom.xml b/examples/features/standard/send-acknowledgements-fail/pom.xml new file mode 100644 index 00000000000..369f94e7208 --- /dev/null +++ b/examples/features/standard/send-acknowledgements-fail/pom.xml @@ -0,0 +1,94 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.broker + jms-examples + 2.5.0-SNAPSHOT + + + send-acknowledgements-fail + jar + ActiveMQ Artemis JMS Send Acknowledgements Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-jms-client-all + ${project.version} + + + org.apache.activemq + artemis-cli + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create + + create + + + ${basedir}/target/server0 + + --global-max-size + 10M + + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.SendAcknowledgementsExample + + ${basedir}/target/server0 + + + + + + + org.apache.activemq.examples.broker + send-acknowledgements-fail + ${project.version} + + + + + + + diff --git a/examples/features/standard/send-acknowledgements-fail/readme.html b/examples/features/standard/send-acknowledgements-fail/readme.html new file mode 100644 index 00000000000..fcc37fcc8c7 --- /dev/null +++ b/examples/features/standard/send-acknowledgements-fail/readme.html @@ -0,0 +1,140 @@ + + + + + ActiveMQ Artemis Asynchronous Send Acknowledgements Example + + + + + +

Asynchronous Send Acknowledgements Example

+ +
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ + +

Asynchronous Send Acknowledgements are an advanced feature of ActiveMQ Artemis which allow you to + receive acknowledgements that messages were successfully received at the server in a separate thread to the sending thread

+

In this example we create a normal JMS session, then set a SendAcknowledgementHandler on the JMS + session's underlying core session. We send many messages to the server without blocking and asynchronously + receive send acknowledgements via the SendAcknowledgementHandler. + +

For more information on Asynchronous Send Acknowledgements please see the user manual

+

Example step-by-step

+

To run the example, simply type mvn verify -Pexample from this directory

+ +
    +
  1. First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the client-jndi.properties file in the directory ../common/config
  2. +
    +           InitialContext initialContext = getContext();
    +        
    + +
  3. We look-up the JMS queue object from JNDI
  4. +
    +           Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
    +        
    + +
  5. We look-up the JMS connection factory object from JNDI
  6. +
    +           ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
    +        
    + +
  7. We create a JMS connection
  8. +
    +           connection = cf.createConnection();
    +        
    + +
  9. Define a SendAcknowledgementHandler which will receive asynchronous acknowledgements
  10. +
    +           
    +         class MySendAcknowledgementsHandler implements SendAcknowledgementHandler
    +         {
    +            int count = 0;
    +
    +            public void sendAcknowledged(final Message message)
    +            {
    +               System.out.println("Received send acknowledgement for message " + count++);
    +            }
    +         }
    +           
    +        
    + +
  11. Create a JMS session
  12. +
    +          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    +       
    + +
  13. Set the handler on the underlying core session
  14. +
    +           
    +         ClientSession coreSession = ((ActiveMQSession)session).getCoreSession();
    +
    +         coreSession.setSendAcknowledgementHandler(new MySendAcknowledgementsHandler());
    +
    +           
    +        
    + +
  15. Create a JMS Message Producer
  16. +
    +           
    +         MessageProducer producer = session.createProducer(queue);
    +
    +         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    +           
    +        
    + +
  17. Send 5000 messages, the handler will get called asynchronously some time later after the messages are sent.
  18. +
    +           
    +         final int numMessages = 5000;
    +
    +         for (int i = 0; i < numMessages; i++)
    +         {
    +            javax.jms.Message jmsMessage = session.createMessage();
    +
    +            producer.send(jmsMessage);
    +
    +            System.out.println("Sent message " + i);
    +         }
    +           
    +        
    + + +
  19. And finally, always remember to close your JMS connections and resources after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
  20. + +
    +           finally
    +           {
    +              if (initialContext != null)
    +              {
    +                initialContext.close();
    +              }
    +              if (connection != null)
    +              {
    +                 connection.close();
    +              }
    +           }
    +        
    + + + +
+ + diff --git a/examples/features/standard/send-acknowledgements-fail/src/main/java/org/apache/activemq/artemis/jms/example/SendAcknowledgementsExample.java b/examples/features/standard/send-acknowledgements-fail/src/main/java/org/apache/activemq/artemis/jms/example/SendAcknowledgementsExample.java new file mode 100644 index 00000000000..86c4333619a --- /dev/null +++ b/examples/features/standard/send-acknowledgements-fail/src/main/java/org/apache/activemq/artemis/jms/example/SendAcknowledgementsExample.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.util.ServerUtil; + +/** + * Asynchronous Send Acknowledgements are an advanced feature of ActiveMQ Artemis which allow you to + * receive acknowledgements that messages were successfully received at the server in a separate stream + * to the stream of messages being sent to the server. + * For more information please see the readme.html file + */ +public class SendAcknowledgementsExample { + + private static Process server0; + private static final int numMessages = 30_000; + private static final SimpleString queueName = SimpleString.toSimpleString("testQueue"); + + public static void main(final String[] args) throws Exception { + + for (int i = 0; i < 500; i++) { + System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ Running test " + i); + server0 = ServerUtil.startServer(args[0], SendAcknowledgementsExample.class.getSimpleName() + "0", 0, 10000); + sendMessages(); + + server0 = ServerUtil.startServer(args[0], SendAcknowledgementsExample.class.getSimpleName() + "0", 0, 10000); + consumeMessages(); + } + } + + private static void sendMessages() throws Exception { + try { + + ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(false).setConfirmationWindowSize(1024 * 1024); + + ClientSessionFactory factory = locator.createSessionFactory(); + + ClientSession session = factory.createSession(null, null, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE); + + try { + // Tried with and without the createAddress + session.createAddress(queueName, RoutingType.MULTICAST, false); + session.createQueue(queueName.toString(), RoutingType.MULTICAST, queueName.toString(), true); + } catch (Exception e) { + } + + ClientProducer producer = session.createProducer(queueName); + + CountDownLatch latch = new CountDownLatch(numMessages); + + for (int i = 0; i < numMessages; i++) { + + if (i % 10000 == 0) { + System.out.println("Send " + i); + } + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeBytes("hello world".getBytes()); + + // tried with producer.send(queueName, message, ...);; // didn't make a difference + + producer.send(message, new SendAcknowledgementHandler() { + @Override + public void sendAcknowledged(Message message) { + latch.countDown(); + if (latch.getCount() % 10_000 == 0) { + System.out.println(latch.getCount() + " to go"); + } + } + }); + } + latch.await(10, TimeUnit.MINUTES); + } finally { + server0.destroy(); + server0.waitFor(); + } + } + + private static void consumeMessages() throws Exception { + try { + + ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(false).setConfirmationWindowSize(-1); + + ClientSessionFactory factory = locator.createSessionFactory(); + + ClientSession session = factory.createSession(null, null, false, false, false, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE); + + ClientConsumer consumer = session.createConsumer(queueName); + + session.start(); + + for (int i = 0; i < numMessages; i++) { + + if (i % 10000 == 0) { + System.out.println("Received " + i); + } + + ClientMessage message = consumer.receive(5000); + message.acknowledge(); + + if (message == null) { + System.err.println("Expected message at " + i); + System.exit(-1); + } + } + + session.commit(); + + ClientMessage message = consumer.receiveImmediate(); + if (message != null) { + System.err.println("Received too many messages"); + System.exit(-1); + } + + session.close(); + locator.close(); + + } finally { + server0.destroy(); + server0.waitFor(); + } + } + +} diff --git a/examples/features/standard/send-acknowledgements-fail/src/main/resources/jndi.properties b/examples/features/standard/send-acknowledgements-fail/src/main/resources/jndi.properties new file mode 100644 index 00000000000..8421f257d49 --- /dev/null +++ b/examples/features/standard/send-acknowledgements-fail/src/main/resources/jndi.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory +connectionFactory.ConnectionFactory=tcp://localhost:61616?confirmationWindowSize=1048576 +queue.queue/exampleQueue=exampleQueue diff --git a/pom.xml b/pom.xml index 885b97049e8..1baad38937f 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,9 @@ 0.7.9 0.7.9 + + 2.4.3 + 1.4.3 5.0.1.RELEASE @@ -126,6 +129,7 @@ true true true + true true true true @@ -959,6 +963,7 @@ false false false + false false true false @@ -1005,6 +1010,7 @@ true false false + false diff --git a/tests/compatibility-tests/pom.xml b/tests/compatibility-tests/pom.xml new file mode 100644 index 00000000000..443b0ede03b --- /dev/null +++ b/tests/compatibility-tests/pom.xml @@ -0,0 +1,475 @@ + + + 4.0.0 + + + org.apache.activemq.tests + artemis-tests-pom + 2.5.0-SNAPSHOT + + + compatibility-tests + jar + ActiveMQ Artemis Compatibility Tests + + + ${project.basedir}/../.. + + + + + org.apache.activemq + artemis-server + ${project.version} + test + test-jar + + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + + + org.apache.activemq.tests + unit-tests + ${project.version} + test + test-jar + + + org.apache.activemq + artemis-core-client + ${project.version} + test + test-jar + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + org.apache.activemq + artemis-jms-server + ${project.version} + + + org.apache.activemq + artemis-ra + ${project.version} + + + org.apache.activemq + artemis-cli + ${project.version} + + + org.apache.activemq + artemis-commons + ${project.version} + + + org.apache.activemq + artemis-spring-integration + ${project.version} + + + org.apache.activemq + artemis-journal + ${project.version} + + + org.apache.activemq + artemis-jdbc-store + ${project.version} + + + org.apache.activemq + artemis-amqp-protocol + ${project.version} + + + org.apache.activemq + artemis-stomp-protocol + ${project.version} + + + org.apache.activemq + artemis-openwire-protocol + ${project.version} + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + + + + org.apache.activemq + artemis-hornetq-protocol + ${project.version} + + + org.apache.activemq + artemis-core-client + ${project.version} + + + org.apache.activemq + artemis-server + ${project.version} + + + org.apache.activemq + artemis-native + ${project.version} + + + org.apache.activemq + artemis-service-extensions + ${project.version} + + + org.apache.activemq.tests + artemis-test-support + ${project.version} + + + org.apache.activemq + artemis-features + ${project.version} + pom + + + org.apache.activemq.rest + artemis-rest + 2.5.0-SNAPSHOT + + + org.codehaus.groovy + groovy-all + ${groovy.version} + + + org.apache.activemq + artemis-junit + ${project.version} + test + + + org.apache.activemq + activemq-client + test + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + + + + org.apache.activemq + artemis-mqtt-protocol + ${project.version} + + + org.fusesource.mqtt-client + mqtt-client + + + org.apache.geronimo.specs + geronimo-json_1.0_spec + test + + + org.apache.geronimo.specs + geronimo-j2ee-connector_1.5_spec + + + org.apache.geronimo.specs + geronimo-jta_1.1_spec + + + org.apache.geronimo.specs + geronimo-jms_2.0_spec + + + org.apache.johnzon + johnzon-core + test + + + io.netty + netty-buffer + + + io.netty + netty-codec-http + + + io.netty + netty-codec-mqtt + ${netty.version} + + + io.netty + netty-handler + + + io.netty + netty-transport + + + junit + junit + + + org.jboss.logging + jboss-logging-processor + provided + true + + + org.jboss.logging + jboss-logging-annotations + provided + true + + + org.jboss.logging + jboss-logging + + + org.jboss.logmanager + jboss-logmanager + + + org.apache.qpid + qpid-jms-client + ${qpid.jms.version} + + + org.apache.qpid + proton-j + + + org.slf4j + slf4j-api + 1.7.5 + + + + + + + src/test/resources + true + + + + + org.apache.maven.plugins + maven-jar-plugin + + + test + + test-jar + + + + + + org.apache.activemq + artemis-maven-plugin + + + + snapshot-check + compile + + dependency-scan + + + + org.apache.activemq:artemis-jms-server:${project.version} + org.apache.activemq:artemis-jms-client:${project.version} + org.apache.activemq:artemis-hornetq-protocol:${project.version} + org.apache.activemq:artemis-amqp-protocol:${project.version} + org.apache.activemq:artemis-hornetq-protocol:${project.version} + org.codehaus.groovy:groovy-all:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + ARTEMIS-SNAPSHOT + + + + compile + + dependency-scan + + 240-check + + + org.apache.activemq:artemis-jms-server:2.4.0 + org.apache.activemq:artemis-jms-client:2.4.0 + org.apache.activemq:artemis-hornetq-protocol:2.4.0 + org.apache.activemq:artemis-amqp-protocol:2.4.0 + org.apache.activemq:artemis-hornetq-protocol:2.4.0 + org.codehaus.groovy:groovy-all:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + ARTEMIS-240 + + + + 140-check + compile + + dependency-scan + + + + org.apache.activemq:artemis-jms-server:1.4.0 + org.apache.activemq:artemis-jms-client:1.4.0 + org.apache.activemq:artemis-hornetq-protocol:1.4.0 + org.apache.activemq:artemis-amqp-protocol:1.4.0 + org.apache.activemq:artemis-hornetq-protocol:1.4.0 + org.codehaus.groovy:groovy-all:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + ARTEMIS-140 + + + + hornetq-235 + compile + + dependency-scan + + + + + https://repository.jboss.org/nexus/content/groups/public + + + org.hornetq:hornetq-jms-server:2.4.7.Final + org.codehaus.groovy:groovy-all:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + HORNETQ-235 + + + + hornetq-247 + compile + + dependency-scan + + + + + https://repository.jboss.org/nexus/content/groups/public + + + org.hornetq:hornetq-jms-server:2.4.7.Final + org.codehaus.groovy:groovy-all:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + HORNETQ-247 + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + ARTEMIS-SNAPSHOT + ${ARTEMIS-SNAPSHOT} + + + ARTEMIS-240 + ${ARTEMIS-240} + + + ARTEMIS-140 + ${ARTEMIS-140} + + + HORNETQ-235 + ${HORNETQ-235} + + + HORNETQ-247 + ${HORNETQ-247} + + + ${skipCompatibilityTests} + -Djgroups.bind_addr=::1 ${activemq-surefire-argline} + + + + org.apache.servicemix.tooling + depends-maven-plugin + 1.2 + + + generate-depends-file + + generate-depends-file + + + + + + + + + + jboss-releases-repository + JBoss Releases Repository + https://repository.jboss.org/nexus/service/local/staging/deploy/maven2/ + + + jboss-snapshots-repository + JBoss Snapshots Repository + https://repository.jboss.org/nexus/content/repositories/snapshots/ + + + + + diff --git a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java new file mode 100644 index 00000000000..08e6c340818 --- /dev/null +++ b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java @@ -0,0 +1,118 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + +import groovy.lang.Binding; +import groovy.lang.GroovyShell; + +public class GroovyRun { + + public static final String SNAPSHOT = "ARTEMIS-SNAPSHOT"; + public static final String ONE_FOUR = "ARTEMIS-140"; + public static final String TWO_FOUR = "ARTEMIS-240"; + public static final String HORNETQ_235 = "HORNETQ-235"; + public static final String HORNETQ_247 = "HORNETQ-247"; + + public static final String WORD_START = "**SERVER STARTED**"; + + public static Binding binding = new Binding(); + public static GroovyShell shell = new GroovyShell(binding); + + // Called with reflection + public static void doMain(String script, String... arg) throws Throwable { + int i = 0; + for (String a : arg) { + System.out.println("[" + (i++) + "]=" + a); + } + System.out.println(); + + evaluate(script, "arg", arg); + + System.out.println(WORD_START); + } + + /** + * This can be called from the scripts as well. + * The scripts will use this method instead of its own groovy method. + * As a classloader operation needs to be done here. + */ + public static void evaluate(String script, + String argVariableName, + String[] arg) throws URISyntaxException, IOException { + URL scriptURL = GroovyRun.class.getClassLoader().getResource(script); + if (scriptURL == null) { + throw new RuntimeException("cannot find " + script); + } + URI scriptURI = scriptURL.toURI(); + + binding.setVariable(argVariableName, arg); + + shell.evaluate(scriptURI); + } + + // Called with reflection + public static void execute(String script) throws Throwable { + shell.evaluate(script); + } + + public static void assertNotNull(Object value) { + if (value == null) { + throw new RuntimeException("Null value"); + } + } + + public static void assertNull(Object value) { + if (value != null) { + throw new RuntimeException("Expected Null value"); + } + } + + public static void assertTrue(boolean value) { + if (!value) { + throw new RuntimeException("Expected true"); + } + } + + public static void assertEquals(Object value1, Object value2) { + if (!value1.equals(value2)) { + throw new RuntimeException(value1 + "!=" + value2); + } + } + + public static void assertEquals(int value1, int value2) { + if (value1 != value2) { + throw new RuntimeException(value1 + "!=" + value2); + } + } + + public static void assertEquals(byte[] value1, byte[] value2) { + + assertEquals(value1.length, value2.length); + + for (int i = 0; i < value1.length; i++) { + assertEquals(value1[i], value2[i]); + } + } + + + public static byte getSamplebyte(final long position) { + return (byte) ('a' + position % ('z' - 'a' + 1)); + } +} + diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy new file mode 100644 index 00000000000..e729ebb0fa3 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy @@ -0,0 +1,31 @@ +package clients +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + +println("serverType " + serverArg[0]); + +if (serverArg[0].startsWith("HORNETQ")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} else { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false"); +} + + +GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); +GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); + diff --git a/tests/compatibility-tests/src/main/resources/clients/hornetqClient.groovy b/tests/compatibility-tests/src/main/resources/clients/hornetqClient.groovy new file mode 100644 index 00000000000..63bf4e6da19 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/hornetqClient.groovy @@ -0,0 +1,33 @@ +package clients +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// This script is called by sendMessages.groovy + +import javax.jms.ConnectionFactory; +import java.util.HashMap; +import java.util.Map; + +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.jms.HornetQJMSConstants; +import org.hornetq.core.message.impl.MessageImpl; +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; +import org.hornetq.core.remoting.impl.netty.TransportConstants; +import org.hornetq.jms.client.HornetQJMSConnectionFactory; + + +properties = new HashMap(); +properties.put(TransportConstants.HOST_PROP_NAME, "localhost"); +properties.put(TransportConstants.PORT_PROP_NAME, "61616"); +configuration = new TransportConfiguration(NettyConnectorFactory.class.getName(), properties); +cf = new HornetQJMSConnectionFactory(false, configuration); diff --git a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy new file mode 100644 index 00000000000..fdfddee54f6 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy @@ -0,0 +1,205 @@ +package meshTest + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// starts an artemis server +String serverType = arg[0]; +String clientType = arg[1]; +String operation = arg[2]; + + +String queueName = "queue"; + +int LARGE_MESSAGE_SIZE = 10 * 1024; + +String propertyLargeMessage = "JMS_AMQ_InputStream"; +HDR_DUPLICATE_DETECTION_ID = "_AMQ_DUPL_ID"; + +if (clientType.startsWith("HORNETQ")) { + HDR_DUPLICATE_DETECTION_ID = "_HQ_DUPL_ID"; + propertyLargeMessage = "JMS_HQ_InputStream" +} + +BYTES_BODY = new byte[3]; +BYTES_BODY[0] = (byte) 0x77; +BYTES_BODY[1] = (byte) 0x77; +BYTES_BODY[2] = (byte) 0x77; + +String textBody = "a rapadura e doce mas nao e mole nao"; + + +println("serverType " + serverType); + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType); +} else { + // Can't depend directly on hornetq, otherwise it wouldn't compile in artemis + GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); +} + + +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue queue = session.createQueue(queueName); + +if (operation.equals("sendAckMessages")) { + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + System.out.println("Sending messages"); + + TextMessage message = session.createTextMessage(textBody); + message.setStringProperty(HDR_DUPLICATE_DETECTION_ID, "some-duplicate"); + message.setStringProperty("prop", "test"); + producer.send(message); + + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(BYTES_BODY); + producer.send(bytesMessage); + + + for (int i = 0; i < 10; i++) { + BytesMessage m = session.createBytesMessage(); + m.setIntProperty("count", i); + + m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE)); + + producer.send(m); + } + + producer.send(session.createObjectMessage("rapadura")); + + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setString("prop", "rapadura") + producer.send(mapMessage); + + StreamMessage streamMessage = session.createStreamMessage(); + streamMessage.writeString("rapadura"); + streamMessage.writeString("doce"); + streamMessage.writeInt(33); + producer.send(streamMessage); + + Message plain = session.createMessage(); + plain.setStringProperty("plain", "doce"); + producer.send(plain); + + session.commit(); + + connection.close(); + System.out.println("Message sent"); +} else if (operation.equals("receiveMessages")) { + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + System.out.println("Receiving messages"); + + TextMessage message = (TextMessage) consumer.receive(5000); + GroovyRun.assertNotNull(message); + GroovyRun.assertEquals(textBody, message.getText()); + GroovyRun.assertEquals("test", message.getStringProperty("prop")); + GroovyRun.assertEquals("some-duplicate", message.getStringProperty(HDR_DUPLICATE_DETECTION_ID)); + + BytesMessage bm = (BytesMessage) consumer.receive(5000); + GroovyRun.assertNotNull(bm); + + GroovyRun.assertEquals(3L, bm.getBodyLength()); + + byte[] body = new byte[3]; + bm.readBytes(body); + + GroovyRun.assertEquals(BYTES_BODY, body); + + for (int m = 0; m < 10; m++) { + BytesMessage rm = (BytesMessage) consumer.receive(10000); + GroovyRun.assertNotNull(rm); + GroovyRun.assertEquals(m, rm.getIntProperty("count")); + + byte[] data = new byte[1024]; + + System.out.println("Message = " + rm); + + for (int i = 0; i < LARGE_MESSAGE_SIZE; i += 1024) { + int numberOfBytes = rm.readBytes(data); + GroovyRun.assertEquals(1024, numberOfBytes); + for (int j = 0; j < 1024; j++) { + GroovyRun.assertEquals(GroovyRun.getSamplebyte(i + j), data[j]); + } + } + } + + + ObjectMessage obj = consumer.receive(5000); + GroovyRun.assertNotNull(obj); + GroovyRun.assertEquals("rapadura", obj.getObject().toString()); + + MapMessage mapMessage = consumer.receive(5000); + GroovyRun.assertNotNull(mapMessage); + GroovyRun.assertEquals("rapadura", mapMessage.getString("prop")); + + StreamMessage streamMessage = consumer.receive(5000); + GroovyRun.assertNotNull(streamMessage); + GroovyRun.assertEquals("rapadura", streamMessage.readString()); + GroovyRun.assertEquals("doce", streamMessage.readString()); + GroovyRun.assertTrue(streamMessage.readInt() == 33); + + Message plain = consumer.receive(5000); + GroovyRun.assertNotNull(plain); + GroovyRun.assertEquals("doce", plain.getStringProperty("plain")); + + session.commit(); + connection.close(); + System.out.println("Message received"); +} else { + throw new RuntimeException("Invalid operation " + operation); +} + + +// Creates a Fake LargeStream without using a real file +InputStream createFakeLargeStream(final long size) throws Exception { + return new InputStream() { + private long count; + + private boolean closed = false; + + @Override + void close() throws IOException { + super.close(); + closed = true; + } + + @Override + int read() throws IOException { + if (closed) { + throw new IOException("Stream was closed"); + } + if (count++ < size) { + return GroovyRun.getSamplebyte(count - 1); + } + else { + return -1; + } + } + + }; + +} + + + diff --git a/tests/compatibility-tests/src/main/resources/sendAckTest/sendAckMessages.groovy b/tests/compatibility-tests/src/main/resources/sendAckTest/sendAckMessages.groovy new file mode 100644 index 00000000000..2b994bf04af --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/sendAckTest/sendAckMessages.groovy @@ -0,0 +1,97 @@ +package meshTest + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// starts an artemis server +String serverType = arg[0]; +String clientType = arg[1]; +String operation = arg[2]; + + +String queueName = "queue"; + + +String textBody = "a rapadura e doce mas nao e mole nao"; + +println("serverType " + serverType); + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType); +} else { + // Can't depend directly on hornetq, otherwise it wouldn't compile in artemis + GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); +} + + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Queue queue = session.createQueue(queueName); + +if (operation.equals("sendAckMessages")) { + + CountDownLatch latch = new CountDownLatch(10); + + CompletionListener completionListener = new CompletionListener() { + @Override + void onCompletion(Message message) { + latch.countDown(); + } + + @Override + void onException(Message message, Exception exception) { + + } + } + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i), completionListener); + } + + GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + System.out.println("Sending messages"); + connection.close(); + System.out.println("Message sent"); +} else if (operation.equals("receiveMessages")) { + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + System.out.println("Receiving messages"); + + for (int i = 0; i < 10; i++) { + TextMessage message = consumer.receive(1000); + GroovyRun.assertNotNull(message); + GroovyRun.assertEquals(textBody + i, message.getText()); + } + + GroovyRun.assertNull(consumer.receiveNoWait()); + connection.close(); + System.out.println("Message received"); +} else { + throw new RuntimeException("Invalid operation " + operation); +} + + + + diff --git a/tests/compatibility-tests/src/main/resources/serial/serial.groovy b/tests/compatibility-tests/src/main/resources/serial/serial.groovy new file mode 100644 index 00000000000..1fc9dbd8ef1 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/serial/serial.groovy @@ -0,0 +1,54 @@ +package clients +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; +import javax.jms.*; +import org.apache.activemq.artemis.jms.client.* + +file = arg[0] +method = arg[1] +System.out.println("File::" + file); + + +if (method.equals("write")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false"); + queue = new ActiveMQQueue("queue"); + topic = new ActiveMQTopic("topic") + + ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(file)); + objectOutputStream.writeObject(cf); + objectOutputStream.writeObject(queue) + objectOutputStream.writeObject(topic) + objectOutputStream.close(); +} else { + ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream(file)) + + cf = inputStream.readObject(); + queue = inputStream.readObject() + topic = inputStream.readObject() + inputStream.close(); +} + +GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); +GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +MessageConsumer consumer = session.createConsumer(queue); +MessageProducer topicProducer = session.createProducer(topic) +connection.close(); + + diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy new file mode 100644 index 00000000000..3ec6d319a87 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy @@ -0,0 +1,61 @@ +package servers +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// starts an artemis server + +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + + +String folder = arg[0]; +String id = arg[1]; +String type = arg[2]; +String producer = arg[3]; +String consumer = arg[4]; + +println("type = " + type); + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +System.out.println("folder:: " + folder); +configuration.setBrokerInstance(new File(folder + "/" + id)); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616"); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(false); +try { + if (!type.equals("ARTEMIS-140")) { + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true)); + } +} catch (Throwable e) { + // need to ignore this for 1.4 + e.printStackTrace(); +} + +jmsConfiguration = new JMSConfigurationImpl(); + +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +// uncomment this line to validate https://issues.apache.org/jira/browse/ARTEMIS-1561 +// this api exists on both 1.4 and 2.x... so, this one was preferred for this +if (producer.toString().startsWith("HORNETQ")) { + // hornetq servers won't auto-create + server.getJMSServerManager().createQueue(true, "queue", null, true); +} diff --git a/tests/compatibility-tests/src/main/resources/servers/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/hornetqServer.groovy new file mode 100644 index 00000000000..be92768b1ae --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/hornetqServer.groovy @@ -0,0 +1,54 @@ +package servers +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +// starts a hornetq server + +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl +import org.hornetq.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String id = arg[1]; +String type = arg[2]; +String producer = arg[3]; +String consumer = arg[4]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(false); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, "61616"); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MeshTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MeshTest.java new file mode 100644 index 00000000000..ed7851c20f9 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MeshTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_247; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspathProperty(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class MeshTest extends VersionedBaseTest { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR}); + // combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR}); + + combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FOUR, TWO_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FOUR, TWO_FOUR, SNAPSHOT, HORNETQ_235})); + combinations.addAll(combinatory(new Object[]{ONE_FOUR}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT})); + combinations.addAll(combinatory(new Object[]{HORNETQ_235}, new Object[]{ONE_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FOUR, SNAPSHOT, HORNETQ_235})); + combinations.addAll(combinatory(new Object[]{HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247})); + return combinations; + } + + public MeshTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void testSendReceive() throws Throwable { + callMain(senderClassloader, GroovyRun.class.getName(), "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); + callMain(receiverClassloader, GroovyRun.class.getName(), "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); + } + +} + diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SendAckTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SendAckTest.java new file mode 100644 index 00000000000..2f217045bda --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SendAckTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspathProperty(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class SendAckTest extends VersionedBaseTest { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR}); + // combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR}); + + combinations.addAll(combinatory(new Object[]{SNAPSHOT, ONE_FOUR}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT})); + + // not every combination on two four would make sense.. as there's a compatibility issue between 2.4 and 1.4 when crossing consumers and producers + combinations.add(new Object[]{TWO_FOUR, SNAPSHOT, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR}); + return combinations; + } + + public SendAckTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void testSendReceive() throws Throwable { + callMain(senderClassloader, GroovyRun.class.getName(), "sendAckTest/sendAckMessages.groovy", server, sender, "sendAckMessages"); + callMain(receiverClassloader, GroovyRun.class.getName(), "sendAckTest/sendAckMessages.groovy", server, receiver, "receiveMessages"); + } + +} + diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SerializationTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SerializationTest.java new file mode 100644 index 00000000000..c99e6968cdb --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/SerializationTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspathProperty(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class SerializationTest extends VersionedBaseTest { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR}); + // combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR}); + + combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT})); + return combinations; + } + + public SerializationTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void testSerializeFactory() throws Throwable { + File file = serverFolder.newFile("objects.ser"); + file.mkdirs(); + callMain(senderClassloader, GroovyRun.class.getName(), "serial/serial.groovy", file.getAbsolutePath(), "write"); + callMain(receiverClassloader, GroovyRun.class.getName(), "serial/serial.groovy", file.getAbsolutePath(), "read"); + } + +} + diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java new file mode 100644 index 00000000000..64183d60cd9 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.RunnableEx; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +public abstract class VersionedBaseTest { + + protected final String server; + protected final String sender; + protected final String receiver; + + protected ClassLoader serverClassloader; + protected ClassLoader senderClassloader; + protected ClassLoader receiverClassloader; + + public VersionedBaseTest(String server, String sender, String receiver) throws Exception { + this.server = server; + this.sender = sender; + this.receiver = receiver; + this.serverClassloader = getClasspathProperty(server); + this.senderClassloader = getClasspathProperty(sender); + this.receiverClassloader = getClasspathProperty(receiver); + } + + // This is a test optimization.. + // if false it will span a new VM for each classLoader used. + // this can be a bit faster + public static final boolean USE_CLASSLOADER = true; + + private static HashSet printed = new HashSet<>(); + + @ClassRule + public static TemporaryFolder serverFolder; + + static { + File parent = new File("./target/tmp"); + parent.mkdirs(); + serverFolder = new TemporaryFolder(parent); + } + + @Before + public void startServer() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + + serverFolder.getRoot().mkdirs(); + + System.out.println("Folder::" + serverFolder.getRoot()); + + String scriptToUse; + if (server.startsWith("ARTEMIS")) { + scriptToUse = "servers/artemisServer.groovy"; + } else { + scriptToUse = "servers/hornetqServer.groovy"; + } + + callMain(serverClassloader, GroovyRun.class.getName(), scriptToUse, serverFolder.getRoot().getAbsolutePath(), "1", server, sender, receiver); + } + + @After + public void stopServer() throws Throwable { + callExecute(serverClassloader, GroovyRun.class.getName(), "server.stop()"); + + // GC help!!! + serverClassloader = null; + senderClassloader = null; + receiverClassloader = null; + } + + protected static void callMain(ClassLoader loader, + String className, + String script, + String... arguments) throws Exception { + tclCall(loader, () -> { + Class clazz = loader.loadClass(className); + Method method = clazz.getMethod("doMain", String.class, String[].class); + method.invoke(null, script, arguments); + }); + } + + protected static void callExecute(ClassLoader loader, String className, String script) throws Exception { + tclCall(loader, () -> { + Class clazz = loader.loadClass(className); + Method method = clazz.getMethod("execute", String.class); + method.invoke(null, script); + }); + } + + protected static void tclCall(ClassLoader loader, RunnableEx run) throws Exception { + + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); + try { + run.run(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + protected static ClassLoader defineClassLoader(String classPath) throws MalformedURLException { + String[] classPathArray = classPath.split(File.pathSeparator); + URL[] elements = new URL[classPathArray.length]; + for (int i = 0; i < classPathArray.length; i++) { + elements[i] = new File(classPathArray[i]).toPath().toUri().toURL(); + } + + return new URLClassLoader(elements, null); + } + + protected static ClassLoader getClasspathProperty(String name) throws Exception { + + if (name.equals(SNAPSHOT)) { + return VersionedBaseTest.class.getClassLoader(); + } + String value = System.getProperty(name); + + if (!printed.contains(name)) { + boolean ok = value != null && !value.trim().isEmpty(); + if (!ok) { + System.out.println("Add \"-D" + name + "=\'CLASSPATH\'\" into your VM settings"); + } else { + printed.add(name); + System.out.println("****************************************************************************"); + System.out.println("* If you want to debug this test, add this parameter to your IDE run settings..."); + System.out.println("****************************************************************************"); + System.out.println("-D" + name + "=\"" + value + "\""); + System.out.println("****************************************************************************"); + + } + + Assume.assumeTrue("Cannot run these tests, no classpath found", ok); + } + + return defineClassLoader(value); + } + + protected static List combinatory(Object[] rootSide, Object[] sideLeft, Object[] sideRight) { + LinkedList combinations = new LinkedList<>(); + + for (Object root : rootSide) { + for (Object left : sideLeft) { + for (Object right : sideRight) { + combinations.add(new Object[]{root, left, right}); + } + } + } + + return combinations; + } + +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/package-info.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/package-info.java new file mode 100644 index 00000000000..0c3c124613f --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This package will contain tests that will validate Artemis within itself in different versions + * and within some specific hornetQ versions. + * + * The integration tests pom will use the artemis-maven-plugin/dependency-scan to lookup for specific verions. + * The tests will only work if the plugin was executed and the system property was filled, + * or if it was manually added within the IDE running for debug purposes. + */ + +package org.apache.activemq.artemis.tests.compatibility; \ No newline at end of file diff --git a/tests/pom.xml b/tests/pom.xml index 145bade5471..a281b96ba5f 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -126,6 +126,7 @@ timing-tests jms-tests integration-tests + compatibility-tests soak-tests stress-tests performance-tests diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java index e66da2c96d4..209fe1ef33b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java @@ -90,10 +90,68 @@ public static Process spawnVM(final String wordMatch, final boolean logErrorOutput, final boolean useLogging, final String... args) throws Exception { - ProcessBuilder builder = new ProcessBuilder(); + return spawnVM(System.getProperty("java.class.path"), wordMatch, wordRunning, className, memoryArg1, memoryArg2, vmargs, logOutput, logErrorOutput, useLogging, args); + + } + + + public static Process spawnVM(String classPath, + String wordMatch, + Runnable wordRunning, + String className, + String memoryArg1, + String memoryArg2, + String[] vmargs, + boolean logOutput, + boolean logErrorOutput, + boolean useLogging, + String... args) throws IOException, ClassNotFoundException { + return spawnVM(classPath, wordMatch, wordRunning, className, memoryArg1,memoryArg2, vmargs, logOutput, logErrorOutput, useLogging, -1, args); + } + + /** + * + * @param classPath + * @param wordMatch + * @param wordRunning + * @param className + * @param memoryArg1 + * @param memoryArg2 + * @param vmargs + * @param logOutput + * @param logErrorOutput + * @param useLogging + * @param debugPort if <=0 it means no debug + * @param args + * @return + * @throws IOException + * @throws ClassNotFoundException + */ + public static Process spawnVM(String classPath, + String wordMatch, + Runnable wordRunning, + String className, + String memoryArg1, + String memoryArg2, + String[] vmargs, + boolean logOutput, + boolean logErrorOutput, + boolean useLogging, + long debugPort, + String... args) throws IOException, ClassNotFoundException { final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString(); + ProcessBuilder builder = new ProcessBuilder(); + if (memoryArg1 == null) { + memoryArg1 = "-Xms128m"; + } + if (memoryArg2 == null) { + memoryArg2 = "-Xmx128m"; + } builder.command(javaPath, memoryArg1, memoryArg2); - builder.environment().put("CLASSPATH", System.getProperty("java.class.path")); + if (debugPort > 0) { + builder.command().add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=" + debugPort); + } + builder.environment().put("CLASSPATH", classPath); List commandList = builder.command(); @@ -143,7 +201,6 @@ public static Process spawnVM(final String wordMatch, errorLogger.start(); return process; - } /** From dbe575a0c1a73849284f163e32544cbfe901c1f9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 15 Dec 2017 16:36:46 -0500 Subject: [PATCH 2/2] ARTEMIS-1546 Fixing body compatibility issue by recast body towards 1.4 https://issues.apache.org/jira/browse/ARTEMIS-1546 Recasting the body as 1.x format when there's a 1.x in use at the other size of the wire --- .../artemis/api/core/ICoreMessage.java | 27 +++++++-- .../core/message/impl/CoreMessage.java | 58 +++++++++++-------- .../core/protocol/ClientPacketDecoder.java | 16 +++-- .../protocol/core/CoreRemotingConnection.java | 10 +++- .../artemis/core/protocol/core/Packet.java | 3 +- .../impl/ActiveMQClientProtocolManager.java | 1 + .../core/impl/ActiveMQSessionContext.java | 43 ++++++++++---- .../core/protocol/core/impl/ChannelImpl.java | 2 +- .../protocol/core/impl/PacketDecoder.java | 5 +- .../core/protocol/core/impl/PacketImpl.java | 23 +++++--- .../core/impl/RemotingConnectionImpl.java | 18 +++--- .../wireformat/SessionReceiveMessage.java | 11 +++- .../wireformat/SessionReceiveMessage_1X.java | 49 ++++++++++++++++ .../impl/wireformat/SessionSendMessage.java | 8 ++- .../wireformat/SessionSendMessage_1X.java | 52 +++++++++++++++++ .../jms/client/ActiveMQConnectionFactory.java | 2 + .../jms/client/ActiveMQDestination.java | 4 +- .../core/protocol/ServerPacketDecoder.java | 35 ++++++----- .../core/ServerSessionPacketHandler.java | 10 ++-- .../core/impl/ActiveMQPacketHandler.java | 10 ++-- .../core/impl/CoreProtocolManager.java | 4 +- .../core/impl/CoreSessionCallback.java | 8 ++- .../core/server/impl/ServerConsumerImpl.java | 2 +- .../resources/servers/artemisServer.groovy | 7 +-- 24 files changed, 303 insertions(+), 105 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java index 45622e44490..179f8c58767 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.util.Map; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -35,25 +36,33 @@ public interface ICoreMessage extends Message { @Override InputStream getBodyInputStream(); - /** Returns a new Buffer slicing the current Body. */ + /** + * Returns a new Buffer slicing the current Body. + */ ActiveMQBuffer getReadOnlyBodyBuffer(); - /** Return the type of the message */ + /** + * Return the type of the message + */ @Override byte getType(); - /** the type of the message */ + /** + * the type of the message + */ @Override CoreMessage setType(byte type); /** * We are really interested if this is a LargeServerMessage. + * * @return */ boolean isServerMessage(); /** * The body used for this message. + * * @return */ @Override @@ -61,10 +70,18 @@ public interface ICoreMessage extends Message { int getEndOfBodyPosition(); - - /** Used on large messages treatment */ + /** + * Used on large messages treatment + */ void copyHeadersAndProperties(Message msg); + void sendBuffer_1X(ByteBuf sendBuffer); + + /** + * it will fix the body of incoming messages from 1.x and before versions + */ + void receiveBuffer_1X(ByteBuf buffer); + /** * @return Returns the message in Map form, useful when encoding to JSON */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 25001424c81..b0656b6926f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -188,8 +188,16 @@ public CoreMessage setReplyTo(SimpleString address) { public void receiveBuffer(ByteBuf buffer) { this.buffer = buffer; this.buffer.retain(); - decode(); - this.validBuffer = true; + decode(false); + } + + /** This will fix the incoming body of 1.x messages */ + @Override + public void receiveBuffer_1X(ByteBuf buffer) { + this.buffer = buffer; + this.buffer.retain(); + decode(true); + validBuffer = false; } @Override @@ -205,7 +213,6 @@ public SimpleString getGroupID() { } /** - * * @param sendBuffer * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core */ @@ -215,6 +222,21 @@ public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) { sendBuffer.writeBytes(buffer, 0, buffer.writerIndex()); } + /** + * Recast the message as an 1.4 message + */ + @Override + public void sendBuffer_1X(ByteBuf sendBuffer) { + checkEncode(); + ByteBuf tmpBuffer = buffer.duplicate(); + sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT); + tmpBuffer.readerIndex(DataConstants.SIZE_INT); + tmpBuffer.readBytes(sendBuffer, endOfBodyPosition - BUFFER_HEADER_SPACE); + sendBuffer.writeInt(tmpBuffer.writerIndex() + DataConstants.SIZE_INT + BUFFER_HEADER_SPACE); + tmpBuffer.readBytes(sendBuffer, tmpBuffer.readableBytes()); + sendBuffer.readerIndex(0); + } + private synchronized void checkEncode() { if (!validBuffer) { encode(); @@ -280,12 +302,10 @@ public int getEndOfBodyPosition() { return endOfBodyPosition; } - public TypedProperties getTypedProperties() { return checkProperties(); } - @Override public void messageChanged() { validBuffer = false; @@ -323,7 +343,7 @@ protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { public void copyHeadersAndProperties(final Message msg) { messageID = msg.getMessageID(); address = msg.getAddressSimpleString(); - userID = (UUID)msg.getUserID(); + userID = (UUID) msg.getUserID(); type = msg.toCore().getType(); durable = msg.isDurable(); expiration = msg.getExpiration(); @@ -331,11 +351,10 @@ public void copyHeadersAndProperties(final Message msg) { priority = msg.getPriority(); if (msg instanceof CoreMessage) { - properties = ((CoreMessage)msg).getTypedProperties(); + properties = ((CoreMessage) msg).getTypedProperties(); } } - @Override public Message copy() { checkEncode(); @@ -380,7 +399,7 @@ public UUID getUserID() { @Override public CoreMessage setUserID(Object uuid) { - this.userID = (UUID)uuid; + this.userID = (UUID) uuid; return this; } @@ -418,7 +437,6 @@ public SimpleString getAddressSimpleString() { return address; } - @Override public CoreMessage setExpiration(long expiration) { this.expiration = expiration; @@ -487,18 +505,22 @@ public CoreMessage setType(byte type) { return this; } - private void decode() { + private void decode(boolean beforeAddress) { endOfBodyPosition = buffer.readInt(); buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE); decodeHeadersAndProperties(buffer, true); buffer.readerIndex(0); + validBuffer = true; + + if (beforeAddress) { + endOfBodyPosition = endOfBodyPosition - DataConstants.SIZE_INT; + } internalWritableBuffer(); } - public void decodeHeadersAndProperties(final ByteBuf buffer) { decodeHeadersAndProperties(buffer, false); } @@ -529,7 +551,6 @@ private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProper } } - public synchronized CoreMessage encode() { checkProperties(); @@ -654,7 +675,6 @@ public CoreMessage setDurable(boolean durable) { return this; } - @Override public CoreMessage putBooleanProperty(final String key, final boolean value) { messageChanged(); @@ -683,7 +703,6 @@ public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConve return properties.getBooleanProperty(new SimpleString(key)); } - @Override public CoreMessage putByteProperty(final SimpleString key, final byte value) { messageChanged(); @@ -692,7 +711,6 @@ public CoreMessage putByteProperty(final SimpleString key, final byte value) { return this; } - @Override public CoreMessage putByteProperty(final String key, final byte value) { messageChanged(); @@ -702,7 +720,6 @@ public CoreMessage putByteProperty(final String key, final byte value) { return this; } - @Override public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { checkProperties(); @@ -731,7 +748,6 @@ public CoreMessage putBytesProperty(final String key, final byte[] value) { return this; } - @Override public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { checkProperties(); @@ -775,7 +791,6 @@ public CoreMessage putShortProperty(final String key, final short value) { return this; } - @Override public CoreMessage putIntProperty(final SimpleString key, final int value) { messageChanged(); @@ -803,7 +818,6 @@ public Integer getIntProperty(final String key) throws ActiveMQPropertyConversio return getIntProperty(SimpleString.toSimpleString(key)); } - @Override public CoreMessage putLongProperty(final SimpleString key, final long value) { messageChanged(); @@ -832,7 +846,6 @@ public Long getLongProperty(final String key) throws ActiveMQPropertyConversionE return getLongProperty(SimpleString.toSimpleString(key)); } - @Override public CoreMessage putFloatProperty(final SimpleString key, final float value) { messageChanged(); @@ -865,7 +878,6 @@ public CoreMessage putDoubleProperty(final String key, final double value) { return this; } - @Override public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { messageChanged(); @@ -1071,7 +1083,7 @@ public void reloadPersistence(ActiveMQBuffer record) { int size = record.readInt(); initBuffer(size); buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size); - decode(); + decode(false); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java index 206796d6524..10220306bff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java @@ -19,10 +19,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; @@ -33,10 +35,10 @@ public class ClientPacketDecoder extends PacketDecoder { public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); @Override - public Packet decode(final ActiveMQBuffer in) { + public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { final byte packetType = in.readByte(); - Packet packet = decode(packetType); + Packet packet = decode(packetType, connection); packet.decode(in); @@ -44,12 +46,16 @@ public Packet decode(final ActiveMQBuffer in) { } @Override - public Packet decode(byte packetType) { + public Packet decode(byte packetType, CoreRemotingConnection connection) { Packet packet; switch (packetType) { case SESS_RECEIVE_MSG: { - packet = new SessionReceiveMessage(new ClientMessageImpl()); + if (connection.isVersionBeforeAddressChange()) { + packet = new SessionReceiveMessage_1X(new ClientMessageImpl()); + } else { + packet = new SessionReceiveMessage(new ClientMessageImpl()); + } break; } case SESS_RECEIVE_LARGE_MSG: { @@ -57,7 +63,7 @@ public Packet decode(byte packetType) { break; } default: { - packet = super.decode(packetType); + packet = super.decode(packetType, connection); } } return packet; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 1756153ca17..b6a5d93af55 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -28,13 +29,18 @@ public interface CoreRemotingConnection extends RemotingConnection { * The client protocol used on the communication. * This will determine if the client has support for certain packet types */ - int getClientVersion(); + int getChannelVersion(); + + default boolean isVersionBeforeAddressChange() { + int version = getChannelVersion(); + return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION); + } /** * Sets the client protocol used on the communication. This will determine if the client has * support for certain packet types */ - void setClientVersion(int clientVersion); + void setChannelVersion(int clientVersion); /** * Returns the channel with the channel id specified. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index efb9aa6fe32..1f40314a6db 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.protocol.core; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; /** * A Packet represents a packet of data transmitted over a connection. @@ -71,7 +70,7 @@ default int expectedEncodeSize() { * @param connection the connection * @return the buffer to encode to */ - ActiveMQBuffer encode(RemotingConnection connection); + ActiveMQBuffer encode(CoreRemotingConnection connection); /** * decodes the buffer into this packet diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index 1ebc1d0ee36..93432b87f79 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -326,6 +326,7 @@ public SessionContext createSessionContext(Version clientVersion, } } while (retry); + sessionChannel.getConnection().setChannelVersion(response.getServerVersion()); return newSessionContext(name, confirmationWindowSize, sessionChannel, response); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index d0d75ac346f..a9c34f7ee42 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -83,7 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; @@ -91,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -266,8 +267,14 @@ public void deleteQueue(final SimpleString queueName) throws ActiveMQException { @Override public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { - SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); - SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); + SessionQueueQueryResponseMessage response; + if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); + } else { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); + } return response.toQueueQuery(); } @@ -292,7 +299,13 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); - SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); + SessionQueueQueryResponseMessage queueInfo; + + if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { + queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); + } else { + queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3); + } // The actual windows size that gets used is determined by the user since // could be overridden on the queue settings @@ -441,7 +454,12 @@ public void sendFullMessage(ICoreMessage msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { - SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler); + SessionSendMessage packet; + if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { + packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); + } else { + packet = new SessionSendMessage(msgI, sendBlocking, handler); + } if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); @@ -596,7 +614,9 @@ public void createAddress(SimpleString address, Set routingTypes, final boolean autoCreated) throws ActiveMQException { CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true); - sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) { + sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + } } @Deprecated @@ -621,7 +641,9 @@ public void createQueue(SimpleString address, boolean purgeOnNoConsumers, boolean autoCreated) throws ActiveMQException { CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, purgeOnNoConsumers, autoCreated, true); - sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) { + sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + } } @Override @@ -695,11 +717,13 @@ protected CreateSessionMessage newCreateSession(String username, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge) { - return new CreateSessionMessage(name, sessionChannel.getID(), serverVersion, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null); + return new CreateSessionMessage(name, sessionChannel.getID(), getServerVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null); } @Override - public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException { + public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, + long consumerId, + boolean isSessionStarted) throws ActiveMQException { ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); // We try and recreate any non durable queues, since they probably won't be there unless @@ -851,7 +875,6 @@ private static int sendSessionSendContinuationMessage(Channel channel, } } - class ClientSessionPacketHandler implements ChannelHandler { @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index b8049d27d23..4d73cf82ce1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -152,7 +152,7 @@ public int getReconnectID() { @Override public boolean supports(final byte packetType) { - return supports(packetType, connection.getClientVersion()); + return supports(packetType, connection.getChannelVersion()); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 2755b9adacd..5e468486157 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage; @@ -160,9 +161,9 @@ public abstract class PacketDecoder implements Serializable { - public abstract Packet decode(ActiveMQBuffer in); + public abstract Packet decode(ActiveMQBuffer in, CoreRemotingConnection connection); - public Packet decode(byte packetType) { + public Packet decode(byte packetType, CoreRemotingConnection connection) { Packet packet; switch (packetType) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 186a703368f..925d0896652 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -21,8 +21,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public class PacketImpl implements Packet { @@ -305,28 +305,35 @@ public void setChannelID(final long channelID) { } @Override - public ActiveMQBuffer encode(final RemotingConnection connection) { + public ActiveMQBuffer encode(final CoreRemotingConnection connection) { ActiveMQBuffer buffer = createPacket(connection); - // The standard header fields + encodeHeader(buffer); + + encodeRest(buffer); + + encodeSize(buffer); + + return buffer; + } + protected void encodeHeader(ActiveMQBuffer buffer) { + // The standard header fields buffer.writeInt(0); // The length gets filled in at the end buffer.writeByte(type); buffer.writeLong(channelID); + } - encodeRest(buffer); - + protected void encodeSize(ActiveMQBuffer buffer) { size = buffer.writerIndex(); // The length doesn't include the actual length byte int len = size - DataConstants.SIZE_INT; buffer.setInt(0, len); - - return buffer; } - protected ActiveMQBuffer createPacket(RemotingConnection connection) { + protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) { int size = expectedEncodeSize(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 17f96fb3dc8..ac73b57f25a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -64,7 +64,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement private final boolean client; - private int clientVersion; + private int channelVersion; private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id); @@ -146,19 +146,19 @@ public String toString() { } /** - * @return the clientVersion + * @return the channelVersion */ @Override - public int getClientVersion() { - return clientVersion; + public int getChannelVersion() { + return channelVersion; } /** - * @param clientVersion the clientVersion to set + * @param clientVersion the channelVersion to set */ @Override - public void setClientVersion(int clientVersion) { - this.clientVersion = clientVersion; + public void setChannelVersion(int clientVersion) { + this.channelVersion = clientVersion; } @Override @@ -362,7 +362,7 @@ public String getProtocolName() { @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { try { - final Packet packet = packetDecoder.decode(buffer); + final Packet packet = packetDecoder.decode(buffer, this); if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet); @@ -417,7 +417,7 @@ private void internalClose() { @Override public void killMessage(SimpleString nodeID) { - if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) { + if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) { return; } Channel clientChannel = getChannel(1, -1); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index 4fbd48f30d4..0a8d8709fa8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -26,9 +27,9 @@ public class SessionReceiveMessage extends MessagePacket { // Attributes ---------------------------------------------------- - private long consumerID; + protected long consumerID; - private int deliveryCount; + protected int deliveryCount; public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) { super(SESS_RECEIVE_MSG, message); @@ -69,13 +70,17 @@ public void encodeRest(ActiveMQBuffer buffer) { public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT)); + receiveMessage(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT)); buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT); this.consumerID = buffer.readLong(); this.deliveryCount = buffer.readInt(); + } + protected void receiveMessage(ByteBuf buffer) { + message.receiveBuffer(buffer); } + @Override public int hashCode() { final int prime = 31; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java new file mode 100644 index 00000000000..2644af9d540 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java @@ -0,0 +1,49 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionReceiveMessage_1X extends SessionReceiveMessage { + + public SessionReceiveMessage_1X(long consumerID, ICoreMessage message, int deliveryCount) { + super(consumerID, message, deliveryCount); + } + + public SessionReceiveMessage_1X(CoreMessage message) { + super(message); + } + + @Override + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer_1X(buffer.byteBuf()); + buffer.writeLong(consumerID); + buffer.writeInt(deliveryCount); + } + + @Override + protected void receiveMessage(ByteBuf buffer) { + message.receiveBuffer_1X(buffer); + } + + @Override + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_INT; + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index 9f76c2d2d14..b56ae305122 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -24,7 +24,7 @@ public class SessionSendMessage extends MessagePacket { - private boolean requiresResponse; + protected boolean requiresResponse; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} @@ -76,7 +76,7 @@ public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); - message.receiveBuffer(messageBuffer); + receiveMessage(messageBuffer); buffer.readerIndex(buffer.capacity() - 1); @@ -84,6 +84,10 @@ public void decodeRest(final ActiveMQBuffer buffer) { } + protected void receiveMessage(ByteBuf messageBuffer) { + message.receiveBuffer(messageBuffer); + } + @Override public int hashCode() { final int prime = 31; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java new file mode 100644 index 00000000000..fc91b418ff6 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java @@ -0,0 +1,52 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; + +/** + * SessionSend Message for the 1.x branch + */ +public class SessionSendMessage_1X extends SessionSendMessage { + + public SessionSendMessage_1X(ICoreMessage message, boolean requiresResponse, SendAcknowledgementHandler handler) { + super(message, requiresResponse, handler); + } + + public SessionSendMessage_1X(CoreMessage message) { + super(message); + } + + @Override + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer_1X(buffer.byteBuf()); + buffer.writeBoolean(requiresResponse); + } + + @Override + protected void receiveMessage(ByteBuf messageBuffer) { + message.receiveBuffer_1X(messageBuffer); + } + + @Override + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_INT; + } + +} diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index bd5fccf375f..ba5359a7973 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -64,6 +64,8 @@ */ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory, AutoCloseable { + private static final long serialVersionUID = -7554006056207377105L; + private ServerLocator serverLocator; private String clientID; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 0bf4dd6258e..c0ab4b9e95d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -384,7 +384,7 @@ public static TYPE getType(byte type) { public static boolean isQueue(TYPE type) { boolean result = false; - if (type.equals(QUEUE) || type.equals(TEMP_QUEUE)) { + if (type != null && (type.equals(QUEUE) || type.equals(TEMP_QUEUE))) { result = true; } @@ -394,7 +394,7 @@ public static boolean isQueue(TYPE type) { public static boolean isTemporary(TYPE type) { boolean result = false; - if (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE)) { + if (type != null && (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE))) { result = true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index bcbe633a354..05844769b02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -18,6 +18,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage; @@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -84,51 +86,58 @@ public class ServerPacketDecoder extends ClientPacketDecoder { private static final long serialVersionUID = 3348673114388400766L; public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder(); - private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) { - final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage()); + private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { + final SessionSendMessage sendMessage; + + if (connection.isVersionBeforeAddressChange()) { + sendMessage = new SessionSendMessage_1X(new CoreMessage()); + } else { + sendMessage = new SessionSendMessage(new CoreMessage()); + } + sendMessage.decode(in); return sendMessage; } - private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in) { + private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage(); acknowledgeMessage.decode(in); return acknowledgeMessage; } - private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in) { + private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage(); requestProducerCreditsMessage.decode(in); return requestProducerCreditsMessage; } - private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in) { + private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage(); sessionConsumerFlowCreditMessage.decode(in); return sessionConsumerFlowCreditMessage; } @Override - public Packet decode(final ActiveMQBuffer in) { + public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { final byte packetType = in.readByte(); //optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size switch (packetType) { case SESS_SEND: - return decodeSessionSendMessage(in); + return decodeSessionSendMessage(in, connection); case SESS_ACKNOWLEDGE: - return decodeSessionAcknowledgeMessage(in); + return decodeSessionAcknowledgeMessage(in, connection); case SESS_PRODUCER_REQUEST_CREDITS: - return decodeRequestProducerCreditsMessage(in); + return decodeRequestProducerCreditsMessage(in, connection); case SESS_FLOWTOKEN: - return decodeSessionConsumerFlowCreditMessage(in); + return decodeSessionConsumerFlowCreditMessage(in, connection); default: - return slowPathDecode(in, packetType); + return slowPathDecode(in, packetType, connection); } } // separating for performance reasons - private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) { + private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingConnection connection) { Packet packet; switch (packetType) { @@ -242,7 +251,7 @@ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) { break; } default: { - packet = super.decode(packetType); + packet = super.decode(packetType, connection); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index e1e1b684cdc..e93dd94c244 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -314,11 +314,11 @@ private void slowPacketHandler(final Packet packet) { case SESS_CREATECONSUMER: { SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly()); + session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getChannelVersion()), request.getFilterString(), request.isBrowseOnly()); if (requiresResponse) { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover - QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion())); + QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion())); if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { response = new SessionQueueQueryResponseMessage_V3(queueQueryResult); @@ -387,9 +387,9 @@ private void slowPacketHandler(final Packet packet) { case SESS_QUEUEQUERY: { requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; - QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion())); + QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion())); - if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType())); } @@ -405,7 +405,7 @@ private void slowPacketHandler(final Packet packet) { case SESS_BINDINGQUERY: { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; - final int clientVersion = remotingConnection.getClientVersion(); + final int clientVersion = remotingConnection.getChannelVersion(); BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion)); /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 1b5a2a6b1f7..d38483a7e56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -145,10 +145,10 @@ private void handleCreateSession(final CreateSessionMessage request) { "Server will not accept create session requests"); }*/ - if (connection.getClientVersion() == 0) { - connection.setClientVersion(request.getVersion()); - } else if (connection.getClientVersion() != request.getVersion()) { - ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getClientVersion()); + if (connection.getChannelVersion() == 0) { + connection.setChannelVersion(request.getVersion()); + } else if (connection.getChannelVersion() != request.getVersion()) { + ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getChannelVersion()); } Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize()); @@ -163,7 +163,7 @@ private void handleCreateSession(final CreateSessionMessage request) { Map routingTypeMap = protocolManager.getPrefixes(); - if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + if (connection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { routingTypeMap = new HashMap<>(); routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST); routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 2cfd451ef66..c9262fa36e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -253,14 +253,14 @@ public void handlePacket(final Packet packet) { SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet; if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) { - channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion()); + channel0.getConnection().setChannelVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion()); } final ClusterTopologyListener listener = new ClusterTopologyListener() { @Override public void nodeUP(final TopologyMember topologyMember, final boolean last) { try { - final Pair connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getClientVersion(), topologyMember); + final Pair connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getChannelVersion(), topologyMember); final String nodeID = topologyMember.getNodeId(); // Using an executor as most of the notifications on the Topology diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 92b3768eee7..8b281ebce3c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -112,7 +113,12 @@ public int sendLargeMessageContinuation(ServerConsumer consumer, @Override public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { - Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount); + Packet packet; + if (channel.getConnection().isVersionBeforeAddressChange()) { + packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount); + } else { + packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount); + } int size = 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 36aa4e2d839..5dc1b93ad6d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -237,7 +237,7 @@ public ServerConsumerImpl(final long id, if (session.getRemotingConnection() instanceof CoreRemotingConnection) { CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection(); - if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { requiresLegacyPrefix = true; if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) { anycast = true; diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy index 3ec6d319a87..13579866cd4 100644 --- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy +++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy @@ -53,9 +53,8 @@ server.setConfiguration(configuration); server.setJmsConfiguration(jmsConfiguration); server.start(); -// uncomment this line to validate https://issues.apache.org/jira/browse/ARTEMIS-1561 -// this api exists on both 1.4 and 2.x... so, this one was preferred for this -if (producer.toString().startsWith("HORNETQ")) { - // hornetq servers won't auto-create +// uncomment this next statements to validate https://issues.apache.org/jira/browse/ARTEMIS-1561 +if (producer.toString().equals("ARTEMIS-140") && type.equals("ARTEMIS-SNAPSHOT") || + producer.toString().startsWith("HORNETQ")) { server.getJMSServerManager().createQueue(true, "queue", null, true); }