diff --git a/core/smpp-extensions/src/main/java/org/restcomm/smpp/Esme.java b/core/smpp-extensions/src/main/java/org/restcomm/smpp/Esme.java index 848bb9c..0553ffd 100644 --- a/core/smpp-extensions/src/main/java/org/restcomm/smpp/Esme.java +++ b/core/smpp-extensions/src/main/java/org/restcomm/smpp/Esme.java @@ -22,6 +22,7 @@ package org.restcomm.smpp; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -158,6 +159,7 @@ public class Esme extends SslConfigurationWrapper implements XMLSerializable, Es private boolean linkRecvMessCheck = false; private boolean linkStartFirstTime = false; + private AtomicBoolean inConnectingQueue = new AtomicBoolean(false); // Default Server private SmppSession.Type smppSessionType = SmppSession.Type.SERVER; @@ -356,6 +358,11 @@ public Esme(String name, String systemId, String password, String host, int port this.maxMessageLength = maxMessageLength; } + + public AtomicBoolean getInConnectingQueue() { + return inConnectingQueue; + } + /** * @return the name */ diff --git a/core/smpp-extensions/src/main/java/org/restcomm/smpp/SmppClientOpsThread.java b/core/smpp-extensions/src/main/java/org/restcomm/smpp/SmppClientOpsThread.java index c599df4..4721e58 100644 --- a/core/smpp-extensions/src/main/java/org/restcomm/smpp/SmppClientOpsThread.java +++ b/core/smpp-extensions/src/main/java/org/restcomm/smpp/SmppClientOpsThread.java @@ -21,7 +21,10 @@ */ package org.restcomm.smpp; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; import javolution.util.FastList; import javolution.util.FastMap; @@ -52,10 +55,12 @@ public class SmppClientOpsThread implements Runnable { private static final Logger logger = Logger.getLogger(SmppClientOpsThread.class); private static final long SCHEDULE_CONNECT_DELAY = 1000 * 30; // 30 sec + private final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); protected volatile boolean started = true; - private FastList pendingChanges = new FastList(); + ConcurrentLinkedQueue workingSet = new ConcurrentLinkedQueue(); + ConcurrentLinkedQueue futureSet = new ConcurrentLinkedQueue(); private Object waitObject = new Object(); @@ -87,13 +92,18 @@ protected void setStarted(boolean started) { protected void scheduleConnect(Esme esme) { - logger.info("Initiating a Client SMPP connection for ESME: " + esme.getName()); - - synchronized (this.pendingChanges) { - this.pendingChanges.add(new ChangeRequest(esme, ChangeRequest.CONNECT, System.currentTimeMillis() - + SCHEDULE_CONNECT_DELAY)); - } - + logger.debug("Scheduling a Client SMPP connection for ESME: " + esme.getName() + + " systemId=" + esme.getSystemId()); + + long executionTime = System.currentTimeMillis() + SCHEDULE_CONNECT_DELAY; + if (esme.getInConnectingQueue().compareAndSet(false, true)) { + this.futureSet.offer(new ChangeRequest(esme, ChangeRequest.CONNECT, executionTime)); + logger.debug("Pending change request CONNECT has been added for esme: " + esme.getName() + + " with scheduled execution time on " + DATE_FORMAT.format(new Date(executionTime))); + } else { + logger.debug("Pending change request CONNECT has NOT been added for esme: " + esme.getName() + + " because it is already in queue"); + } synchronized (this.waitObject) { this.waitObject.notify(); } @@ -101,10 +111,8 @@ protected void scheduleConnect(Esme esme) { } protected void scheduleEnquireLink(Esme esme) { - synchronized (this.pendingChanges) { - this.pendingChanges.add(new ChangeRequest(esme, ChangeRequest.ENQUIRE_LINK, System.currentTimeMillis() + this.futureSet.offer(new ChangeRequest(esme, ChangeRequest.ENQUIRE_LINK, System.currentTimeMillis() + esme.getEnquireLinkDelay())); - } synchronized (this.waitObject) { this.waitObject.notify(); @@ -118,56 +126,60 @@ public void run() { if (logger.isInfoEnabled()) { logger.info("SmppClientOpsThread started."); } - + while (this.started) { - FastList pendingList = new FastList(); - - try { - synchronized (this.pendingChanges) { - Iterator changes = pendingChanges.iterator(); - - while (changes.hasNext()) { - ChangeRequest change = changes.next(); - switch (change.getType()) { - case ChangeRequest.CONNECT: - if (!change.getEsme().isStarted()) { - pendingChanges.remove(change); - } else { - if (change.getExecutionTime() <= System.currentTimeMillis()) { - pendingChanges.remove(change); - initiateConnection(change.getEsme()); - } - } - break; - case ChangeRequest.ENQUIRE_LINK: - if (!change.getEsme().isStarted()) { - pendingChanges.remove(change); - } else { - if (change.getEsme().getEnquireClientEnabled()) { - - if (change.getExecutionTime() <= System.currentTimeMillis()) { - pendingList.add(change.getEsme()); - pendingChanges.remove(change); - } - } - } - break; - } - } - } - - // Sending Enquire messages - Iterator pendingchanges = pendingList.iterator(); - while (pendingchanges.hasNext()) { - Esme change = pendingchanges.next(); - this.enquireLink(change); - } - - synchronized (this.waitObject) { - this.waitObject.wait(5000); - } - - // checking of ESME CLOSED state - TODO: we need to refactor it after finding a reason of ESME not connecting + workingSet = futureSet; + futureSet = new ConcurrentLinkedQueue(); + + FastList pendingList = new FastList(); + + ChangeRequest change = workingSet.poll(); + while (change != null) { + switch (change.getType()) { + case ChangeRequest.CONNECT: + if (!change.getEsme().isStarted()) { + change.getEsme().getInConnectingQueue().set(false); + logger.warn("ESME " + change.getEsme().getName() + " is stopped. Removing change request."); + } else { + if (change.getExecutionTime() <= System.currentTimeMillis()) { + change.getEsme().getInConnectingQueue().set(false); + initiateConnection(change.getEsme()); + } else { + futureSet.offer(change); + logger.debug("Change request for ESME " + change.getEsme().getName() + " is scheduled for later: " + + DATE_FORMAT.format(new Date(change.getExecutionTime()))); + } + } + break; + case ChangeRequest.ENQUIRE_LINK: + if (change.getEsme().isStarted()) { + if (change.getEsme().getEnquireClientEnabled()) { + if (change.getExecutionTime() <= System.currentTimeMillis()) { + pendingList.add(change.getEsme()); + } else { + futureSet.offer(change); + } + } + } + break; + } + + change = workingSet.poll(); + } + + // Sending Enquire messages + Iterator pendingchanges = pendingList.iterator(); + while (pendingchanges.hasNext()) { + Esme esme = pendingchanges.next(); + this.enquireLink(esme); + } + + try { + synchronized (this.waitObject) { + this.waitObject.wait(5000); + } + + // checking of ESME CLOSED state - TODO: we need to refactor it after finding a reason of ESME not connecting try { long curTimeStamp = System.currentTimeMillis(); for (FastList.Node n = this.esmeManagement.esmes.head(), end = this.esmeManagement.esmes.tail(); (n = n @@ -193,10 +205,10 @@ public void run() { } } catch (Throwable e) { } - - } catch (InterruptedException e) { - logger.error("Error while looping SmppClientOpsThread thread", e); - } + } catch (InterruptedException e) { + logger.error("Error while looping SmppClientOpsThread thread", e); + } + }// while if (logger.isInfoEnabled()) { @@ -246,7 +258,7 @@ private void enquireLink(Esme esme) { } else { // This should never happen - logger.warn(String.format("Sending ENQURE_LINK fialed for ESME SystemId=%s as SmppSession is =%s !", + logger.warn(String.format("Sending ENQURE_LINK failed for ESME SystemId=%s as SmppSession is =%s !", esme.getSystemId(), (smppSession == null ? null : smppSession.getStateName()))); if (smppSession != null) { @@ -257,28 +269,34 @@ private void enquireLink(Esme esme) { smppSession.getConfiguration().getName())); } } + this.scheduleConnect(esme); } } private void initiateConnection(Esme esme) { + logger.debug("Initiating connection for esme " + esme.getName() + " is started."); + // If Esme is stopped, don't try to initiate connect if (!esme.isStarted()) { + logger.warn("ESME: " + esme.getName() + " is stopped. Will not try to initiate connection."); return; } SmppSession smppSession = esme.getSmppSession(); if ((smppSession != null && smppSession.isBound()) || (smppSession != null && smppSession.isBinding())) { // If process has already begun lets not do it again + logger.debug("SMPP session is already bound or binding for ESME: " + esme.getName() + ". Will not try to initiate connection"); return; } SmppSession session0 = null; try { + logger.debug("Creating SMPP Session with ESME " + esme.getName() + " started."); SmppSessionConfiguration config0 = new SmppSessionConfiguration(); config0.setWindowSize(esme.getWindowSize()); - config0.setName(esme.getSystemId()); + config0.setName(esme.getName()); config0.setType(esme.getSmppBindType()); config0.setBindTimeout(esme.getClientBindTimeout()); config0.setHost(esme.getHost()); @@ -325,17 +343,19 @@ private void initiateConnection(Esme esme) { config0.setSslConfiguration(sslConfiguration); } + logger.debug("Binding with ESME " + esme.getName() + " systemId=" + esme.getSystemId()); session0 = clientBootstrap.bind(config0, sessionHandler); // Set in ESME + logger.debug("SMPP session has been created for ESME: " + esme.getName()); esme.setSmppSession((DefaultSmppSession) session0); // Finally set Enquire Link schedule this.scheduleEnquireLink(esme); - } catch (Exception e) { + } catch (Throwable e) { logger.error( String.format("Exception when trying to bind client SMPP connection for ESME systemId=%s", - esme.getSystemId()), e); + esme.getSystemId()) + " name = " + esme.getName(), e); if (session0 != null) { session0.close(); } @@ -373,7 +393,9 @@ public String lookupTlvTagName(short arg0) { public void fireChannelUnexpectedlyClosed() { this.wrappedSmppSessionHandler.fireChannelUnexpectedlyClosed(); - this.esme.getSmppSession().close(); + if (this.esme.getSmppSession() != null) { + this.esme.getSmppSession().close(); + } // Schedule the connection again scheduleConnect(this.esme); @@ -409,7 +431,9 @@ public void fireUnknownThrowable(Throwable e) { this.wrappedSmppSessionHandler.fireUnknownThrowable(e); // TODO is this ok? - this.esme.getSmppSession().close(); + if (this.esme.getSmppSession() != null) { + this.esme.getSmppSession().close(); + } // Schedule the connection again scheduleConnect(this.esme);