Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Future<Boolean> persistMessages(List<InEvent<StandardMessage>> eventsQueu
String destAddr = msg.to;

MessageState state = new MessageState(gatewayMsgId, accountId, systemId, sourceAddr, destAddr, null);
state.setReassembledParts(msg.reassembledParts);
worker.getWorkerResources().getDlrService().saveInitialState(state);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SmppServerBindHandler<M extends StandardMessage> implements SmppServerHandler {
private static final Logger logger = LoggerFactory.getLogger(SmppServerBindHandler.class);
private static final long UNPUSHED_DLR_REPLAY_BIND_WAIT_MILLIS = 5_000;
private static final long UNPUSHED_DLR_REPLAY_BIND_POLL_MILLIS = 50;
public final ServerConnections connections;
private final SmppServerWorker<M> worker;
private final ConcurrentHashMap<Long, SmppSessionContext> pendingSessionContexts;
Expand Down Expand Up @@ -84,15 +87,42 @@ public void sessionCreated(

logger.info("Session created for account ID: {}", accountId);
session.serverReady(handler);
scheduleUnpushedDlrReplay(handler);
}

private void scheduleUnpushedDlrReplay(SmppServerSessionHandler<M> handler) {
if (worker.getMessageStore() == null) {
return;
}
Thread.ofVirtual()
.name("SmppServer-UnpushedDlrReplay-" + handler.getSessionId())
.start(() -> replayUnpushedDlrsWhenBound(handler));
}

private void replayUnpushedDlrsWhenBound(SmppServerSessionHandler<M> handler) {
try {
if (worker.getMessageStore() != null) {
worker.getMessageStore().onClientConnected(handler.getSystemId());
if (!waitForBoundSession(handler)) {
logger.warn("Skipping unpushed DLR replay because session did not become bound for systemId:{}",
handler.getSystemId());
return;
}
worker.getMessageStore().onClientConnected(handler.getSystemId());
} catch (Exception e) {
logger.warn("Failed to process unpushed DLRs for systemId:{}", handler.getSystemId(), e);
}
}

private boolean waitForBoundSession(SmppServerSessionHandler<M> handler) throws InterruptedException {
long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(UNPUSHED_DLR_REPLAY_BIND_WAIT_MILLIS);
while (System.nanoTime() < deadline) {
if (handler.getSession() != null && handler.getSession().isBound()) {
return true;
}
TimeUnit.MILLISECONDS.sleep(UNPUSHED_DLR_REPLAY_BIND_POLL_MILLIS);
}
return handler.getSession() != null && handler.getSession().isBound();
}

public void sessionDestroyed(Long sessionId, SmppServerSession session) {
logger.info("Session destroyed: id:{} - name:{}", sessionId, session.getConfiguration().getName());
if (session.hasCounters()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,12 @@ public M doMessage(int pThreadIndex, M pMsg) throws IOException {
}

for (DeliverSm deliverSm : requests) {
deliverSm.setReferenceObject(new Object[]{handler, pMsg});
Object deliverMsgId = deliverSm.getReferenceObject();
if (deliverMsgId instanceof String msgId) {
deliverSm.setReferenceObject(new Object[]{handler, pMsg, msgId});
} else {
deliverSm.setReferenceObject(new Object[]{handler, pMsg});
}
enqueueOut(deliverSm);
if (MessageTrace.shouldLog(configurationProvider, MessageTrace.EVENT_DELIVER_ENQUEUED)) {
logger.info("message.deliver.enqueued worker={} {}", getFullName(), MessageTrace.identifiers(pMsg));
Expand Down Expand Up @@ -689,6 +694,7 @@ protected DeliverSm getDeliverSm(M pMsg, String messageId, int errorCode, Addres
}

deliverSm.setEsmClass(SmppConstants.ESM_CLASS_MT_SMSC_DELIVERY_RECEIPT);
deliverSm.setReferenceObject(messageId);
return deliverSm;
}

Expand Down Expand Up @@ -861,7 +867,7 @@ protected boolean checkReassembling(M msg) {
}

public void reEnqueueIn(List<InEvent<M>> inEvents) {
inEvents.forEach(event -> enqueueNoExceptions(event.pMsg));
inEvents.forEach(event -> enqueueToRouterNoExceptions(event.pMsg));
inEventQueue.addAll(inEvents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class OutTask<M extends StandardMessage> implements Runnable {
private final SmppServerWorker<M> worker;
private final Pdu pdu;
private M msg;
private String deliverMsgId;

public OutTask(SmppServerWorker<M> worker, Pdu pdu) {
this.worker = worker;
Expand Down Expand Up @@ -42,6 +43,7 @@ public void run() {
Object[] arr = (Object[]) pdu.getReferenceObject();
SmppServerSessionHandler handler = (SmppServerSessionHandler) arr[0];
msg = (M) arr[1];
deliverMsgId = arr.length > 2 && arr[2] instanceof String id ? id : null;
success = handler.sendPduRequest((PduRequest) pdu);
}
} catch (Exception e) {
Expand All @@ -53,7 +55,8 @@ public void run() {
worker.outTaskFailed(pdu, msg);
} else if (!pdu.isResponse() && msg != null) {
if (MessageTrace.shouldLog(worker.getConfigurationProvider(), MessageTrace.EVENT_DELIVER_SENT)) {
logger.info("message.deliver.sent worker={} {}", worker.getFullName(), MessageTrace.identifiers(msg));
logger.info("message.deliver.sent worker={} deliverMsgId={} {}", worker.getFullName(),
MessageTrace.value(deliverMsgId), MessageTrace.identifiers(msg));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -86,6 +87,8 @@ public void createAndEnqueueDLR(int mqid, String smscid, String smsid, String fr
dlrMsg.errcode = errorCode != null ? errorCode : "";
dlrMsg.systemId = msgState.getSystemId();
dlrMsg.owner_id = msgState.getAccountId();
var reassembledParts = msgState.getReassembledParts();
dlrMsg.reassembledParts = reassembledParts == null ? null : new ArrayList<>(reassembledParts);
dlrMsg.type = StandardMessage.MSG_DLR;
try {
outWorker.enqueueToRouter(dlrMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.quarkus.runtime.annotations.RegisterForReflection;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@RegisterForReflection
public class MessageState implements Serializable {
Expand All @@ -15,6 +17,7 @@ public class MessageState implements Serializable {
private String destAddr;
private String operatorMsgId;
private String forwardDlrUrl;
private List<String> reassembledParts;
private MessageStatus status;
private long timestamp;

Expand Down Expand Up @@ -69,6 +72,10 @@ public MessageStatus getStatus() {
return status;
}

public List<String> getReassembledParts() {
return reassembledParts == null ? null : new ArrayList<>(reassembledParts);
}

public long getTimestamp() {
return timestamp;
}
Expand All @@ -81,6 +88,10 @@ public void setStatus(MessageStatus status) {
this.status = status;
}

public void setReassembledParts(List<String> reassembledParts) {
this.reassembledParts = reassembledParts == null ? null : new ArrayList<>(reassembledParts);
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ void persistMessages_SavesStateForEachMessage() {
assertEquals("sys2", captor.getAllValues().get(1).getSystemId());
}

@Test
void persistMessages_SavesReassembledPartIds() {
StandardMessage msg = new StandardMessage();
msg.serial = "gw-1";
msg.owner_id = "account1";
msg.systemId = "sys1";
msg.from = "from1";
msg.to = "to1";
msg.reassembledParts = new ArrayList<>(List.of("part-1", "part-2"));

messageStore.persistMessages(List.of(new InEvent<>(msg, null, 1, new Timestamp(System.currentTimeMillis()))));

ArgumentCaptor<MessageState> captor = ArgumentCaptor.forClass(MessageState.class);
verify(dlrService).saveInitialState(captor.capture());
assertEquals(List.of("part-1", "part-2"), captor.getValue().getReassembledParts());
}

@Test
void persistMessages_WithNullMessage_Skips() {
List<InEvent<StandardMessage>> events = new ArrayList<>();
Expand Down
Loading
Loading