Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@

public class ServerJMSMessage implements Message {

public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";

protected final MessageInternal message;
private final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";

protected int deliveryCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.proton.converter.message;

import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
Expand Down Expand Up @@ -223,7 +224,7 @@ public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncod
final Enumeration<String> keys = msg.getPropertyNames();
while (keys.hasMoreElements()) {
String key = keys.nextElement();
if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
// skip..
}
else if (key.equals(firstAcquirerKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.proton.plug;

import org.apache.qpid.proton.amqp.Symbol;

import io.netty.buffer.ByteBuf;

public interface AMQPConnectionContext {
Expand All @@ -30,6 +32,14 @@ public interface AMQPConnectionContext {

SASLResult getSASLResult();

/**
* Load and return a <code>[]Symbol</code> that contains the connection capabilities
* offered to new connections
*
* @return the capabilities that are offered to new remote peers on connect.
*/
Symbol[] getConnectionCapabilitiesOffered();

/**
* Even though we are currently always sending packets asynchronsouly
* we have a possibility to start trusting on the network flow control
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AmqpSupport {

// Symbols used to announce connection information to remote peer.
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.proton.plug.context;

import static org.proton.plug.AmqpSupport.PRODUCT;
import static org.proton.plug.AmqpSupport.VERSION;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -76,11 +79,13 @@ public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
ScheduledExecutorService scheduledPool) {
this.connectionCallback = connectionCallback;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
connectionProperties.put(Symbol.valueOf("product"), "apache-activemq-artemis");
connectionProperties.put(Symbol.valueOf("version"), VersionLoader.getVersion().getFullVersion());

connectionProperties.put(PRODUCT, "apache-activemq-artemis");
connectionProperties.put(VERSION, VersionLoader.getVersion().getFullVersion());

this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
if (idleTimeout > 0) {
Expand Down Expand Up @@ -211,6 +216,7 @@ public void onRemoteOpen(Connection connection) throws Exception {
connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
initialise();
Expand Down Expand Up @@ -326,9 +332,10 @@ public void onDelivery(Delivery delivery) throws Exception {
System.err.println("Handler is null, can't delivery " + delivery);
}
}

}



@Override
public Symbol[] getConnectionCapabilitiesOffered() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.proton.plug.context.server;

import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;

import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
Expand Down Expand Up @@ -79,4 +82,7 @@ protected void remoteLinkOpened(Link link) throws Exception {
}
}

public Symbol[] getConnectionCapabilitiesOffered() {
return new Symbol[]{DELAYED_DELIVERY};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.proton;

import static org.proton.plug.AmqpSupport.contains;
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
import static org.proton.plug.AmqpSupport.PRODUCT;
import static org.proton.plug.AmqpSupport.VERSION;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -63,6 +68,7 @@
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
Expand Down Expand Up @@ -210,6 +216,114 @@ public void testBrokerConnectionProperties() throws Exception {
}
}

@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol

AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);

client.setValidator(new AmqpValidator() {

@Override
public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {

Symbol[] offered = connection.getRemoteOfferedCapabilities();

if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}

Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}

if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
}
});

AmqpConnection connection = client.connect();
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
}
finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol

AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);

AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);

AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);

// Now try and get the message
receiver.flow(1);

// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(5, TimeUnit.SECONDS));
}
finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol

AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);

AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);

AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);

// Now try and get the message
receiver.flow(1);

AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
}
finally {
connection.close();
}
}

@Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
Expand Down