diff --git a/.gitignore b/.gitignore index bcc52fed..d844bfbc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ *~ target/ +/bin +lib/ +.classpath +.project diff --git a/src/main/java/com/cloudhopper/smpp/SmppConstants.java b/src/main/java/com/cloudhopper/smpp/SmppConstants.java index 6eb18c10..434b0f5b 100644 --- a/src/main/java/com/cloudhopper/smpp/SmppConstants.java +++ b/src/main/java/com/cloudhopper/smpp/SmppConstants.java @@ -52,10 +52,12 @@ public class SmppConstants { public static final long DEFAULT_CONNECT_TIMEOUT = 10000; public static final long DEFAULT_BIND_TIMEOUT = 5000; public static final long DEFAULT_REQUEST_EXPIRY_TIMEOUT = -1; // disabled + public static final long DEFAULT_ENQUIRE_LINK_TIMER = 500; public static final long DEFAULT_WINDOW_MONITOR_INTERVAL = -1; // disabled public static final int DEFAULT_SERVER_MAX_CONNECTION_SIZE = 100; public static final boolean DEFAULT_SERVER_NON_BLOCKING_SOCKETS_ENABLED = true; public static final boolean DEFAULT_SERVER_REUSE_ADDRESS = true; + public static final boolean DEFAULT_USE_AUTOMATIC_ENQUIRE_LINK = false; // // SUBMIT_MULTI destination type flags diff --git a/src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java b/src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java index 0b7d5c95..e4a0ce3d 100644 --- a/src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java +++ b/src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java @@ -53,6 +53,9 @@ public class SmppSessionConfiguration extends SmppConnectionConfiguration { private long requestExpiryTimeout; private long windowMonitorInterval; private boolean countersEnabled; + + private boolean automaticEnquireLink; + private long enquireLinkTimer; public SmppSessionConfiguration() { this(SmppBindType.TRANSCEIVER, null, null, null); @@ -74,6 +77,8 @@ public SmppSessionConfiguration(SmppBindType type, String systemId, String passw this.windowWaitTimeout = SmppConstants.DEFAULT_WINDOW_WAIT_TIMEOUT; this.requestExpiryTimeout = SmppConstants.DEFAULT_REQUEST_EXPIRY_TIMEOUT; this.windowMonitorInterval = SmppConstants.DEFAULT_WINDOW_MONITOR_INTERVAL; + this.automaticEnquireLink = SmppConstants.DEFAULT_USE_AUTOMATIC_ENQUIRE_LINK; + this.enquireLinkTimer = SmppConstants.DEFAULT_ENQUIRE_LINK_TIMER; this.countersEnabled = false; } @@ -228,4 +233,20 @@ public void setCountersEnabled(boolean countersEnabled) { this.countersEnabled = countersEnabled; } + public boolean isAutomaticEnquireLink() { + return automaticEnquireLink; + } + + public void setAutomaticEnquireLink(boolean automaticEnquireLink) { + this.automaticEnquireLink = automaticEnquireLink; + } + + public long getEnquireLinkTimer() { + return enquireLinkTimer; + } + + public void setEnquireLinkTimer(long enquireLinkTimer) { + this.enquireLinkTimer = enquireLinkTimer; + } + } diff --git a/src/main/java/com/cloudhopper/smpp/channel/AwaitChannelFutureListener.java b/src/main/java/com/cloudhopper/smpp/channel/AwaitChannelFutureListener.java new file mode 100644 index 00000000..aa376c81 --- /dev/null +++ b/src/main/java/com/cloudhopper/smpp/channel/AwaitChannelFutureListener.java @@ -0,0 +1,61 @@ +package com.cloudhopper.smpp.channel; + +/* + * #%L + * ch-smpp + * %% + * Copyright (C) 2009 - 2013 Cloudhopper by Twitter + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; + +public class AwaitChannelFutureListener implements ChannelFutureListener { + + private CountDownLatch countDownLatch = new CountDownLatch(1); + private ChannelFuture channelFuture; + + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + this.channelFuture = channelFuture; + countDownLatch.countDown(); + } + + public boolean timeout(long timeoutInMillis) throws InterruptedException { + return !countDownLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS); + } + + public void await() throws InterruptedException { + countDownLatch.await(); + } + + public boolean successed() { + return channelFuture != null && channelFuture.isSuccess(); + } + + public Throwable getCause() { + return channelFuture != null ? channelFuture.getCause() : null; + } + + public Channel getChannel() { + return channelFuture != null ? channelFuture.getChannel() : null; + } + +} diff --git a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java index be7fdbb3..4f20b15e 100644 --- a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java +++ b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java @@ -20,20 +20,35 @@ * #L% */ -import com.cloudhopper.smpp.SmppClient; -import com.cloudhopper.smpp.util.DaemonExecutors; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import javax.net.ssl.SSLEngine; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.handler.ssl.SslHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.cloudhopper.smpp.SmppBindType; -import com.cloudhopper.smpp.type.SmppChannelException; +import com.cloudhopper.smpp.SmppClient; import com.cloudhopper.smpp.SmppSession; import com.cloudhopper.smpp.SmppSessionConfiguration; import com.cloudhopper.smpp.SmppSessionHandler; +import com.cloudhopper.smpp.channel.AwaitChannelFutureListener; import com.cloudhopper.smpp.channel.SmppChannelConstants; -import com.cloudhopper.smpp.type.SmppTimeoutException; import com.cloudhopper.smpp.channel.SmppClientConnector; -import com.cloudhopper.smpp.channel.SmppSessionPduDecoder; import com.cloudhopper.smpp.channel.SmppSessionLogger; -import com.cloudhopper.smpp.channel.SmppSessionWrapper; +import com.cloudhopper.smpp.channel.SmppSessionPduDecoder; import com.cloudhopper.smpp.channel.SmppSessionThreadRenamer; +import com.cloudhopper.smpp.channel.SmppSessionWrapper; import com.cloudhopper.smpp.pdu.BaseBind; import com.cloudhopper.smpp.pdu.BaseBindResp; import com.cloudhopper.smpp.pdu.BindReceiver; @@ -45,21 +60,10 @@ import com.cloudhopper.smpp.type.SmppBindException; import com.cloudhopper.smpp.type.SmppChannelConnectException; import com.cloudhopper.smpp.type.SmppChannelConnectTimeoutException; +import com.cloudhopper.smpp.type.SmppChannelException; +import com.cloudhopper.smpp.type.SmppTimeoutException; import com.cloudhopper.smpp.type.UnrecoverablePduException; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import javax.net.ssl.SSLEngine; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.ssl.SslHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.cloudhopper.smpp.util.DaemonExecutors; /** * Default implementation to "bootstrap" client SMPP sessions (create & bind). @@ -259,22 +263,23 @@ protected Channel createConnectedChannel(String host, int port, long connectTime // a socket address used to "bind" to the remote system InetSocketAddress socketAddr = new InetSocketAddress(host, port); + AwaitChannelFutureListener listener = new AwaitChannelFutureListener(); // attempt to connect to the remote system ChannelFuture connectFuture = this.clientBootstrap.connect(socketAddr); + connectFuture.addListener(listener); // wait until the connection is made successfully - boolean timeout = !connectFuture.await(connectTimeoutMillis); + boolean timeout = listener.timeout(connectTimeoutMillis); if (timeout) { throw new SmppChannelConnectTimeoutException("Unable to connect to host [" + host + "] and port [" + port + "] within " + connectTimeoutMillis + " ms"); } - if (!connectFuture.isSuccess()) { - throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + connectFuture.getCause().getMessage(), connectFuture.getCause()); + if (!listener.successed()) { + throw new SmppChannelConnectException("Unable to connect to host [" + host + "] and port [" + port + "]: " + listener.getCause().getMessage(), listener.getCause()); } // if we get here, then we were able to connect and get a channel - return connectFuture.getChannel(); + return listener.getChannel(); } - } diff --git a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java index 3e718b88..18ff5499 100644 --- a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java +++ b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java @@ -20,8 +20,23 @@ * #L% */ +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.ObjectName; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.cloudhopper.commons.util.PeriodFormatterUtil; -import com.cloudhopper.smpp.jmx.DefaultSmppSessionMXBean; import com.cloudhopper.commons.util.windowing.DuplicateKeyException; import com.cloudhopper.commons.util.windowing.OfferTimeoutException; import com.cloudhopper.commons.util.windowing.Window; @@ -30,11 +45,11 @@ import com.cloudhopper.smpp.SmppBindType; import com.cloudhopper.smpp.SmppConstants; import com.cloudhopper.smpp.SmppServerSession; -import com.cloudhopper.smpp.type.SmppChannelException; import com.cloudhopper.smpp.SmppSessionConfiguration; import com.cloudhopper.smpp.SmppSessionCounters; import com.cloudhopper.smpp.SmppSessionHandler; -import com.cloudhopper.smpp.type.SmppTimeoutException; +import com.cloudhopper.smpp.channel.AwaitChannelFutureListener; +import com.cloudhopper.smpp.jmx.DefaultSmppSessionMXBean; import com.cloudhopper.smpp.pdu.BaseBind; import com.cloudhopper.smpp.pdu.BaseBindResp; import com.cloudhopper.smpp.pdu.EnquireLink; @@ -52,23 +67,13 @@ import com.cloudhopper.smpp.transcoder.PduTranscoder; import com.cloudhopper.smpp.type.RecoverablePduException; import com.cloudhopper.smpp.type.SmppBindException; +import com.cloudhopper.smpp.type.SmppChannelException; +import com.cloudhopper.smpp.type.SmppTimeoutException; import com.cloudhopper.smpp.type.UnrecoverablePduException; +import com.cloudhopper.smpp.util.EnquireLinkSender; import com.cloudhopper.smpp.util.SequenceNumber; import com.cloudhopper.smpp.util.SmppSessionUtil; import com.cloudhopper.smpp.util.SmppUtil; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.nio.channels.ClosedChannelException; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import javax.management.ObjectName; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Default implementation of either an ESME or SMSC SMPP session. @@ -99,6 +104,8 @@ public class DefaultSmppSession implements SmppServerSession, SmppSessionChannel private BaseBindResp preparedBindResponse; private ScheduledExecutorService monitorExecutor; private DefaultSmppSessionCounters counters; + + private EnquireLinkSender enquireLinkSender; /** * Creates an SmppSession for a server-based session. @@ -170,6 +177,10 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration if (configuration.isCountersEnabled()) { this.counters = new DefaultSmppSessionCounters(); } + + if (configuration.isAutomaticEnquireLink()) { + this.enquireLinkSender = new EnquireLinkSender(this); + } } public void registerMBean(String objectName) { @@ -216,6 +227,10 @@ public Type getRemoteType() { protected void setBound() { this.state.set(STATE_BOUND); this.boundTime.set(System.currentTimeMillis()); + + if (enquireLinkSender != null) { + enquireLinkSender.start(); + } } @Override @@ -518,10 +533,14 @@ public WindowFuture sendRequestPdu(PduRequest pd } // write the pdu out & wait till its written - ChannelFuture channelFuture = this.channel.write(buffer).await(); + AwaitChannelFutureListener listener = new AwaitChannelFutureListener(); + ChannelFuture channelFuture = this.channel.write(buffer); + channelFuture.addListener(listener); + + listener.await(); // check if the write was a success - if (!channelFuture.isSuccess()) { + if (!listener.successed()) { // the write failed, make sure to throw an exception throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause()); } @@ -557,16 +576,19 @@ public void sendResponsePdu(PduResponse pdu) throws RecoverablePduException, Unr } // write the pdu out & wait till its written - ChannelFuture channelFuture = this.channel.write(buffer).await(); + AwaitChannelFutureListener listener = new AwaitChannelFutureListener(); + ChannelFuture channelFuture = this.channel.write(buffer); + channelFuture.addListener(listener); + + listener.await(); // check if the write was a success - if (!channelFuture.isSuccess()) { + if (!listener.successed()) { // the write failed, make sure to throw an exception throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause()); } } - @SuppressWarnings("unchecked") @Override public void firePduReceived(Pdu pdu) { if (configuration.getLoggingOptions().isLogPduEnabled()) { diff --git a/src/main/java/com/cloudhopper/smpp/util/EnquireLinkSender.java b/src/main/java/com/cloudhopper/smpp/util/EnquireLinkSender.java new file mode 100644 index 00000000..ae4b6887 --- /dev/null +++ b/src/main/java/com/cloudhopper/smpp/util/EnquireLinkSender.java @@ -0,0 +1,95 @@ +package com.cloudhopper.smpp.util; + +/* + * #%L + * ch-smpp + * %% + * Copyright (C) 2009 - 2013 Cloudhopper by Twitter + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.cloudhopper.smpp.SmppSession; +import com.cloudhopper.smpp.impl.SmppSessionChannelListener; +import com.cloudhopper.smpp.pdu.EnquireLink; +import com.cloudhopper.smpp.pdu.EnquireLinkResp; +import com.cloudhopper.smpp.type.SmppChannelException; + +public class EnquireLinkSender implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(EnquireLinkSender.class); + + private SmppSession session; + private SequenceNumber sequence; + + private boolean started; + private Thread thread; + + public EnquireLinkSender(SmppSession session) { + if (session == null) { + throw new NullPointerException("The SmppSession cannot be null."); + } + + this.session = session; + this.thread = new Thread(this); + + this.sequence = new SequenceNumber(); + } + + public void start() { + if (!started) { + started = true; + thread.start(); + } + + } + + public void stop() { + started = false; + } + + @Override + public void run() { + while (started) { + try { + Thread.sleep(session.getConfiguration().getEnquireLinkTimer()); + + if (session.isBound()) { + EnquireLink enquireLink = new EnquireLink(); + enquireLink.setSequenceNumber(sequence.next()); + + EnquireLinkResp response = session.enquireLink(enquireLink, session.getConfiguration().getRequestExpiryTimeout()); + response.getCommandStatus(); + } else if (session.isClosed()) { + stop(); + } + } catch (SmppChannelException e) { + stop(); + session.close(); + + if (session instanceof SmppSessionChannelListener) { + ((SmppSessionChannelListener) session).fireChannelClosed(); + } + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + } + } + } + +} \ No newline at end of file