Skip to content

Commit

Permalink
ARTEMIS-1511 Refactor AMQP Transport for use with other test clients
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaylor authored and clebertsuconic committed Nov 13, 2017
1 parent 63b156e commit 5211afd
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 38 deletions.
Expand Up @@ -21,8 +21,8 @@
import java.util.List;
import java.util.Map;

import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Expand Up @@ -33,8 +33,9 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.netty.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
private final NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();

private final String username;
Expand Down Expand Up @@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private boolean trace;
private boolean noContainerID = false;

public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
public AmqpConnection(NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);

Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -223,16 +223,16 @@ public ByteBuf allocateSendBuffer(int size) throws IOException {
}

@Override
public void send(ByteBuf output) throws IOException {
public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
return null;
}

LOG.trace("Attempted write of: {} bytes", length);

channel.writeAndFlush(output);
return channel.writeAndFlush(output);
}

@Override
Expand Down
Expand Up @@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.io.IOException;
import java.net.URI;
import java.security.Principal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;

/**
* Base for all Netty based Transports in this client.
Expand All @@ -37,7 +38,7 @@ public interface NettyTransport {

ByteBuf allocateSendBuffer(int size) throws IOException;

void send(ByteBuf output) throws IOException;
ChannelFuture send(ByteBuf output) throws IOException;

NettyTransportListener getTransportListener();

Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.net.URI;
import java.util.Map;
Expand Down Expand Up @@ -65,19 +65,18 @@ public static NettyTransport createTransport(URI remoteURI) throws Exception {

NettyTransport result = null;

switch (remoteURI.getScheme().toLowerCase()) {
case "tcp":
case "ssl":
result = new NettyTcpTransport(remoteURI, transportOptions);
break;
case "ws":
case "wss":
result = new NettyWSTransport(remoteURI, transportOptions);
break;
default:
throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
String scheme = remoteURI.getScheme().toLowerCase();
if (scheme.startsWith("tcp") || scheme.startsWith("ssl")) {
result = new NettyTcpTransport(remoteURI, transportOptions);
} else if (scheme.startsWith("ws") || scheme.startsWith("wss")) {
// Check for ws subprotocol
if (scheme.contains("+")) {
transportOptions.setWsSubProtocol(scheme.substring(scheme.indexOf("+") + 1));
}
result = new NettyWSTransport(remoteURI, transportOptions);
} else {
throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
}

return result;
}
}
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import io.netty.buffer.ByteBuf;

Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

/**
* Encapsulates all the TCP Transport options in one configuration object.
Expand All @@ -31,6 +31,7 @@ public class NettyTransportOptions implements Cloneable {
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_TRACE_BYTES = false;
public static final String DEFAULT_WS_SUBPROTOCOL = NettyWSTransport.AMQP_SUB_PROTOCOL;

public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();

Expand All @@ -44,6 +45,7 @@ public class NettyTransportOptions implements Cloneable {
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
private String wsSubProtocol = DEFAULT_WS_SUBPROTOCOL;

/**
* @return the currently set send buffer size in bytes.
Expand Down Expand Up @@ -188,6 +190,14 @@ public boolean isSSL() {
return false;
}

public String getWsSubProtocol() {
return wsSubProtocol;
}

public void setWsSubProtocol(String wsSubProtocol) {
this.wsSubProtocol = wsSubProtocol;
}

@Override
public NettyTransportOptions clone() {
return copyOptions(new NettyTransportOptions());
Expand All @@ -202,6 +212,7 @@ protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
copy.setTcpKeepAlive(isTcpKeepAlive());
copy.setTcpNoDelay(isTcpNoDelay());
copy.setTrafficClass(getTrafficClass());
copy.setWsSubProtocol(getWsSubProtocol());

return copy;
}
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.util.Arrays;
import java.util.Collections;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.io.File;
import java.io.FileInputStream;
Expand Down
Expand Up @@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -50,7 +51,7 @@ public class NettyWSTransport extends NettyTcpTransport {

private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);

private static final String AMQP_SUB_PROTOCOL = "amqp";
public static final String AMQP_SUB_PROTOCOL = "amqp";

/**
* Create a new transport instance
Expand Down Expand Up @@ -79,16 +80,16 @@ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, Net
}

@Override
public void send(ByteBuf output) throws IOException {
public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
return null;
}

LOG.trace("Attempted write of: {} bytes", length);

channel.writeAndFlush(new BinaryWebSocketFrame(output));
return channel.writeAndFlush(new BinaryWebSocketFrame(output));
}

@Override
Expand All @@ -115,7 +116,7 @@ private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object>

NettyWebSocketTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
getRemoteLocation(), WebSocketVersion.V13, options.getWsSubProtocol(),
true, new DefaultHttpHeaders(), getMaxFrameSize());
}

Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
package org.apache.activemq.transport.netty;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
Expand Down
Expand Up @@ -25,9 +25,9 @@
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.activemq.transport.netty.NettyTransportListener;
import org.junit.Test;

import java.net.URI;
Expand Down

0 comments on commit 5211afd

Please sign in to comment.