Skip to content
Permalink
Browse files
Applied Hiranya's patch for AXIS2-4722 (AXIS2-4722-update2.patch) wit…
…hout changes.
  • Loading branch information
veithen committed May 23, 2010
1 parent 31a4858 commit 29072766f6a61d107a11a26016cd514bcea4465c
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 24 deletions.
@@ -173,7 +173,7 @@ public void start() throws AxisFault {
state = BaseConstants.STARTED;
// register to receive updates on services for lifetime management
// cfgCtx.getAxisConfiguration().addObservers(axisObserver);
log.info(getTransportName().toUpperCase() + " Listener started");
log.info(getTransportName().toUpperCase() + " listener started");
// iterate through deployed services and start
serviceTracker.start();
}
@@ -45,6 +45,8 @@
/** A Map of service name to the protocol endpoints */
private List<E> endpoints = new ArrayList<E>();

protected boolean useGlobalListener = false;

@Override
public void init(ConfigurationContext cfgCtx,
TransportInDescription transportIn) throws AxisFault {
@@ -112,6 +114,8 @@ protected final void startListeningForService(AxisService service) throws AxisFa
if (endpoint.loadConfiguration(service)) {
startEndpoint(endpoint);
endpoints.add(endpoint);
} else if (useGlobalListener) {
return;
} else {
throw new AxisFault("Service doesn't have configuration information for transport " +
getTransportName());
@@ -38,6 +38,14 @@ public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn
throws AxisFault {

super.init(cfgCtx, transportIn);
initDispatcher();
}

private void initDispatcher() throws AxisFault {
if (dispatcher != null) {
return;
}

DatagramDispatcherCallback callback = new DatagramDispatcherCallback() {

public void receive(SocketAddress address,
@@ -47,11 +55,13 @@ public void receive(SocketAddress address,
workerPool.execute(new ProcessPacketTask(address, endpoint, data, length));
}
};

try {
dispatcher = createDispatcher(callback);
} catch (IOException ex) {
throw new AxisFault("Unable to create selector", ex);
}

try {
defaultIp = org.apache.axis2.util.Utils.getIpAddress(cfgCtx.getAxisConfiguration());
} catch (SocketException ex) {
@@ -70,16 +80,19 @@ protected final E createEndpoint() {

@Override
protected void startEndpoint(E endpoint) throws AxisFault {
initDispatcher();

try {
dispatcher.addEndpoint(endpoint);
} catch (IOException ex) {
throw new AxisFault("Unable to listen on endpoint "
+ endpoint.getEndpointReferences(defaultIp)[0], ex);
}
if (log.isDebugEnabled()) {
log.debug("Started listening on endpoint " + endpoint.getEndpointReferences(defaultIp)[0]
+ " [contentType=" + endpoint.getContentType()
+ "; service=" + endpoint.getServiceName() + "]");
log.debug("Started listening on endpoint " +
endpoint.getEndpointReferences(defaultIp)[0] +
" [contentType=" + endpoint.getContentType() +
"; service=" + endpoint.getServiceName() + "]");
}
}

@@ -88,7 +101,8 @@ protected void stopEndpoint(E endpoint) {
try {
dispatcher.removeEndpoint(endpoint);
} catch (IOException ex) {
log.error("I/O exception while stopping listener for service " + endpoint.getServiceName(), ex);
log.error("I/O exception while stopping listener for service " +
endpoint.getServiceName(), ex);
}
}

@@ -50,7 +50,7 @@
* packet received, a {@link ProcessPacketTask} instance is created
* and dispatched to a worker thread from the configured pool.
* <p>
* The methods {@link #addEndpoint(Endpoint)}, {@link #removeEndpoint(String)}
* The methods {@link #addEndpoint(Endpoint)}, {@link #removeEndpoint(Endpoint)}
* and {@link #stop()} are thread safe and may be called from any thread.
* However, to avoid concurrency issues, the operation on the underlying
* {@link Selector} will always be executed by the thread executing the
@@ -88,7 +88,8 @@ public void execute(Selector selector) {

private final DatagramDispatcherCallback callback;
private final Selector selector;
private final Queue<SelectorOperation> selectorOperationQueue = new ConcurrentLinkedQueue<SelectorOperation>();
private final Queue<SelectorOperation> selectorOperationQueue =
new ConcurrentLinkedQueue<SelectorOperation>();

/**
* Constructor.
@@ -120,6 +121,7 @@ public void doExecute(Selector selector) throws IOException {
channel.register(selector, SelectionKey.OP_READ, endpoint);
}
});
log.info("UDP endpoint started on port : " + endpoint.getPort());
}

/**
@@ -39,11 +39,18 @@
* than the specified length will be truncated.</dd>
* </dl>
*
* @see org.apache.synapse.transport.udp
* @see org.apache.axis2.transport.udp
*/
public class UDPListener extends AbstractDatagramTransportListener<Endpoint> implements ManagementSupport {
public class UDPListener extends AbstractDatagramTransportListener<Endpoint>
implements ManagementSupport {

public UDPListener() {
this.useGlobalListener = true;
}

@Override
protected IODispatcher createDispatcher(DatagramDispatcherCallback callback) throws IOException {
protected IODispatcher createDispatcher(DatagramDispatcherCallback callback)
throws IOException {
IODispatcher dispatcher = new IODispatcher(callback);
new Thread(dispatcher, getTransportName() + "-dispatcher").start();
// Start a new thread for the I/O dispatcher
@@ -24,8 +24,6 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.ByteBuffer;

import org.apache.axiom.om.OMOutputFormat;
@@ -49,23 +47,26 @@
/**
* Transport sender for the UDP protocol.
*
* @see org.apache.synapse.transport.udp
* @see org.apache.axis2.transport.udp
*/
public class UDPSender extends AbstractTransportSender {
public UDPSender() {
log = LogFactory.getLog(UDPSender.class);
}

@Override
public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut)
throws AxisFault {
super.init(cfgCtx, transportOut);
}

@Override
public void sendMessage(MessageContext msgContext, String targetEPR, OutTransportInfo outTransportInfo) throws AxisFault {
public void sendMessage(MessageContext msgContext, String targetEPR,
OutTransportInfo outTransportInfo) throws AxisFault {
if ((targetEPR == null) && (outTransportInfo != null)) {
// this can happen only at the server side and send the message using back chanel
DatagramOutTransportInfo datagramOutTransportInfo = (DatagramOutTransportInfo) outTransportInfo;
DatagramOutTransportInfo datagramOutTransportInfo =
(DatagramOutTransportInfo) outTransportInfo;
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
format.setContentType(datagramOutTransportInfo.getContentType());
@@ -74,14 +75,17 @@ public void sendMessage(MessageContext msgContext, String targetEPR, OutTranspor
ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
byteBuffer.put(payload);

DatagramSocket socket = null;
DatagramSocket socket;
try {
socket = new DatagramSocket();
socket.send(new DatagramPacket(payload, payload.length, datagramOutTransportInfo.getSourceAddress()));
try {
socket.send(new DatagramPacket(payload, payload.length,
datagramOutTransportInfo.getSourceAddress()));
} finally {
socket.close();
}
} catch (IOException e) {
throw new AxisFault("Unable to send packet", e);
} finally {
socket.close();
}

} else {
@@ -93,8 +97,10 @@ public void sendMessage(MessageContext msgContext, String targetEPR, OutTranspor
try {
DatagramSocket socket = new DatagramSocket();
try {
socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
if (!msgContext.getOptions().isUseSeparateListener() && !msgContext.isServerSide()){
socket.send(new DatagramPacket(payload, payload.length,
InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
if (!msgContext.getOptions().isUseSeparateListener() &&
!msgContext.isServerSide()){
waitForReply(msgContext, socket, udpOutInfo.getContentType());
}
}
@@ -108,12 +114,13 @@ public void sendMessage(MessageContext msgContext, String targetEPR, OutTranspor
}
}

private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket, String contentType) throws IOException {
private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket,
String contentType) throws IOException {

// piggy back message constant is used to pass a piggy back
// message context in asnych model
if (!(messageContext.getAxisOperation() instanceof OutInAxisOperation) &&
(messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null)) {
messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null) {
return;
}

0 comments on commit 2907276

Please sign in to comment.