Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.activemq.transport.http;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
Expand All @@ -26,7 +26,7 @@

import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
Expand Down Expand Up @@ -90,8 +90,8 @@ public class HttpClientTransport extends HttpTransportSupport {
protected boolean canSendCompressed = false;
private int minSendAsCompressedSize = 0;

public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
super(wireFormat, remoteUrl);
public HttpClientTransport(final HttpTransportMarshaller marshaller, URI remoteUrl) {
super(marshaller, remoteUrl);
}

public FutureResponse asyncRequest(Object command) throws IOException {
Expand All @@ -106,8 +106,7 @@ public void oneway(Object command) throws IOException {
}
HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
configureMethod(httpMethod);
String data = getTextWireFormat().marshalText(command);
byte[] bytes = data.getBytes("UTF-8");
byte[] bytes = asBytes(command);
if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
Expand Down Expand Up @@ -147,17 +146,24 @@ public void oneway(Object command) throws IOException {
}
}

private byte[] asBytes(final Object command) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
getMarshaller().marshal(command, outputStream);
return outputStream.toByteArray();
}

@Override
public Object request(Object command) throws IOException {
return null;
}

private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
Header encoding = answer.getEntity().getContentEncoding();
if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
private InputStream createInputStream(final HttpResponse answer) throws IOException {
final InputStream inputStream = answer.getEntity().getContent();
final Header encoding = answer.getEntity().getContentEncoding();
if (encoding == null || !"gzip".equalsIgnoreCase(encoding.getValue())) {
return inputStream;
} else {
return new DataInputStream(answer.getEntity().getContent());
return new GZIPInputStream(inputStream);
}
}

