From 11cda77ce65c53aff234b49d98fe9c9a427eca53 Mon Sep 17 00:00:00 2001 From: Przemek Bruski Date: Fri, 22 Jan 2016 15:45:15 +0100 Subject: [PATCH 1/4] AMQ-6135-activemq-http-openwire --- .../transport/http/HttpClientTransport.java | 32 ++-- .../transport/http/HttpTransportFactory.java | 40 +++-- .../transport/http/HttpTransportSupport.java | 26 ++- .../transport/http/HttpTunnelServlet.java | 21 ++- .../HttpTextWireFormatMarshaller.java | 51 ++++++ .../marshallers/HttpTransportMarshaller.java | 46 ++++++ .../marshallers/HttpWireFormatMarshaller.java | 47 ++++++ .../TextWireFormatMarshallers.java | 93 +++++++++++ .../transport/https/HttpsClientTransport.java | 6 + .../https/HttpsTransportFactory.java | 8 +- .../http/HttpClientTransportTest.java | 131 +++++++++++++++ .../transport/http/HttpTunnelServletTest.java | 156 ++++++++++++++++++ 12 files changed, 624 insertions(+), 33 deletions(-) create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportTest.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTunnelServletTest.java diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index c65dbb99803..c8c8d600d90 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.transport.http; -import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.net.URI; import java.util.zip.GZIPInputStream; @@ -25,6 +25,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.IOExceptionSupport; @@ -86,10 +87,15 @@ public class HttpClientTransport extends HttpTransportSupport { protected boolean canSendCompressed = false; private int minSendAsCompressedSize = 0; + @Deprecated public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); } + public HttpClientTransport(final HttpTransportMarshaller marshaller, URI remoteUrl) { + super(marshaller, remoteUrl); + } + public FutureResponse asyncRequest(Object command) throws IOException { return null; } @@ -102,8 +108,7 @@ public void oneway(Object command) throws IOException { } HttpPost httpMethod = new HttpPost(getRemoteUrl().toString()); configureMethod(httpMethod); - String data = getTextWireFormat().marshalText(command); - byte[] bytes = data.getBytes("UTF-8"); + byte[] bytes = asBytes(command); if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); GZIPOutputStream stream = new GZIPOutputStream(bytesOut); @@ -145,17 +150,24 @@ public void oneway(Object command) throws IOException { } } + private byte[] asBytes(final Object command) throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + getMarshaller().marshal(command, outputStream); + return outputStream.toByteArray(); + } + @Override public Object request(Object command) throws IOException { return null; } - private DataInputStream createDataInputStream(HttpResponse answer) throws IOException { - Header encoding = answer.getEntity().getContentEncoding(); - if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) { - return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent())); + private InputStream createInputStream(final HttpResponse answer) throws IOException { + final InputStream inputStream = answer.getEntity().getContent(); + final Header encoding = answer.getEntity().getContentEncoding(); + if (encoding == null || !"gzip".equalsIgnoreCase(encoding.getValue())) { + return inputStream; } else { - return new DataInputStream(answer.getEntity().getContent()); + return new GZIPInputStream(inputStream); } } @@ -193,8 +205,8 @@ public void run() { } } else { receiveCounter++; - DataInputStream stream = createDataInputStream(answer); - Object command = getTextWireFormat().unmarshal(stream); + final InputStream stream = createInputStream(answer); + final Object command = getMarshaller().unmarshal(stream); if (command == null) { LOG.debug("Received null command from url: " + remoteUrl); } else { diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java index 5ccb3f97c66..ee73beb1825 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java @@ -28,8 +28,10 @@ import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportLoggerFactory; import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; +import org.apache.activemq.transport.http.marshallers.HttpWireFormatMarshaller; +import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers; import org.apache.activemq.transport.util.TextWireFormat; -import org.apache.activemq.transport.xstream.XStreamWireFormat; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.URISupport; @@ -40,6 +42,16 @@ public class HttpTransportFactory extends TransportFactory { private static final Logger LOG = LoggerFactory.getLogger(HttpTransportFactory.class); + private static final String WIRE_FORMAT_XSTREAM = "xstream"; + private final String defaultWireFormatType; + + public HttpTransportFactory() { + defaultWireFormatType = WIRE_FORMAT_XSTREAM; + } + + public HttpTransportFactory(final String defaultWireFormatType) { + this.defaultWireFormatType = defaultWireFormatType; + } public TransportServer doBind(URI location) throws IOException { try { @@ -53,20 +65,21 @@ public TransportServer doBind(URI location) throws IOException { } } - protected TextWireFormat asTextWireFormat(WireFormat wireFormat) { - if (wireFormat instanceof TextWireFormat) { - return (TextWireFormat)wireFormat; - } - LOG.trace("Not created with a TextWireFormat: " + wireFormat); - return new XStreamWireFormat(); + @Deprecated + protected final TextWireFormat asTextWireFormat(final WireFormat wireFormat) { + throw new UnsupportedOperationException("asTextWireFormat is no longer supported"); + } + + protected WireFormat processWireFormat(final WireFormat wireFormat) { + return wireFormat; } protected String getDefaultWireFormatType() { - return "xstream"; + return defaultWireFormatType; } protected Transport createTransport(URI location, WireFormat wf) throws IOException { - TextWireFormat textWireFormat = asTextWireFormat(wf); + final WireFormat wireFormat = processWireFormat(wf); // need to remove options from uri URI uri; try { @@ -76,7 +89,14 @@ protected Transport createTransport(URI location, WireFormat wf) throws IOExcept cause.initCause(e); throw cause; } - return new HttpClientTransport(textWireFormat, uri); + return new HttpClientTransport(createMarshaller(wireFormat), uri); + } + + protected static HttpTransportMarshaller createMarshaller(final WireFormat wireFormat) + { + return wireFormat instanceof TextWireFormat ? + TextWireFormatMarshallers.newTransportMarshaller((TextWireFormat)wireFormat) : + new HttpWireFormatMarshaller(wireFormat); } @SuppressWarnings("rawtypes") diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java index d01ce25d2b9..9886d840173 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java @@ -19,7 +19,10 @@ import java.net.URI; import org.apache.activemq.transport.TransportThreadSupport; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; +import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers; import org.apache.activemq.transport.util.TextWireFormat; +import org.apache.activemq.wireformat.WireFormat; /** * A useful base class for HTTP Transport implementations. @@ -27,15 +30,23 @@ * */ public abstract class HttpTransportSupport extends TransportThreadSupport { - private TextWireFormat textWireFormat; + @Deprecated + private WireFormat textWireFormat; + private HttpTransportMarshaller marshaller; private URI remoteUrl; private String proxyHost; private int proxyPort = 8080; private String proxyUser; private String proxyPassword; - public HttpTransportSupport(TextWireFormat textWireFormat, URI remoteUrl) { + @Deprecated + public HttpTransportSupport(final TextWireFormat textWireFormat, final URI remoteUrl) { + this(TextWireFormatMarshallers.newTransportMarshaller(textWireFormat), remoteUrl); this.textWireFormat = textWireFormat; + } + + public HttpTransportSupport(final HttpTransportMarshaller marshaller, final URI remoteUrl) { + this.marshaller = marshaller; this.remoteUrl = remoteUrl; } @@ -53,12 +64,19 @@ public URI getRemoteUrl() { return remoteUrl; } + @Deprecated public TextWireFormat getTextWireFormat() { - return textWireFormat; + return (TextWireFormat) textWireFormat; + } + + public HttpTransportMarshaller getMarshaller() { + return marshaller; } - public void setTextWireFormat(TextWireFormat textWireFormat) { + @Deprecated + public void setTextWireFormat(final TextWireFormat textWireFormat) { this.textWireFormat = textWireFormat; + this.marshaller = TextWireFormatMarshallers.newTransportMarshaller(textWireFormat); } public String getProxyHost() { diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java index e6dc7c9d0e6..2143acbe0b4 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java @@ -17,10 +17,8 @@ package org.apache.activemq.transport.http; import java.io.BufferedReader; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,10 +36,14 @@ import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; +import org.apache.activemq.transport.http.marshallers.HttpWireFormatMarshaller; +import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.transport.xstream.XStreamWireFormat; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceListener; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,7 @@ public class HttpTunnelServlet extends HttpServlet { private TransportAcceptListener listener; private HttpTransportFactory transportFactory; - private TextWireFormat wireFormat; + private HttpTransportMarshaller marshaller; private ConcurrentMap clients = new ConcurrentHashMap(); private final long requestTimeout = 30000L; private HashMap transportOptions; @@ -74,10 +76,15 @@ public void init() throws ServletException { throw new ServletException("No such attribute 'transportFactory' available in the ServletContext"); } transportOptions = (HashMap)getServletContext().getAttribute("transportOptions"); - wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat"); + WireFormat wireFormat = (WireFormat) getServletContext().getAttribute("wireFormat"); if (wireFormat == null) { wireFormat = createWireFormat(); } + if (wireFormat instanceof TextWireFormat) { + marshaller = TextWireFormatMarshallers.newServletMarshaller(wireFormat); + } else { + marshaller = new HttpWireFormatMarshaller(wireFormat); + } } @Override @@ -104,8 +111,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); - DataOutputStream stream = new DataOutputStream(response.getOutputStream()); - wireFormat.marshal(packet, stream); + marshaller.marshal(packet, response.getOutputStream()); count++; } catch (InterruptedException ignore) { } @@ -124,8 +130,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) stream = new GZIPInputStream(stream); } - // Read the command directly from the reader, assuming UTF8 encoding - Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8")); + final Command command = (Command)marshaller.unmarshal(stream); if (command instanceof WireFormatInfo) { WireFormatInfo info = (WireFormatInfo) command; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java new file mode 100644 index 00000000000..c25b2172ed7 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.marshallers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.activemq.transport.util.TextWireFormat; + +/** + * A {@link HttpTransportMarshaller} implementation using a {@link TextWireFormat} and UTF8 encoding. + */ +public class HttpTextWireFormatMarshaller implements HttpTransportMarshaller +{ + private static final Charset CHARSET = StandardCharsets.UTF_8; + private final TextWireFormat wireFormat; + + public HttpTextWireFormatMarshaller(final TextWireFormat wireFormat) { + this.wireFormat = wireFormat; + } + + @Override + public void marshal(final Object command, final OutputStream outputStream) throws IOException { + final String s = wireFormat.marshalText(command); + outputStream.write(s.getBytes(CHARSET)); + } + + @Override + public Object unmarshal(final InputStream stream) throws IOException { + return wireFormat.unmarshalText(new InputStreamReader(stream, CHARSET)); + } +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java new file mode 100644 index 00000000000..36b6a6c71c3 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.marshallers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A generic interface for marshallers used for HTTP communication. + */ +public interface HttpTransportMarshaller +{ + /** + * The implementations of this method should be able to marshall the supplied object into the output stream. + * + * @param command the object to marshall + * @param outputStream output stream for the serialised form. + * @throws IOException + */ + void marshal(final Object command, final OutputStream outputStream) throws IOException; + + /** + * The implementations of this method handle unmarshalling of objects from a wire format into Java objects. + * + * @param stream the stream with the serialised form of an object + * @return the deserialised object + * @throws IOException + */ + Object unmarshal(final InputStream stream) throws IOException; +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java new file mode 100644 index 00000000000..92eec3c0cf5 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.marshallers; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.activemq.wireformat.WireFormat; + +public class HttpWireFormatMarshaller implements HttpTransportMarshaller +{ + private final WireFormat wireFormat; + + public HttpWireFormatMarshaller(final WireFormat wireFormat) { + this.wireFormat = wireFormat; + } + + @Override + public void marshal(final Object command, final OutputStream outputStream) throws IOException { + final DataOutputStream out = new DataOutputStream(outputStream); + wireFormat.marshal(command, out); + out.flush(); + } + + @Override + public Object unmarshal(final InputStream stream) throws IOException { + return wireFormat.unmarshal(new DataInputStream(stream)); + } +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java new file mode 100644 index 00000000000..d44468350ec --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.marshallers; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.activemq.transport.util.TextWireFormat; +import org.apache.activemq.wireformat.WireFormat; + +/** + * A factory for marshallers {@link HttpTransportMarshaller} that maintain compatibility with the original + * ActiveMQ code that used {@link TextWireFormat#marshalText(Object)} and {@link TextWireFormat#marshal(Object)} depending + * on the context. + * All text handling is done using UTF-8. + */ +public class TextWireFormatMarshallers { + private static final Charset CHARSET = StandardCharsets.UTF_8; + + /** + * The returned marshaller uses {@link TextWireFormat#marshal(Object)} and {@link TextWireFormat#unmarshalText(Reader)}. + */ + public static HttpTransportMarshaller newServletMarshaller(final WireFormat wireFormat) { + return new MarshalPlainUnmarshalTextMarshaller((TextWireFormat)wireFormat); + } + + /** + * The returned marshaller uses {@link TextWireFormat#marshalText(Object)} and {@link TextWireFormat#unmarshal(DataInput)} + */ + public static HttpTransportMarshaller newTransportMarshaller(final TextWireFormat textWireFormat) { + return new MarshalTextUnmarshalPlainMarshaller(textWireFormat); + } + + private static class MarshalTextUnmarshalPlainMarshaller implements HttpTransportMarshaller { + private final TextWireFormat wireFormat; + + private MarshalTextUnmarshalPlainMarshaller(final TextWireFormat wireFormat) { + this.wireFormat = wireFormat; + } + + @Override + public void marshal(final Object command, final OutputStream outputStream) throws IOException { + final String s = wireFormat.marshalText(command); + outputStream.write(s.getBytes(CHARSET)); + } + + @Override + public Object unmarshal(final InputStream stream) throws IOException { + return wireFormat.unmarshal(new DataInputStream(stream)); + } + } + + private static class MarshalPlainUnmarshalTextMarshaller implements HttpTransportMarshaller { + private final TextWireFormat wireFormat; + + private MarshalPlainUnmarshalTextMarshaller(final TextWireFormat wireFormat) { + this.wireFormat = wireFormat; + } + + @Override + public void marshal(final Object command, final OutputStream outputStream) throws IOException { + wireFormat.marshal(command, new DataOutputStream(outputStream)); + } + + @Override + public Object unmarshal(final InputStream stream) throws IOException { + return wireFormat.unmarshalText(new InputStreamReader(stream, CHARSET)); + } + } +} \ No newline at end of file diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java index 2e432fcaf14..c81342fe2c5 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java @@ -22,6 +22,7 @@ import org.apache.activemq.broker.SslContext; import org.apache.activemq.transport.http.HttpClientTransport; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.IOExceptionSupport; import org.apache.http.conn.ClientConnectionManager; @@ -32,10 +33,15 @@ public class HttpsClientTransport extends HttpClientTransport { + @Deprecated public HttpsClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); } + public HttpsClientTransport(final HttpTransportMarshaller marshaller, URI remoteUrl) { + super(marshaller, remoteUrl); + } + @Override protected ClientConnectionManager createClientConnectionManager() { PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager(createSchemeRegistry()); diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java index 036484cf7c8..fe16a084e48 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java @@ -37,6 +37,12 @@ */ public class HttpsTransportFactory extends HttpTransportFactory { + public HttpsTransportFactory() {} + + public HttpsTransportFactory(final String defaultWireFormatType) { + super(defaultWireFormatType); + } + public TransportServer doBind(String brokerId, URI location) throws IOException { return doBind(location); } @@ -63,6 +69,6 @@ protected Transport createTransport(URI location, WireFormat wf) throws Malforme cause.initCause(e); throw cause; } - return new HttpsClientTransport(asTextWireFormat(wf), uri); + return new HttpsClientTransport(createMarshaller(wf), uri); } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportTest.java new file mode 100644 index 00000000000..49281b9fda1 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http; + +import java.io.ByteArrayInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers; +import org.apache.activemq.transport.xstream.XStreamWireFormat; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.http.HttpResponse; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.message.BasicHttpResponse; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.params.BasicHttpParams; +import org.hamcrest.CoreMatchers; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +public class HttpClientTransportTest { + + @Rule + public final MockitoRule rule = MockitoJUnit.rule(); + + @Mock + private HttpClient sendHttpClient; + + @Mock + private HttpClient receiveHttpClient; + + @Test + public void testPreservesAsymmetricalMarshalling() throws Exception { + final AtomicReference unmarshalledCommand = new AtomicReference<>(); + + final HttpClientTransport httpClientTransport = new HttpClientTransport(TextWireFormatMarshallers.newTransportMarshaller(new XStreamWireFormat()), URI.create("http://localhost")) { + @Override + public HttpClient getSendHttpClient() { + return sendHttpClient; + } + + @Override + public HttpClient getReceiveHttpClient() { + return receiveHttpClient; + } + + @Override + public void doConsume(final Object command) { + unmarshalledCommand.set(command); + try { + stop(); + } catch (Exception e) { + } + } + }; + + final AtomicReference marshalledCommand = new AtomicReference<>(); + + { + when(sendHttpClient.getParams()).thenReturn(new BasicHttpParams()); + when(sendHttpClient.execute(Mockito.any())).thenAnswer(new Answer() { + @Override + public HttpResponse answer(final InvocationOnMock invocation) throws Throwable { + final HttpPost method = invocation.getArgumentAt(0, HttpPost.class); + final String entityBody = IOUtils.toString(method.getEntity().getContent()); + marshalledCommand.set(entityBody); + return newHttpOkResponse(); + } + }); + + httpClientTransport.oneway(new ConsumerInfo()); + assertThat(marshalledCommand.get(), CoreMatchers.startsWith("<")); + } + + { + final BasicHttpResponse httpOkResponse = newHttpOkResponse(); + httpOkResponse.setEntity(new InputStreamEntity(new ByteArrayInputStream(toMarshalledMessage(marshalledCommand)))); + when(receiveHttpClient.execute(Mockito.any())).thenReturn(httpOkResponse); + httpClientTransport.run(); + assertThat(unmarshalledCommand.get(), CoreMatchers.instanceOf(ConsumerInfo.class)); + } + } + + private byte[] toMarshalledMessage(AtomicReference marshalledCommand) throws IOException { + final byte[] textBytes = marshalledCommand.get().getBytes(StandardCharsets.UTF_8); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dataOutputStream = new DataOutputStream(baos); + dataOutputStream.writeInt(textBytes.length); + dataOutputStream.write(textBytes); + dataOutputStream.flush(); + return baos.toByteArray(); + } + + private BasicHttpResponse newHttpOkResponse() { + return new BasicHttpResponse(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), HttpURLConnection.HTTP_OK, "OK")); + } +} \ No newline at end of file diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTunnelServletTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTunnelServletTest.java new file mode 100644 index 00000000000..42d2913f122 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTunnelServletTest.java @@ -0,0 +1,156 @@ +package org.apache.activemq.transport.http; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ReadListener; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.transport.TransportAcceptListener; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HttpTunnelServletTest { + + @Rule + public final MockitoRule rule = MockitoJUnit.rule(); + + @Mock + private HttpServletRequest request; + @Mock + private HttpServletResponse response; + + private final MockServletOutputStream servletOutputStream = new MockServletOutputStream(); + + @Before + public void setup() throws IOException { + when(response.getOutputStream()).thenReturn(servletOutputStream); + } + + @Test + public void testPreservesAsymmetricalMarshalling() throws Exception { + final AtomicReference commandRef = new AtomicReference<>(); + + final BlockingQueueTransport transportChannel = newTransportChannel(commandRef); + + final HttpTunnelServlet httpTunnelServlet = newServlet(transportChannel); + + httpTunnelServlet.doGet(request, response); //marshall + + final String wireFormatMessage = servletOutputStream.getContent(); + assertThat(wireFormatMessage, not(CoreMatchers.startsWith("<"))); + + final String message = toTextMessage(wireFormatMessage); + when(request.getInputStream()).thenReturn(new MockServletInputStream(message)); + httpTunnelServlet.doPost(request, response); //unmarshallText + assertThat(commandRef.get(), CoreMatchers.instanceOf(ConsumerInfo.class)); + } + + private HttpTunnelServlet newServlet(final BlockingQueueTransport transportChannel) throws ServletException { + final HttpTunnelServlet httpTunnelServlet = new HttpTunnelServlet() { + @Override + protected BlockingQueueTransport getTransportChannel(final HttpServletRequest request, final HttpServletResponse response) { + return transportChannel; + } + }; + final ServletConfig servletConfig = mock(ServletConfig.class); + + final ServletContext servletContext = mockServletContext(); + when(servletConfig.getServletContext()).thenReturn(servletContext); + httpTunnelServlet.init(servletConfig); + return httpTunnelServlet; + } + + private BlockingQueueTransport newTransportChannel(final AtomicReference commandRef) { + final BlockingQueueTransport transportChannel = new BlockingQueueTransport(new ArrayBlockingQueue<>(10)) { + @Override + public void doConsume(final Object command) { + commandRef.set(command); + } + }; + transportChannel.getQueue().offer(new ConsumerInfo()); + return transportChannel; + } + + private ServletContext mockServletContext() { + final ServletContext servletContext = mock(ServletContext.class); + final TransportAcceptListener acceptListener = mock(TransportAcceptListener.class); + when(servletContext.getAttribute(eq("acceptListener"))).thenReturn(acceptListener); + when(servletContext.getAttribute(eq("transportFactory"))).thenReturn(new HttpTransportFactory()); + return servletContext; + } + + private String toTextMessage(final String message) { + return message.substring(message.indexOf('<')); + } + + private static class MockServletOutputStream extends ServletOutputStream { + private final StringBuilder sb = new StringBuilder(); + + @Override + public boolean isReady() { + return false; + } + + @Override + public void setWriteListener(final WriteListener writeListener) { + } + + @Override + public void write(final int b) throws IOException { + sb.append((char)b); + } + + public String getContent() { + final String s = sb.toString(); + sb.setLength(0); + return s; + } + } + + private class MockServletInputStream extends ServletInputStream { + private final String string; + private int pos; + + private MockServletInputStream(final String message) { + string = message; + } + + @Override + public boolean isFinished() { + return pos==string.length(); + } + + @Override + public boolean isReady() { + return false; + } + + @Override + public void setReadListener(final ReadListener readListener) { + } + + @Override + public int read() throws IOException { + return isFinished() ? -1 : string.charAt(pos++); + } + } +} \ No newline at end of file From 28a51d717e13b93a7d0a4c0c9902f19debbfb8c4 Mon Sep 17 00:00:00 2001 From: Przemek Bruski Date: Wed, 2 Mar 2016 16:35:02 +0100 Subject: [PATCH 2/4] AMQ-6135 removed deprecations --- .../transport/http/HttpClientTransport.java | 5 ----- .../transport/http/HttpTransportFactory.java | 5 ----- .../transport/http/HttpTransportSupport.java | 22 ------------------- .../transport/https/HttpsClientTransport.java | 6 ----- 4 files changed, 38 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index c8c8d600d90..4639c0c22c1 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -87,11 +87,6 @@ public class HttpClientTransport extends HttpTransportSupport { protected boolean canSendCompressed = false; private int minSendAsCompressedSize = 0; - @Deprecated - public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { - super(wireFormat, remoteUrl); - } - public HttpClientTransport(final HttpTransportMarshaller marshaller, URI remoteUrl) { super(marshaller, remoteUrl); } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java index ee73beb1825..131a00e6381 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java @@ -65,11 +65,6 @@ public TransportServer doBind(URI location) throws IOException { } } - @Deprecated - protected final TextWireFormat asTextWireFormat(final WireFormat wireFormat) { - throw new UnsupportedOperationException("asTextWireFormat is no longer supported"); - } - protected WireFormat processWireFormat(final WireFormat wireFormat) { return wireFormat; } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java index 9886d840173..97ee904a824 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportSupport.java @@ -20,9 +20,6 @@ import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; -import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers; -import org.apache.activemq.transport.util.TextWireFormat; -import org.apache.activemq.wireformat.WireFormat; /** * A useful base class for HTTP Transport implementations. @@ -30,8 +27,6 @@ * */ public abstract class HttpTransportSupport extends TransportThreadSupport { - @Deprecated - private WireFormat textWireFormat; private HttpTransportMarshaller marshaller; private URI remoteUrl; private String proxyHost; @@ -39,12 +34,6 @@ public abstract class HttpTransportSupport extends TransportThreadSupport { private String proxyUser; private String proxyPassword; - @Deprecated - public HttpTransportSupport(final TextWireFormat textWireFormat, final URI remoteUrl) { - this(TextWireFormatMarshallers.newTransportMarshaller(textWireFormat), remoteUrl); - this.textWireFormat = textWireFormat; - } - public HttpTransportSupport(final HttpTransportMarshaller marshaller, final URI remoteUrl) { this.marshaller = marshaller; this.remoteUrl = remoteUrl; @@ -64,21 +53,10 @@ public URI getRemoteUrl() { return remoteUrl; } - @Deprecated - public TextWireFormat getTextWireFormat() { - return (TextWireFormat) textWireFormat; - } - public HttpTransportMarshaller getMarshaller() { return marshaller; } - @Deprecated - public void setTextWireFormat(final TextWireFormat textWireFormat) { - this.textWireFormat = textWireFormat; - this.marshaller = TextWireFormatMarshallers.newTransportMarshaller(textWireFormat); - } - public String getProxyHost() { return proxyHost; } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java index c81342fe2c5..dbedf6f3027 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java @@ -23,7 +23,6 @@ import org.apache.activemq.broker.SslContext; import org.apache.activemq.transport.http.HttpClientTransport; import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; -import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.IOExceptionSupport; import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.scheme.Scheme; @@ -33,11 +32,6 @@ public class HttpsClientTransport extends HttpClientTransport { - @Deprecated - public HttpsClientTransport(TextWireFormat wireFormat, URI remoteUrl) { - super(wireFormat, remoteUrl); - } - public HttpsClientTransport(final HttpTransportMarshaller marshaller, URI remoteUrl) { super(marshaller, remoteUrl); } From 7f9b00f3e9ac4a4f5cfec95dff42d808fdde5f84 Mon Sep 17 00:00:00 2001 From: Przemek Bruski Date: Fri, 18 Mar 2016 16:49:22 +0100 Subject: [PATCH 3/4] AMQ-6135 added tests for the new functionality --- .../transport/http/HttpTransportFactory.java | 2 +- .../http/HttpOpenWireSendAndReceiveTest.java | 100 ++++++++++++++++++ .../openwire/AssertingTransportFactory.java | 54 ++++++++++ .../openwire/CustomHttpTransportFactory.java | 47 ++++++++ .../openwire/CustomHttpTransportServer.java | 57 ++++++++++ .../openwire/ServletContextAttributes.java | 46 ++++++++ .../http/openwire/SpyMarshaller.java | 58 ++++++++++ 7 files changed, 363 insertions(+), 1 deletion(-) create mode 100755 activemq-http/src/test/java/org/apache/activemq/transport/http/HttpOpenWireSendAndReceiveTest.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/AssertingTransportFactory.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportFactory.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportServer.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/ServletContextAttributes.java create mode 100644 activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java index 131a00e6381..c1846fc5d28 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java @@ -87,7 +87,7 @@ protected Transport createTransport(URI location, WireFormat wf) throws IOExcept return new HttpClientTransport(createMarshaller(wireFormat), uri); } - protected static HttpTransportMarshaller createMarshaller(final WireFormat wireFormat) + protected HttpTransportMarshaller createMarshaller(final WireFormat wireFormat) { return wireFormat instanceof TextWireFormat ? TextWireFormatMarshallers.newTransportMarshaller((TextWireFormat)wireFormat) : diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpOpenWireSendAndReceiveTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpOpenWireSendAndReceiveTest.java new file mode 100755 index 00000000000..11ee097925e --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpOpenWireSendAndReceiveTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.http; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.http.openwire.AssertingTransportFactory; +import org.apache.activemq.transport.http.openwire.CustomHttpTransportFactory; +import org.apache.activemq.transport.http.openwire.SpyMarshaller; +import org.apache.activemq.transport.http.marshallers.HttpWireFormatMarshaller; + +import java.util.LinkedList; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; + +/** + * + */ +public class HttpOpenWireSendAndReceiveTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + private static final String CUSTOM_HTTP_PROTOCOL = "http"; + private static final String WIRE_FORMAT_OPENWIRE = "default"; + private final AssertingTransportFactory clientTransportFactory = new AssertingTransportFactory(WIRE_FORMAT_OPENWIRE, HttpWireFormatMarshaller.class); + + protected BrokerService broker; + + @Override + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + broker.start(); + } + super.setUp(); + WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL()); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + @Override + public void testSendReceive() throws Exception + { + super.testSendReceive(); + final LinkedList usedMarshallers = clientTransportFactory.getSpyMarshallers(); + assertThat(usedMarshallers.size(), equalTo(2)); + final SpyMarshaller marshaller1 = usedMarshallers.pop(); + final SpyMarshaller marshaller2 = usedMarshallers.pop(); + + assertThat(marshaller1.getMarshallCallsCnt(), not(equalTo(0))); + assertThat(marshaller1.getUnmarshallCallsCnt(), not(equalTo(0))); + + assertThat(marshaller1.getMarshallCallsCnt(), equalTo(marshaller2.getMarshallCallsCnt())); + assertThat(marshaller1.getUnmarshallCallsCnt(), equalTo(marshaller2.getUnmarshallCallsCnt())); + } + + protected String getBrokerURL() { + return "http://localhost:8161"; + } + + protected BrokerService createBroker() throws Exception { + final BrokerService broker = new BrokerService(); + broker.setPersistent(false); + addConnector(broker, getBrokerURL()); + return broker; + } + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() { + TransportFactory.registerTransportFactory(CUSTOM_HTTP_PROTOCOL, clientTransportFactory); + return new ActiveMQConnectionFactory(getBrokerURL()); + } + + private static void addConnector(final BrokerService brokerService, final String brokerURL) throws Exception { + TransportFactory.registerTransportFactory(CUSTOM_HTTP_PROTOCOL, new CustomHttpTransportFactory()); + + brokerService.addConnector(brokerURL); + } +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/AssertingTransportFactory.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/AssertingTransportFactory.java new file mode 100644 index 00000000000..dc1a691422c --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/AssertingTransportFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.openwire; + +import org.apache.activemq.transport.http.HttpTransportFactory; +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; +import org.apache.activemq.wireformat.WireFormat; +import org.hamcrest.CoreMatchers; + +import java.util.LinkedList; + +import static org.junit.Assert.assertThat; + +/** + * Ensures that all transports created by this factory are of the expected type. + */ +public class AssertingTransportFactory extends HttpTransportFactory { + private final Class expectedMarshallerType; + private final LinkedList spyMarshallers = new LinkedList<>(); + + public AssertingTransportFactory(final String wireFormat, final Class expectedMarshallerType) { + super(wireFormat); + this.expectedMarshallerType = expectedMarshallerType; + } + + @Override + protected HttpTransportMarshaller createMarshaller(final WireFormat wireFormat) + { + final HttpTransportMarshaller marshaller = super.createMarshaller(wireFormat); + assertThat("Unexpected marshaller used", marshaller, CoreMatchers.instanceOf(expectedMarshallerType)); + final SpyMarshaller spyMarshaller = new SpyMarshaller(marshaller); + spyMarshallers.add(spyMarshaller); + return spyMarshaller; + } + + public LinkedList getSpyMarshallers() { + return spyMarshallers; + } +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportFactory.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportFactory.java new file mode 100644 index 00000000000..16258514e22 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportFactory.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.openwire; + +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.http.HttpTransportFactory; +import org.apache.activemq.transport.http.HttpTransportServer; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +public class CustomHttpTransportFactory extends HttpTransportFactory +{ + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + final Map options = new HashMap(URISupport.parseParameters(location)); + final HttpTransportServer result = new CustomHttpTransportServer(location, this); + final Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); + result.setTransportOption(transportOptions); + return result; + } catch (final URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportServer.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportServer.java new file mode 100644 index 00000000000..3f27994e554 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/CustomHttpTransportServer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.openwire; + +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.http.HttpTransportFactory; +import org.apache.activemq.transport.http.HttpTransportServer; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.component.AbstractLifeCycle.AbstractLifeCycleListener; +import org.eclipse.jetty.util.component.LifeCycle; + +import java.net.URI; + +public class CustomHttpTransportServer extends HttpTransportServer { + private final HttpTransportFactory transportFactory; + + public CustomHttpTransportServer(final URI location, final CustomHttpTransportFactory transportFactory) { + super(location, transportFactory); + this.transportFactory = transportFactory; + } + + @Override + protected void createServer() { + super.createServer(); + + server.addLifeCycleListener(new AbstractLifeCycleListener() + { + @Override + public void lifeCycleStarting(final LifeCycle event) + { + setupServletContext((ServletContextHandler)server.getHandler()); + } + }); + } + + private void setupServletContext(final ServletContextHandler handler) { + ServletContextAttributes.setAcceptListener(handler, getAcceptListener()); + ServletContextAttributes.setTransportOptions(handler, transportOptions); + ServletContextAttributes.setTransportFactory(handler, transportFactory); + ServletContextAttributes.setWireFormat(handler, new OpenWireFormatFactory().createWireFormat()); + } +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/ServletContextAttributes.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/ServletContextAttributes.java new file mode 100644 index 00000000000..22b09368659 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/ServletContextAttributes.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.openwire; + +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.http.HttpTransportFactory; +import org.apache.activemq.wireformat.WireFormat; +import org.eclipse.jetty.servlet.ServletContextHandler; + +import java.util.Map; + +public final class ServletContextAttributes { + + private ServletContextAttributes() {} + + public static void setWireFormat(final ServletContextHandler servletContext, final WireFormat wireFormat) { + servletContext.setAttribute("wireFormat", wireFormat); + } + + public static void setTransportFactory(final ServletContextHandler servletContext, final HttpTransportFactory transportFactory) { + servletContext.setAttribute("transportFactory", transportFactory); + } + + public static void setTransportOptions(final ServletContextHandler servletContext, final Map transportOptions) { + servletContext.setAttribute("transportOptions", transportOptions); + } + + public static void setAcceptListener(final ServletContextHandler servletContext, final TransportAcceptListener acceptListener) { + servletContext.setAttribute("acceptListener", acceptListener); + } +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java new file mode 100644 index 00000000000..f80fabd478d --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.http.openwire; + +import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; + +public class SpyMarshaller implements HttpTransportMarshaller { + private final HttpTransportMarshaller marshaller; + + private final AtomicInteger marshallCalls = new AtomicInteger(); + private final AtomicInteger unmarshallCalls = new AtomicInteger(); + + public SpyMarshaller(final HttpTransportMarshaller marshaller) { + this.marshaller = marshaller; + } + + @Override + public void marshal(final Object command, final OutputStream outputStream) throws IOException { + marshallCalls.incrementAndGet(); + marshaller.marshal(command, outputStream); + } + + @Override + public Object unmarshal(final InputStream stream) throws IOException { + unmarshallCalls.incrementAndGet(); + return marshaller.unmarshal(stream); + } + + public int getMarshallCallsCnt() + { + return marshallCalls.get(); + } + + public int getUnmarshallCallsCnt() + { + return unmarshallCalls.get(); + } +} From d5f1d8b43bb06fb4daf8a5003886f9a373430963 Mon Sep 17 00:00:00 2001 From: Przemek Bruski Date: Mon, 5 Sep 2016 13:56:52 +0200 Subject: [PATCH 4/4] AMQ-6135 adaptations for AMQ-6339 --- .../activemq/transport/http/HttpClientTransport.java | 3 +-- .../http/marshallers/HttpTextWireFormatMarshaller.java | 6 ++++++ .../http/marshallers/HttpTransportMarshaller.java | 8 ++++++++ .../http/marshallers/HttpWireFormatMarshaller.java | 5 +++++ .../http/marshallers/TextWireFormatMarshallers.java | 10 ++++++++++ .../transport/http/openwire/SpyMarshaller.java | 6 ++++++ 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index d75f4ca9277..e0ed182e127 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -27,7 +27,6 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; -import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; @@ -418,6 +417,6 @@ public void setPeerCertificates(X509Certificate[] certificates) { @Override public WireFormat getWireFormat() { - return getTextWireFormat(); + return getMarshaller().getWireFormat(); } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java index c25b2172ed7..a3dd25bb24e 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTextWireFormatMarshaller.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import org.apache.activemq.transport.util.TextWireFormat; +import org.apache.activemq.wireformat.WireFormat; /** * A {@link HttpTransportMarshaller} implementation using a {@link TextWireFormat} and UTF8 encoding. @@ -48,4 +49,9 @@ public void marshal(final Object command, final OutputStream outputStream) throw public Object unmarshal(final InputStream stream) throws IOException { return wireFormat.unmarshalText(new InputStreamReader(stream, CHARSET)); } + + @Override + public WireFormat getWireFormat() { + return wireFormat; + } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java index 36b6a6c71c3..68c154efc0d 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpTransportMarshaller.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.io.OutputStream; +import org.apache.activemq.wireformat.WireFormat; + /** * A generic interface for marshallers used for HTTP communication. */ @@ -43,4 +45,10 @@ public interface HttpTransportMarshaller * @throws IOException */ Object unmarshal(final InputStream stream) throws IOException; + + /** + * + * @return the wire format used by this marshaller + */ + WireFormat getWireFormat(); } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java index 92eec3c0cf5..660faadfafb 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/HttpWireFormatMarshaller.java @@ -44,4 +44,9 @@ public void marshal(final Object command, final OutputStream outputStream) throw public Object unmarshal(final InputStream stream) throws IOException { return wireFormat.unmarshal(new DataInputStream(stream)); } + + @Override + public WireFormat getWireFormat() { + return wireFormat; + } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java index d44468350ec..f7796671002 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/marshallers/TextWireFormatMarshallers.java @@ -71,6 +71,11 @@ public void marshal(final Object command, final OutputStream outputStream) throw public Object unmarshal(final InputStream stream) throws IOException { return wireFormat.unmarshal(new DataInputStream(stream)); } + + @Override + public WireFormat getWireFormat() { + return wireFormat; + } } private static class MarshalPlainUnmarshalTextMarshaller implements HttpTransportMarshaller { @@ -89,5 +94,10 @@ public void marshal(final Object command, final OutputStream outputStream) throw public Object unmarshal(final InputStream stream) throws IOException { return wireFormat.unmarshalText(new InputStreamReader(stream, CHARSET)); } + + @Override + public WireFormat getWireFormat() { + return wireFormat; + } } } \ No newline at end of file diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java index f80fabd478d..ae9e97825c4 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/openwire/SpyMarshaller.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.http.openwire; import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller; +import org.apache.activemq.wireformat.WireFormat; import java.io.IOException; import java.io.InputStream; @@ -55,4 +56,9 @@ public int getUnmarshallCallsCnt() { return unmarshallCalls.get(); } + + @Override + public WireFormat getWireFormat() { + return marshaller.getWireFormat(); + } }