Skip to content
Permalink
Browse files
WSCOMMONS-543: Decouple org.apache.axis2.transport.base.datagram from…
… SocketAddress so that it's initial design is preserved.
  • Loading branch information
veithen committed May 24, 2010
1 parent 5f1fe22 commit 543cb6d79c2478057a80d7e93adc9bccd409dd52
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 105 deletions.
@@ -20,7 +20,6 @@

import java.io.IOException;
import java.net.SocketException;
import java.net.SocketAddress;

import org.apache.axis2.AxisFault;
import org.apache.axis2.transport.base.AbstractTransportListenerEx;
@@ -35,11 +34,11 @@
protected void doInit() throws AxisFault {
DatagramDispatcherCallback callback = new DatagramDispatcherCallback() {

public void receive(SocketAddress address,
DatagramEndpoint endpoint,
public void receive(DatagramEndpoint endpoint,
byte[] data,
int length) {
workerPool.execute(new ProcessPacketTask(address, endpoint, data, length));
int length,
DatagramOutTransportInfo outInfo) {
workerPool.execute(new ProcessPacketTask(endpoint, data, length, outInfo));
}
};

@@ -18,12 +18,22 @@
*/
package org.apache.axis2.transport.base.datagram;

import java.nio.channels.DatagramChannel;
import java.net.SocketAddress;

public interface DatagramDispatcherCallback {
void receive(SocketAddress address,
DatagramEndpoint endpoint,
/**
* Receive a message and inject it into the Axis2 engine.
*
* @param endpoint
* the endpoint that received the message
* @param data
* the data of the message
* @param length
* the length of the message
* @param outInfo
* The out transport information that should be used to send back a response. This
* should only be set by transports that support an explicit back channel.
*/
void receive(DatagramEndpoint endpoint,
byte[] data,
int length);
int length,
DatagramOutTransportInfo outInfo);
}
@@ -18,21 +18,8 @@

import org.apache.axis2.transport.OutTransportInfo;

import java.nio.channels.DatagramChannel;
import java.net.SocketAddress;

public class DatagramOutTransportInfo implements OutTransportInfo {
//out transport for back chanel
protected SocketAddress sourceAddress;
protected String contentType;

public SocketAddress getSourceAddress() {
return sourceAddress;
}

public void setSourceAddress(SocketAddress sourceAddress) {
this.sourceAddress = sourceAddress;
}
private String contentType;

public String getContentType() {
return contentType;
@@ -20,8 +20,6 @@

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.channels.DatagramChannel;
import java.net.SocketAddress;

import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.context.MessageContext;
@@ -45,19 +43,16 @@ public class ProcessPacketTask implements Runnable {
private final int length;

//back channel data
private DatagramChannel datagramChannel;
private SocketAddress address;
private DatagramOutTransportInfo outInfo;

public ProcessPacketTask(SocketAddress address,
DatagramEndpoint endpoint,
public ProcessPacketTask(DatagramEndpoint endpoint,
byte[] data,
int length) {
int length,
DatagramOutTransportInfo outInfo) {
this.endpoint = endpoint;
this.data = data;
this.length = length;

this.datagramChannel = datagramChannel;
this.address = address;
this.outInfo = outInfo;
}

public void run() {
@@ -68,12 +63,12 @@ public void run() {
SOAPEnvelope envelope = TransportUtils.createSOAPMessage(msgContext, inputStream, endpoint.getContentType());
msgContext.setEnvelope(envelope);

//create and out transport info object
DatagramOutTransportInfo datagramOutTransportInfo = new DatagramOutTransportInfo();
datagramOutTransportInfo.setContentType(endpoint.getContentType());
datagramOutTransportInfo.setSourceAddress(address);

msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, datagramOutTransportInfo);
if (outInfo != null) {
if (outInfo.getContentType() == null) {
outInfo.setContentType(endpoint.getContentType());
}
msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, outInfo);
}

AxisEngine.receive(msgContext);
metrics.incrementMessagesReceived();
@@ -20,7 +20,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
@@ -249,12 +248,12 @@ private void receive(Endpoint endpoint, DatagramChannel channel) {
try {
byte[] data = new byte[endpoint.getMaxPacketSize()];
ByteBuffer buffer = ByteBuffer.wrap(data);
SocketAddress address = channel.receive(buffer);
InetSocketAddress address = (InetSocketAddress)channel.receive(buffer);
int length = buffer.position();
if (log.isDebugEnabled()) {
log.debug("Received packet from " + address + " with length " + length);
}
callback.receive(address, endpoint, data, length);
callback.receive(endpoint, data, length, new UDPOutTransportInfo(address));
} catch (IOException ex) {
endpoint.getMetrics().incrementFaultsReceiving();
log.error("Error receiving UDP packet", ex);
@@ -18,19 +18,18 @@
*/
package org.apache.axis2.transport.udp;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.axis2.AxisFault;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;

/**
* Holder of information to send an outgoing message to a UDP destination.
*/
public class UDPOutTransportInfo extends DatagramOutTransportInfo {
private String host;
private int port;
private InetSocketAddress address;

public UDPOutTransportInfo(String eprString) throws AxisFault {
URI epr;
@@ -45,24 +44,19 @@ public UDPOutTransportInfo(String eprString) throws AxisFault {
if (!params.startsWith("contentType=")) {
throw new AxisFault("Invalid endpoint reference: no content type");
}
host = epr.getHost();
port = epr.getPort();
contentType = params.substring(12);
address = new InetSocketAddress(epr.getHost(), epr.getPort());
setContentType(params.substring(12));
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;

public UDPOutTransportInfo(InetSocketAddress address) {
this.address = address;
}

public int getPort() {
return port;
public InetSocketAddress getAddress() {
return address;
}

public void setPort(int port) {
this.port = port;
public void setAddress(InetSocketAddress address) {
this.address = address;
}
}
@@ -23,8 +23,6 @@
import java.io.ByteArrayInputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;

import org.apache.axiom.om.OMOutputFormat;
import org.apache.axiom.soap.SOAPEnvelope;
@@ -39,7 +37,6 @@
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.base.AbstractTransportSender;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
import org.apache.commons.logging.LogFactory;

import javax.xml.stream.XMLStreamException;
@@ -63,55 +60,33 @@ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportO
@Override
public void sendMessage(MessageContext msgContext, String targetEPR,
OutTransportInfo outTransportInfo) throws AxisFault {
UDPOutTransportInfo udpOutInfo;
if ((targetEPR == null) && (outTransportInfo != null)) {
// this can happen only at the server side and send the message using back chanel
DatagramOutTransportInfo datagramOutTransportInfo =
(DatagramOutTransportInfo) outTransportInfo;
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
format.setContentType(datagramOutTransportInfo.getContentType());
byte[] payload = messageFormatter.getBytes(msgContext, format);

ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
byteBuffer.put(payload);

DatagramSocket socket;
try {
socket = new DatagramSocket();
try {
socket.send(new DatagramPacket(payload, payload.length,
datagramOutTransportInfo.getSourceAddress()));
} finally {
socket.close();
}
} catch (IOException e) {
throw new AxisFault("Unable to send packet", e);
}

udpOutInfo = (UDPOutTransportInfo) outTransportInfo;
} else {
UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR);
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
format.setContentType(udpOutInfo.getContentType());
byte[] payload = messageFormatter.getBytes(msgContext, format);
udpOutInfo = new UDPOutTransportInfo(targetEPR);
}
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
format.setContentType(udpOutInfo.getContentType());
byte[] payload = messageFormatter.getBytes(msgContext, format);
try {
DatagramSocket socket = new DatagramSocket();
try {
DatagramSocket socket = new DatagramSocket();
try {
socket.send(new DatagramPacket(payload, payload.length,
InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
if (!msgContext.getOptions().isUseSeparateListener() &&
!msgContext.isServerSide()){
waitForReply(msgContext, socket, udpOutInfo.getContentType());
}
}
finally {
socket.close();
socket.send(new DatagramPacket(payload, payload.length, udpOutInfo.getAddress()));
if (!msgContext.getOptions().isUseSeparateListener() &&
!msgContext.isServerSide()){
waitForReply(msgContext, socket, udpOutInfo.getContentType());
}
}
catch (IOException ex) {
throw new AxisFault("Unable to send packet", ex);
finally {
socket.close();
}
}
catch (IOException ex) {
throw new AxisFault("Unable to send packet", ex);
}
}

private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket,

0 comments on commit 543cb6d

Please sign in to comment.