Skip to content

Commit

Permalink
BS-232 Uniqueness of SmppMessageId is now enforced on submit_response
Browse files Browse the repository at this point in the history
  • Loading branch information
hrosa committed Jun 11, 2018
1 parent f84d178 commit d9db91b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 111 deletions.
Expand Up @@ -126,10 +126,8 @@
LIMIT #{limit} OFFSET #{offset}
</select>

<select id="findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc" parameterType="map" resultType="hashmap">
<select id="findBySmppMessageId" parameterType="map" resultType="hashmap">
SELECT * FROM "restcomm_sms_messages" AS "restcomm_sms_messages"
WHERE "smpp_message_id" = #{smppMessageId}
AND "date_created" &gt;= #{startDate}
ORDER BY "date_created" DESC
</select>
</mapper>
Expand Up @@ -125,11 +125,10 @@
order by "date_created"
LIMIT #{limit} OFFSET #{offset}
</select>
<select id="findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc" parameterType="map" resultType="hashmap">

<select id="findBySmppMessageId" parameterType="map" resultType="hashmap">
SELECT * FROM "restcomm_sms_messages" AS "restcomm_sms_messages"
WHERE "smpp_message_id" = #{smppMessageId}
AND "date_created" &gt;= #{startDate}
ORDER BY "date_created" DESC
</select>

</mapper>
Expand Up @@ -55,5 +55,5 @@ public interface SmsMessagesDao {
Integer getTotalSmsMessage(SmsMessageFilter filter);
List<SmsMessage> getSmsMessages(SmsMessageFilter filter);

List<SmsMessage> findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc(String smppMessageId, DateTime startDate);
List<SmsMessage> findBySmppMessageId(String smppMessageId);
}
Expand Up @@ -22,8 +22,6 @@
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.restcomm.connect.commons.annotations.concurrency.ThreadSafe;
import org.restcomm.connect.commons.dao.Sid;
import org.restcomm.connect.dao.SmsMessagesDao;
Expand Down Expand Up @@ -198,19 +196,16 @@ public int getSmsMessagesPerAccountLastPerMinute(String accountSid) throws Parse
}