Expand Down Expand Up @@ -195,8 +201,8 @@ public void run() {
}
} else {
receiveCounter++;
DataInputStream stream = createDataInputStream(answer);
Object command = getTextWireFormat().unmarshal(stream);
final InputStream stream = createInputStream(answer);
final Object command = getMarshaller().unmarshal(stream);
if (command == null) {
LOG.debug("Received null command from url: " + remoteUrl);
} else {
Expand Down Expand Up @@ -415,6 +421,6 @@ public void setPeerCertificates(X509Certificate[] certificates) {

@Override
public WireFormat getWireFormat() {
return getTextWireFormat();
return getMarshaller().getWireFormat();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller;
import org.apache.activemq.transport.http.marshallers.HttpWireFormatMarshaller;
import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
Expand All @@ -39,6 +41,16 @@
public class HttpTransportFactory extends TransportFactory {

private static final Logger LOG = LoggerFactory.getLogger(HttpTransportFactory.class);
private static final String WIRE_FORMAT_XSTREAM = "xstream";
private final String defaultWireFormatType;

public HttpTransportFactory() {
defaultWireFormatType = WIRE_FORMAT_XSTREAM;
}

public HttpTransportFactory(final String defaultWireFormatType) {
this.defaultWireFormatType = defaultWireFormatType;
}

@Override
public TransportServer doBind(URI location) throws IOException {
Expand All @@ -57,22 +69,18 @@ public TransportServer doBind(URI location) throws IOException {
}
}

protected TextWireFormat asTextWireFormat(WireFormat wireFormat) {
if (wireFormat instanceof TextWireFormat) {
return (TextWireFormat)wireFormat;
}
LOG.trace("Not created with a TextWireFormat: {}", wireFormat);
return new XStreamWireFormat();
protected WireFormat processWireFormat(final WireFormat wireFormat) {
return wireFormat;
}

@Override
protected String getDefaultWireFormatType() {
return "xstream";
return defaultWireFormatType;
}

@Override
protected Transport createTransport(URI location, WireFormat wf) throws IOException {
TextWireFormat textWireFormat = asTextWireFormat(wf);
final WireFormat wireFormat = processWireFormat(wf);
// need to remove options from uri
URI uri;
try {
Expand All @@ -82,7 +90,14 @@ protected Transport createTransport(URI location, WireFormat wf) throws IOExcept
cause.initCause(e);
throw cause;
}
return new HttpClientTransport(textWireFormat, uri);
return new HttpClientTransport(createMarshaller(wireFormat), uri);
}

protected HttpTransportMarshaller createMarshaller(final WireFormat wireFormat)
{
return wireFormat instanceof TextWireFormat ?
TextWireFormatMarshallers.newTransportMarshaller((TextWireFormat)wireFormat) :
new HttpWireFormatMarshaller(wireFormat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
import java.net.URI;

import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller;

/**
* A useful base class for HTTP Transport implementations.
*
*
*/
public abstract class HttpTransportSupport extends TransportThreadSupport {
private TextWireFormat textWireFormat;
private HttpTransportMarshaller marshaller;
private URI remoteUrl;
private String proxyHost;
private int proxyPort = 8080;
private String proxyUser;
private String proxyPassword;

public HttpTransportSupport(TextWireFormat textWireFormat, URI remoteUrl) {
this.textWireFormat = textWireFormat;
public HttpTransportSupport(final HttpTransportMarshaller marshaller, final URI remoteUrl) {
this.marshaller = marshaller;
this.remoteUrl = remoteUrl;
}

Expand All @@ -53,12 +53,8 @@ public URI getRemoteUrl() {
return remoteUrl;
}

public TextWireFormat getTextWireFormat() {
return textWireFormat;
}

public void setTextWireFormat(TextWireFormat textWireFormat) {
this.textWireFormat = textWireFormat;
public HttpTransportMarshaller getMarshaller() {
return marshaller;
}

public String getProxyHost() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package org.apache.activemq.transport.http;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -38,10 +36,14 @@
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.http.marshallers.HttpTransportMarshaller;
import org.apache.activemq.transport.http.marshallers.HttpWireFormatMarshaller;
import org.apache.activemq.transport.http.marshallers.TextWireFormatMarshallers;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,7 +58,7 @@ public class HttpTunnelServlet extends HttpServlet {

private TransportAcceptListener listener;
private HttpTransportFactory transportFactory;
private TextWireFormat wireFormat;
private HttpTransportMarshaller marshaller;
private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
private final long requestTimeout = 30000L;
private HashMap<String, Object> transportOptions;
Expand All @@ -74,10 +76,15 @@ public void init() throws ServletException {
throw new ServletException("No such attribute 'transportFactory' available in the ServletContext");
}
transportOptions = (HashMap<String, Object>)getServletContext().getAttribute("transportOptions");
wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
WireFormat wireFormat = (WireFormat) getServletContext().getAttribute("wireFormat");
if (wireFormat == null) {
wireFormat = createWireFormat();
}
if (wireFormat instanceof TextWireFormat) {
marshaller = TextWireFormatMarshallers.newServletMarshaller(wireFormat);
} else {
marshaller = new HttpWireFormatMarshaller(wireFormat);
}
}

@Override
Expand All @@ -104,8 +111,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t

packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);

DataOutputStream stream = new DataOutputStream(response.getOutputStream());
wireFormat.marshal(packet, stream);
marshaller.marshal(packet, response.getOutputStream());
count++;
} catch (InterruptedException ignore) {
}
Expand All @@ -124,8 +130,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
stream = new GZIPInputStream(stream);
}

// Read the command directly from the reader, assuming UTF8 encoding
Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8"));
final Command command = (Command)marshaller.unmarshal(stream);

if (command instanceof WireFormatInfo) {
WireFormatInfo info = (WireFormatInfo) command;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.transport.http.marshallers;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.wireformat.WireFormat;

/**
* A {@link HttpTransportMarshaller} implementation using a {@link TextWireFormat} and UTF8 encoding.
*/
public class HttpTextWireFormatMarshaller implements HttpTransportMarshaller
{
private static final Charset CHARSET = StandardCharsets.UTF_8;
private final TextWireFormat wireFormat;

public HttpTextWireFormatMarshaller(final TextWireFormat wireFormat) {
this.wireFormat = wireFormat;
}

@Override
public void marshal(final Object command, final OutputStream outputStream) throws IOException {
final String s = wireFormat.marshalText(command);
outputStream.write(s.getBytes(CHARSET));
}

@Override
public Object unmarshal(final InputStream stream) throws IOException {
return wireFormat.unmarshalText(new InputStreamReader(stream, CHARSET));
}

@Override
public WireFormat getWireFormat() {
return wireFormat;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.transport.http.marshallers;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.activemq.wireformat.WireFormat;

/**
* A generic interface for marshallers used for HTTP communication.
*/
public interface HttpTransportMarshaller
{
/**
* The implementations of this method should be able to marshall the supplied object into the output stream.
*
* @param command the object to marshall
* @param outputStream output stream for the serialised form.
* @throws IOException
*/
void marshal(final Object command, final OutputStream outputStream) throws IOException;

/**
* The implementations of this method handle unmarshalling of objects from a wire format into Java objects.
*
* @param stream the stream with the serialised form of an object
* @return the deserialised object
* @throws IOException
*/
Object unmarshal(final InputStream stream) throws IOException;

/**
*
* @return the wire format used by this marshaller
*/
WireFormat getWireFormat();
}
Loading