Skip to content

Commit

Permalink
Fixes issue with SMSC not being able to reconnect to CLIENT type esme…
Browse files Browse the repository at this point in the history
…s. Reworking queue processing logic. Closes #24
  • Loading branch information
satanatoly committed Sep 13, 2017
1 parent 1bef51f commit 87b4614
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 70 deletions.
Expand Up @@ -22,6 +22,7 @@

package org.restcomm.smpp;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -158,6 +159,7 @@ public class Esme extends SslConfigurationWrapper implements XMLSerializable, Es
private boolean linkRecvMessCheck = false;
private boolean linkStartFirstTime = false;

private AtomicBoolean inConnectingQueue = new AtomicBoolean(false);

// Default Server
private SmppSession.Type smppSessionType = SmppSession.Type.SERVER;
Expand Down Expand Up @@ -356,6 +358,11 @@ public Esme(String name, String systemId, String password, String host, int port
this.maxMessageLength = maxMessageLength;
}


public AtomicBoolean getInConnectingQueue() {
return inConnectingQueue;
}

/**
* @return the name
*/
Expand Down
Expand Up @@ -21,7 +21,10 @@
*/
package org.restcomm.smpp;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

import javolution.util.FastList;
import javolution.util.FastMap;
Expand Down Expand Up @@ -52,10 +55,12 @@ public class SmppClientOpsThread implements Runnable {
private static final Logger logger = Logger.getLogger(SmppClientOpsThread.class);

private static final long SCHEDULE_CONNECT_DELAY = 1000 * 30; // 30 sec
private final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

protected volatile boolean started = true;

private FastList<ChangeRequest> pendingChanges = new FastList<ChangeRequest>();
ConcurrentLinkedQueue<ChangeRequest> workingSet = new ConcurrentLinkedQueue<ChangeRequest>();
ConcurrentLinkedQueue<ChangeRequest> futureSet = new ConcurrentLinkedQueue<ChangeRequest>();

private Object waitObject = new Object();

Expand Down Expand Up @@ -87,24 +92,27 @@ protected void setStarted(boolean started) {

protected void scheduleConnect(Esme esme) {

logger.info("Initiating a Client SMPP connection for ESME: " + esme.getName());

synchronized (this.pendingChanges) {
this.pendingChanges.add(new ChangeRequest(esme, ChangeRequest.CONNECT, System.currentTimeMillis()
+ SCHEDULE_CONNECT_DELAY));
}

logger.debug("Scheduling a Client SMPP connection for ESME: " + esme.getName()
+ " systemId=" + esme.getSystemId());

long executionTime = System.currentTimeMillis() + SCHEDULE_CONNECT_DELAY;
if (esme.getInConnectingQueue().compareAndSet(false, true)) {
this.futureSet.offer(new ChangeRequest(esme, ChangeRequest.CONNECT, executionTime));
logger.debug("Pending change request CONNECT has been added for esme: " + esme.getName()
+ " with scheduled execution time on " + DATE_FORMAT.format(new Date(executionTime)));
} else {
logger.debug("Pending change request CONNECT has NOT been added for esme: " + esme.getName()
+ " because it is already in queue");
}
synchronized (this.waitObject) {
this.waitObject.notify();
}

}

protected void scheduleEnquireLink(Esme esme) {
synchronized (this.pendingChanges) {
this.pendingChanges.add(new ChangeRequest(esme, ChangeRequest.ENQUIRE_LINK, System.currentTimeMillis()
this.futureSet.offer(new ChangeRequest(esme, ChangeRequest.ENQUIRE_LINK, System.currentTimeMillis()
+ esme.getEnquireLinkDelay()));
}

synchronized (this.waitObject) {
this.waitObject.notify();
Expand All @@ -118,56 +126,60 @@ public void run() {
if (logger.isInfoEnabled()) {
logger.info("SmppClientOpsThread started.");
}

while (this.started) {
FastList<Esme> pendingList = new FastList<Esme>();

try {
synchronized (this.pendingChanges) {
Iterator<ChangeRequest> changes = pendingChanges.iterator();

while (changes.hasNext()) {
ChangeRequest change = changes.next();
switch (change.getType()) {
case ChangeRequest.CONNECT:
if (!change.getEsme().isStarted()) {
pendingChanges.remove(change);
} else {
if (change.getExecutionTime() <= System.currentTimeMillis()) {
pendingChanges.remove(change);
initiateConnection(change.getEsme());
}
}
break;
case ChangeRequest.ENQUIRE_LINK:
if (!change.getEsme().isStarted()) {
pendingChanges.remove(change);
} else {
if (change.getEsme().getEnquireClientEnabled()) {

if (change.getExecutionTime() <= System.currentTimeMillis()) {
pendingList.add(change.getEsme());
pendingChanges.remove(change);
}
}
}
break;
}
}
}

// Sending Enquire messages
Iterator<Esme> pendingchanges = pendingList.iterator();
while (pendingchanges.hasNext()) {
Esme change = pendingchanges.next();
this.enquireLink(change);
}

synchronized (this.waitObject) {
this.waitObject.wait(5000);
}

// checking of ESME CLOSED state - TODO: we need to refactor it after finding a reason of ESME not connecting
workingSet = futureSet;
futureSet = new ConcurrentLinkedQueue<ChangeRequest>();

FastList<Esme> pendingList = new FastList<Esme>();

ChangeRequest change = workingSet.poll();
while (change != null) {
switch (change.getType()) {
case ChangeRequest.CONNECT:
if (!change.getEsme().isStarted()) {
change.getEsme().getInConnectingQueue().set(false);
logger.warn("ESME " + change.getEsme().getName() + " is stopped. Removing change request.");
} else {
if (change.getExecutionTime() <= System.currentTimeMillis()) {
change.getEsme().getInConnectingQueue().set(false);
initiateConnection(change.getEsme());
} else {
futureSet.offer(change);
logger.debug("Change request for ESME " + change.getEsme().getName() + " is scheduled for later: "
+ DATE_FORMAT.format(new Date(change.getExecutionTime())));
}
}
break;
case ChangeRequest.ENQUIRE_LINK:
if (change.getEsme().isStarted()) {
if (change.getEsme().getEnquireClientEnabled()) {
if (change.getExecutionTime() <= System.currentTimeMillis()) {
pendingList.add(change.getEsme());
} else {
futureSet.offer(change);
}
}
}
break;
}

change = workingSet.poll();
}

// Sending Enquire messages
Iterator<Esme> pendingchanges = pendingList.iterator();
while (pendingchanges.hasNext()) {
Esme esme = pendingchanges.next();
this.enquireLink(esme);
}

try {
synchronized (this.waitObject) {
this.waitObject.wait(5000);
}

// checking of ESME CLOSED state - TODO: we need to refactor it after finding a reason of ESME not connecting
try {
long curTimeStamp = System.currentTimeMillis();
for (FastList.Node<Esme> n = this.esmeManagement.esmes.head(), end = this.esmeManagement.esmes.tail(); (n = n
Expand All @@ -193,10 +205,10 @@ public void run() {
}
} catch (Throwable e) {
}
} catch (InterruptedException e) {
logger.error("Error while looping SmppClientOpsThread thread", e);
}
} catch (InterruptedException e) {
logger.error("Error while looping SmppClientOpsThread thread", e);
}
}// while

if (logger.isInfoEnabled()) {
Expand Down Expand Up @@ -246,7 +258,7 @@ private void enquireLink(Esme esme) {

} else {
// This should never happen
logger.warn(String.format("Sending ENQURE_LINK fialed for ESME SystemId=%s as SmppSession is =%s !",
logger.warn(String.format("Sending ENQURE_LINK failed for ESME SystemId=%s as SmppSession is =%s !",
esme.getSystemId(), (smppSession == null ? null : smppSession.getStateName())));

if (smppSession != null) {
Expand All @@ -257,28 +269,34 @@ private void enquireLink(Esme esme) {
smppSession.getConfiguration().getName()));
}
}

this.scheduleConnect(esme);
}
}

private void initiateConnection(Esme esme) {
logger.debug("Initiating connection for esme " + esme.getName() + " is started.");

// If Esme is stopped, don't try to initiate connect
if (!esme.isStarted()) {
logger.warn("ESME: " + esme.getName() + " is stopped. Will not try to initiate connection.");
return;
}

SmppSession smppSession = esme.getSmppSession();
if ((smppSession != null && smppSession.isBound()) || (smppSession != null && smppSession.isBinding())) {
// If process has already begun lets not do it again
logger.debug("SMPP session is already bound or binding for ESME: " + esme.getName() + ". Will not try to initiate connection");
return;
}

SmppSession session0 = null;
try {
logger.debug("Creating SMPP Session with ESME " + esme.getName() + " started.");

SmppSessionConfiguration config0 = new SmppSessionConfiguration();
config0.setWindowSize(esme.getWindowSize());
config0.setName(esme.getSystemId());
config0.setName(esme.getName());
config0.setType(esme.getSmppBindType());
config0.setBindTimeout(esme.getClientBindTimeout());
config0.setHost(esme.getHost());
Expand Down Expand Up @@ -325,17 +343,19 @@ private void initiateConnection(Esme esme) {
config0.setSslConfiguration(sslConfiguration);
}

logger.debug("Binding with ESME " + esme.getName() + " systemId=" + esme.getSystemId());
session0 = clientBootstrap.bind(config0, sessionHandler);

// Set in ESME
logger.debug("SMPP session has been created for ESME: " + esme.getName());
esme.setSmppSession((DefaultSmppSession) session0);

// Finally set Enquire Link schedule
this.scheduleEnquireLink(esme);
} catch (Exception e) {
} catch (Throwable e) {
logger.error(
String.format("Exception when trying to bind client SMPP connection for ESME systemId=%s",
esme.getSystemId()), e);
esme.getSystemId()) + " name = " + esme.getName(), e);
if (session0 != null) {
session0.close();
}
Expand Down Expand Up @@ -373,7 +393,9 @@ public String lookupTlvTagName(short arg0) {
public void fireChannelUnexpectedlyClosed() {
this.wrappedSmppSessionHandler.fireChannelUnexpectedlyClosed();

this.esme.getSmppSession().close();
if (this.esme.getSmppSession() != null) {
this.esme.getSmppSession().close();
}

// Schedule the connection again
scheduleConnect(this.esme);
Expand Down Expand Up @@ -409,7 +431,9 @@ public void fireUnknownThrowable(Throwable e) {
this.wrappedSmppSessionHandler.fireUnknownThrowable(e);
// TODO is this ok?

this.esme.getSmppSession().close();
if (this.esme.getSmppSession() != null) {
this.esme.getSmppSession().close();
}

// Schedule the connection again
scheduleConnect(this.esme);
Expand Down

0 comments on commit 87b4614

Please sign in to comment.