@Override
public List<SmsMessage> findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc(String smppMessageId, DateTime startDate) {
final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
public List<SmsMessage> findBySmppMessageId(String smppMessageId) {
final Map<String, Object> parameters = new HashMap<>(2);
parameters.put("smppMessageId", smppMessageId);
parameters.put("startDate", formatter.print(startDate.withTime(0, 0, 0, 0)));

final SqlSession session = this.sessions.openSession();

try {
final List<Map<String, Object>> results = session.selectList(namespace + "findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc", parameters);
final List<Map<String, Object>> results = session.selectList(namespace + "findBySmppMessageId", parameters);
final List<SmsMessage> messages = new ArrayList<>(results.size());

for (Map<String, Object> result: results) {
for (Map<String, Object> result : results) {
messages.add(toSmsMessage(result));
}
return messages;
Expand Down
Expand Up @@ -127,11 +127,9 @@
LIMIT #{limit} OFFSET #{offset}
</select>

<select id="findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc" parameterType="map" resultType="hashmap">
<select id="findBySmppMessageId" parameterType="map" resultType="hashmap">
SELECT * FROM "restcomm_sms_messages" AS "restcomm_sms_messages"
WHERE "smpp_message_id" = #{smppMessageId}
AND "date_created" &gt;= #{startDate}
ORDER BY "date_created" DESC
</select>

</mapper>
Expand Up @@ -116,9 +116,9 @@ public void createReadUpdateDelete() {
// Validate that the CDR was removed.
assertTrue(messages.getSmsMessage(sid) == null);
}

private SmsMessage createSms() {
return createSms(Sid.generate(Sid.Type.ACCOUNT), SmsMessage.Direction.OUTBOUND_API, 0, DateTime.now());
return createSms(Sid.generate(Sid.Type.ACCOUNT), SmsMessage.Direction.OUTBOUND_API, 0, DateTime.now());
}

private SmsMessage createSms(Sid account, SmsMessage.Direction direction, int index, DateTime date) {
Expand All @@ -134,7 +134,7 @@ private SmsMessage createSms(Sid account, SmsMessage.Direction direction, int in
.setApiVersion("2012-04-24")
.setRecipient("+12223334444")
.setSender("+17778889999")
.setBody("Hello World - "+index)
.setBody("Hello World - " + index)
.setStatus(SmsMessage.Status.SENDING)
.setDirection(direction)
.setPrice(new BigDecimal("0.00"))
Expand All @@ -154,33 +154,33 @@ public void testGetSmsMessagesLastMinute() throws InterruptedException, ParseExc
SmsMessage message = createSms(account, SmsMessage.Direction.OUTBOUND_API, i, oneMinuteAgo);
// Create a new sms message in the data store.
messages.addSmsMessage(message);
logger.info("Created message: "+message);
logger.info("Created message: " + message);
}
for (int i = 0; i < 2; i++) {
SmsMessage message = createSms(account, SmsMessage.Direction.OUTBOUND_CALL, i, oneMinuteAgo);
// Create a new sms message in the data store.
messages.addSmsMessage(message);
logger.info("Created message: "+message);
logger.info("Created message: " + message);
}
for (int i = 0; i < 2; i++) {
SmsMessage message = createSms(account, SmsMessage.Direction.OUTBOUND_REPLY, i, oneMinuteAgo);
// Create a new sms message in the data store.
messages.addSmsMessage(message);
logger.info("Created message: "+message);
logger.info("Created message: " + message);
}
int lastMessages = messages.getSmsMessagesPerAccountLastPerMinute(account.toString());
logger.info("SMS Messages last minutes: "+lastMessages);
logger.info("SMS Messages last minutes: " + lastMessages);
assertEquals(6, lastMessages);
Thread.sleep(5000);
DateTime oneMinuteLater = DateTime.now();
for (int i = 0; i < 3; i++) {
SmsMessage message = createSms(account, SmsMessage.Direction.OUTBOUND_CALL, i, oneMinuteLater);
// Create a new sms message in the data store.
messages.addSmsMessage(message);
logger.info("Created message: "+message);
logger.info("Created message: " + message);
}
lastMessages = messages.getSmsMessagesPerAccountLastPerMinute(account.toString());
logger.info("SMS Messages last minutes: "+lastMessages);
logger.info("SMS Messages last minutes: " + lastMessages);
assertEquals(3, lastMessages);
messages.removeSmsMessages(account);
}
Expand Down Expand Up @@ -217,20 +217,20 @@ public void testReadDeleteByAccount() {
}

@Test
public void testUpdateSmsMessageDateSentAndStatusAndGetBySmppMsgId(){
final DateTime dateSent = DateTime.now();
final SmsMessage.Status status = SmsMessage.Status.SENT;
final String smppMessageId = "0000058049";
public void testUpdateSmsMessageDateSentAndStatusAndGetBySmppMsgId() {
final DateTime dateSent = DateTime.now();
final SmsMessage.Status status = SmsMessage.Status.SENT;
final String smppMessageId = "0000058049";

// add a new msg
SmsMessage smsMessage = createSms();
final SmsMessagesDao messages = manager.getSmsMessagesDao();
// add a new msg
SmsMessage smsMessage = createSms();
final SmsMessagesDao messages = manager.getSmsMessagesDao();
messages.addSmsMessage(smsMessage);

//set status and dateSent
smsMessage = smsMessage.setStatus(status).setDateSent(dateSent).setSmppMessageId(smppMessageId);
messages.updateSmsMessage(smsMessage);
smsMessage = smsMessage.setStatus(status).setDateSent(dateSent).setSmppMessageId(smppMessageId);
messages.updateSmsMessage(smsMessage);

//get SmsMessage By SmppMessageId
SmsMessage resultantSmsMessage = messages.getSmsMessageBySmppMessageId(smppMessageId);

Expand Down Expand Up @@ -271,13 +271,14 @@ public void testFindBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDate
smsMessagesDao.addSmsMessage(smsMessage6);
smsMessagesDao.addSmsMessage(smsMessage7);

final List<SmsMessage> messages = smsMessagesDao.findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc(smppMessageId, threeDaysAgo);
final List<SmsMessage> messages = smsMessagesDao.findBySmppMessageId(smppMessageId);

// then
try {
assertEquals(3, messages.size());
assertEquals(smsMessage6.getSid(), messages.get(0).getSid());
assertEquals(smsMessage5.getSid(), messages.get(1).getSid());
assertEquals(smsMessage3.getSid(), messages.get(2).getSid());
assertEquals(5, messages.size());
for (SmsMessage message: messages) {
assertEquals(smppMessageId, message.getSmppMessageId());
}
} finally {
smsMessagesDao.removeSmsMessages(accountSid);
}
Expand Down
Expand Up @@ -149,24 +149,15 @@ public void onReceive(Object message) throws Exception {
logger.debug("DLR Received for SMPP Message " + deliveryReceipt.getId() + " with status " + deliveryStatus);
}

// Find all messages correlated with SMPP Message ID in last three days
final List<SmsMessage> smsMessages = this.storage.getSmsMessagesDao().findBySmppMessageIdAndDateCreatedGreaterOrEqualThanOrderedByDateCreatedDesc(smppMessageId, DateTime.now().minusDays(3));

// Update status of messages and remove correlation with SMPP Message ID
// IMPORTANT: First message in the results list is considered the real target of the DLR
// BS-232: Other "pending" messages that might share correlation with same SMPP Message ID will have their status forcefully updated to SENT
final int smsCount = smsMessages.size();
if (smsCount == 0) {
logger.warning("responseMessageId=" + deliveryReceipt.getId() + " was never received! ");
} else {
for (int index = 0; index < smsCount; index++) {
final SmsMessage.Status status = (index > 0) ? SmsMessage.Status.SENT : deliveryStatus;
this.storage.getSmsMessagesDao().updateSmsMessage(smsMessages.get(index).setSmppMessageId(null).setStatus(status));
// Find message bound to the SMPP Message ID
// NOTE: We ensure there is only one message bound to any SmppMessageId at this point because uniqueness is enforced on submit_response event
final SmsMessage sms = this.storage.getSmsMessagesDao().getSmsMessageBySmppMessageId(smppMessageId);

if (index > 0) {
logger.warning("Correlation between SmsMessage " + smsMessages.get(index).getSid() + " and SMPP Message " + smppMessageId + " expired. Status forcefully set to " + status);
}
}
// Update status of message and remove correlation with SMPP Message ID
if (sms == null) {
logger.warning("responseMessageId=" + smppMessageId + " was never received!");
} else {
this.storage.getSmsMessagesDao().updateSmsMessage(sms.setSmppMessageId(null).setStatus(deliveryStatus));
}
} else if (message instanceof CreateSmsSession) {
IExtensionCreateSmsSessionRequest ier = (CreateSmsSession) message;
Expand All @@ -187,43 +178,41 @@ public void onReceive(Object message) throws Exception {
final ActorRef session = destroySmsSession.session();
context.stop(session);
} else if (message instanceof PduAsyncResponse) {

PduAsyncResponse pduAsyncResponse = (PduAsyncResponse) message;

final PduAsyncResponse pduAsyncResponse = (PduAsyncResponse) message;
if (pduAsyncResponse instanceof DefaultPduAsyncResponse && pduAsyncResponse.getResponse() instanceof SubmitSmResp) {
SubmitSmResp submitSmResp = (SubmitSmResp) pduAsyncResponse.getResponse();
final SubmitSmResp submitSmResp = (SubmitSmResp) pduAsyncResponse.getResponse();
if (logger.isInfoEnabled()) {
logger.info(" ********** SmppMessageHandler received SubmitSmResp: " + submitSmResp + "SubmitSmResp Status:" + submitSmResp.getCommandStatus());
}

String smppMessageId = submitSmResp.getMessageId();

Object ref = pduAsyncResponse.getRequest().getReferenceObject();
final String smppMessageId = submitSmResp.getMessageId();
final Object ref = pduAsyncResponse.getRequest().getReferenceObject();

if (ref != null && ref instanceof Sid) {
// BS-230: Ensure there is no other message sharing same SMPP Message ID
SmsMessage existingMessage = this.storage.getSmsMessagesDao().getSmsMessageBySmppMessageId(smppMessageId);
if (existingMessage != null) {
// Cut correlation between SMS and SMPP Message ID and update message to a final state
existingMessage = existingMessage.setSmppMessageId(null);
logger.warning("Correlation between SmsMessage " + existingMessage.getSid() + " and SMPP Message " + smppMessageId + " expired.");
this.storage.getSmsMessagesDao().updateSmsMessage(existingMessage);
final List<SmsMessage> smsMessages = this.storage.getSmsMessagesDao().findBySmppMessageId(smppMessageId);

// Delete correlation between messages and SMPP Message ID
for (SmsMessage smsMessage : smsMessages) {
this.storage.getSmsMessagesDao().updateSmsMessage(smsMessage.setSmppMessageId(null));
logger.warning("Correlation between SmsMessage " + smsMessage.getSid() + " and SMPP Message " + smppMessageId + " expired.");
}

Sid sid = (Sid) ref;
SmsMessage smsMessage = storage.getSmsMessagesDao().getSmsMessage(sid);
if (submitSmResp.getCommandStatus() != 0) {
logger.warning(String.format("SubmitSmResp Failure! Message could not be sent Status Code %s Result Messages: %s", submitSmResp.getCommandStatus(), submitSmResp.getResultMessage()));
smsMessage = smsMessage.setSmppMessageId(smppMessageId).setStatus(SmsMessage.Status.FAILED);
} else {
//update smppMessageId as well as status to SENT and date sent
// Update status of target message
SmsMessage smsMessage = storage.getSmsMessagesDao().getSmsMessage((Sid) ref);
if (submitSmResp.getCommandStatus() == SmppConstants.STATUS_OK) {
// Successful reponse: update smppMessageId as well as status to SENT and date sent
smsMessage = smsMessage.setSmppMessageId(smppMessageId).setStatus(SmsMessage.Status.SENT).setDateSent(DateTime.now());
} else {
// Failure response: set status to FAILED and do not correlate to any smppMessageId
logger.warning(String.format("SubmitSmResp Failure! Message could not be sent Status Code %s Result Messages: %s", submitSmResp.getCommandStatus(), submitSmResp.getResultMessage()));
smsMessage = smsMessage.setSmppMessageId(null).setStatus(SmsMessage.Status.FAILED);
}
storage.getSmsMessagesDao().updateSmsMessage(smsMessage);
} else {
logger.warning("PduAsyncResponse reference is null or not Sid");
}
} else {
} else if (logger.isInfoEnabled()) {
logger.info("PduAsyncResponse not SubmitSmResp " + pduAsyncResponse.getClass().toString());
}
}
Expand Down

0 comments on commit d9db91b

Please sign in to comment.