Skip to content

Commit

Permalink
Fixed the 'await*()' problem of the netty.io library and created a Au…
Browse files Browse the repository at this point in the history
…tomatica Enquire Link Sender
  • Loading branch information
root authored and root committed May 27, 2013
1 parent 3f55aa1 commit bc6b7fa
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 46 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
*~
target/
/bin
lib/
.classpath
.project
2 changes: 2 additions & 0 deletions src/main/java/com/cloudhopper/smpp/SmppConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ public class SmppConstants {
public static final long DEFAULT_CONNECT_TIMEOUT = 10000;
public static final long DEFAULT_BIND_TIMEOUT = 5000;
public static final long DEFAULT_REQUEST_EXPIRY_TIMEOUT = -1; // disabled
public static final long DEFAULT_ENQUIRE_LINK_TIMER = 500;
public static final long DEFAULT_WINDOW_MONITOR_INTERVAL = -1; // disabled
public static final int DEFAULT_SERVER_MAX_CONNECTION_SIZE = 100;
public static final boolean DEFAULT_SERVER_NON_BLOCKING_SOCKETS_ENABLED = true;
public static final boolean DEFAULT_SERVER_REUSE_ADDRESS = true;
public static final boolean DEFAULT_USE_AUTOMATIC_ENQUIRE_LINK = false;

//
// SUBMIT_MULTI destination type flags
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class SmppSessionConfiguration extends SmppConnectionConfiguration {
private long requestExpiryTimeout;
private long windowMonitorInterval;
private boolean countersEnabled;

private boolean automaticEnquireLink;
private long enquireLinkTimer;

public SmppSessionConfiguration() {
this(SmppBindType.TRANSCEIVER, null, null, null);
Expand All @@ -74,6 +77,8 @@ public SmppSessionConfiguration(SmppBindType type, String systemId, String passw
this.windowWaitTimeout = SmppConstants.DEFAULT_WINDOW_WAIT_TIMEOUT;
this.requestExpiryTimeout = SmppConstants.DEFAULT_REQUEST_EXPIRY_TIMEOUT;
this.windowMonitorInterval = SmppConstants.DEFAULT_WINDOW_MONITOR_INTERVAL;
this.automaticEnquireLink = SmppConstants.DEFAULT_USE_AUTOMATIC_ENQUIRE_LINK;
this.enquireLinkTimer = SmppConstants.DEFAULT_ENQUIRE_LINK_TIMER;
this.countersEnabled = false;
}

Expand Down Expand Up @@ -228,4 +233,20 @@ public void setCountersEnabled(boolean countersEnabled) {
this.countersEnabled = countersEnabled;
}

public boolean isAutomaticEnquireLink() {
return automaticEnquireLink;
}

public void setAutomaticEnquireLink(boolean automaticEnquireLink) {
this.automaticEnquireLink = automaticEnquireLink;
}

public long getEnquireLinkTimer() {
return enquireLinkTimer;
}

public void setEnquireLinkTimer(long enquireLinkTimer) {
this.enquireLinkTimer = enquireLinkTimer;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.cloudhopper.smpp.channel;

/*
* #%L
* ch-smpp
* %%
* Copyright (C) 2009 - 2013 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

public class AwaitChannelFutureListener implements ChannelFutureListener {

private CountDownLatch countDownLatch = new CountDownLatch(1);
private ChannelFuture channelFuture;

@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
this.channelFuture = channelFuture;
countDownLatch.countDown();
}

public boolean timeout(long timeoutInMillis) throws InterruptedException {
return !countDownLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS);
}

public void await() throws InterruptedException {
countDownLatch.await();
}

public boolean successed() {
return channelFuture != null && channelFuture.isSuccess();
}

public Throwable getCause() {
return channelFuture != null ? channelFuture.getCause() : null;
}

public Channel getChannel() {
return channelFuture != null ? channelFuture.getChannel() : null;
}

}
55 changes: 30 additions & 25 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,35 @@
* #L%
*/

import com.cloudhopper.smpp.SmppClient;
import com.cloudhopper.smpp.util.DaemonExecutors;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import javax.net.ssl.SSLEngine;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.SmppClient;
import com.cloudhopper.smpp.SmppSession;
import com.cloudhopper.smpp.SmppSessionConfiguration;
import com.cloudhopper.smpp.SmppSessionHandler;
import com.cloudhopper.smpp.channel.AwaitChannelFutureListener;
import com.cloudhopper.smpp.channel.SmppChannelConstants;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.channel.SmppClientConnector;
import com.cloudhopper.smpp.channel.SmppSessionPduDecoder;
import com.cloudhopper.smpp.channel.SmppSessionLogger;
import com.cloudhopper.smpp.channel.SmppSessionWrapper;
import com.cloudhopper.smpp.channel.SmppSessionPduDecoder;
import com.cloudhopper.smpp.channel.SmppSessionThreadRenamer;
import com.cloudhopper.smpp.channel.SmppSessionWrapper;
import com.cloudhopper.smpp.pdu.BaseBind;
import com.cloudhopper.smpp.pdu.BaseBindResp;
import com.cloudhopper.smpp.pdu.BindReceiver;
Expand All @@ -45,21 +60,10 @@
import com.cloudhopper.smpp.type.SmppBindException;
import com.cloudhopper.smpp.type.SmppChannelConnectException;
import com.cloudhopper.smpp.type.SmppChannelConnectTimeoutException;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.type.UnrecoverablePduException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudhopper.smpp.util.DaemonExecutors;

/**
* Default implementation to "bootstrap" client SMPP sessions (create & bind).
Expand Down Expand Up @@ -259,22 +263,23 @@ protected Channel createConnectedChannel(String host, int port, long connectTime
// a socket address used to "bind" to the remote system
InetSocketAddress socketAddr = new InetSocketAddress(host, port);

AwaitChannelFutureListener listener = new AwaitChannelFutureListener();
// attempt to connect to the remote system
ChannelFuture connectFuture = this.clientBootstrap.connect(socketAddr);
connectFuture.addListener(listener);

// wait until the connection is made successfully
boolean timeout = !connectFuture.await(connectTimeoutMillis);
boolean timeout = listener.timeout(connectTimeoutMillis);

if (timeout) {
throw new SmppChannelConnectTimeoutException("Unable to connect to host [" + host + "] and port [" + port + "] within " + connectTimeoutMillis + " ms");
}

if (!connectFuture.isSuccess()) {
throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + connectFuture.getCause().getMessage(), connectFuture.getCause());
if (!listener.successed()) {
throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + listener.getCause().getMessage(), listener.getCause());
}

// if we get here, then we were able to connect and get a channel
return connectFuture.getChannel();
return listener.getChannel();
}

}
64 changes: 43 additions & 21 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,23 @@
* #L%
*/

import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.ObjectName;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudhopper.commons.util.PeriodFormatterUtil;
import com.cloudhopper.smpp.jmx.DefaultSmppSessionMXBean;
import com.cloudhopper.commons.util.windowing.DuplicateKeyException;
import com.cloudhopper.commons.util.windowing.OfferTimeoutException;
import com.cloudhopper.commons.util.windowing.Window;
Expand All @@ -30,11 +45,11 @@
import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.SmppConstants;
import com.cloudhopper.smpp.SmppServerSession;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.SmppSessionConfiguration;
import com.cloudhopper.smpp.SmppSessionCounters;
import com.cloudhopper.smpp.SmppSessionHandler;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.channel.AwaitChannelFutureListener;
import com.cloudhopper.smpp.jmx.DefaultSmppSessionMXBean;
import com.cloudhopper.smpp.pdu.BaseBind;
import com.cloudhopper.smpp.pdu.BaseBindResp;
import com.cloudhopper.smpp.pdu.EnquireLink;
Expand All @@ -52,23 +67,13 @@
import com.cloudhopper.smpp.transcoder.PduTranscoder;
import com.cloudhopper.smpp.type.RecoverablePduException;
import com.cloudhopper.smpp.type.SmppBindException;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.type.UnrecoverablePduException;
import com.cloudhopper.smpp.util.EnquireLinkSender;
import com.cloudhopper.smpp.util.SequenceNumber;
import com.cloudhopper.smpp.util.SmppSessionUtil;
import com.cloudhopper.smpp.util.SmppUtil;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of either an ESME or SMSC SMPP session.
Expand Down Expand Up @@ -99,6 +104,8 @@ public class DefaultSmppSession implements SmppServerSession, SmppSessionChannel
private BaseBindResp preparedBindResponse;
private ScheduledExecutorService monitorExecutor;
private DefaultSmppSessionCounters counters;

private EnquireLinkSender enquireLinkSender;

/**
* Creates an SmppSession for a server-based session.
Expand Down Expand Up @@ -170,6 +177,10 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration
if (configuration.isCountersEnabled()) {
this.counters = new DefaultSmppSessionCounters();
}

if (configuration.isAutomaticEnquireLink()) {
this.enquireLinkSender = new EnquireLinkSender(this);
}
}

public void registerMBean(String objectName) {
Expand Down Expand Up @@ -216,6 +227,10 @@ public Type getRemoteType() {
protected void setBound() {
this.state.set(STATE_BOUND);
this.boundTime.set(System.currentTimeMillis());

if (enquireLinkSender != null) {
enquireLinkSender.start();
}
}

@Override
Expand Down Expand Up @@ -518,10 +533,14 @@ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pd
}

// write the pdu out & wait till its written
ChannelFuture channelFuture = this.channel.write(buffer).await();
AwaitChannelFutureListener listener = new AwaitChannelFutureListener();
ChannelFuture channelFuture = this.channel.write(buffer);
channelFuture.addListener(listener);

listener.await();

// check if the write was a success
if (!channelFuture.isSuccess()) {
if (!listener.successed()) {
// the write failed, make sure to throw an exception
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());
}
Expand Down Expand Up @@ -557,16 +576,19 @@ public void sendResponsePdu(PduResponse pdu) throws RecoverablePduException, Unr
}

// write the pdu out & wait till its written
ChannelFuture channelFuture = this.channel.write(buffer).await();
AwaitChannelFutureListener listener = new AwaitChannelFutureListener();
ChannelFuture channelFuture = this.channel.write(buffer);
channelFuture.addListener(listener);

listener.await();

// check if the write was a success
if (!channelFuture.isSuccess()) {
if (!listener.successed()) {
// the write failed, make sure to throw an exception
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());
}
}

@SuppressWarnings("unchecked")
@Override
public void firePduReceived(Pdu pdu) {
if (configuration.getLoggingOptions().isLogPduEnabled()) {
Expand Down
Loading

0 comments on commit bc6b7fa

Please sign in to comment.