Skip to content

Commit

Permalink
feat #12 受信キューのサイズ変更機能を実装
Browse files Browse the repository at this point in the history
  • Loading branch information
TodorokiKohei committed Jul 11, 2023
1 parent 27c9f2e commit 4112e66
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1808,4 +1808,8 @@ public IMqttToken authenticate(int reasonCode, Object userContext, MqttPropertie
comms.sendNoWait(auth, token);
return null;
}

public void resizeReceiverQueueSize(int size){
comms.resizeReceiverQueueSize(size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,8 @@ public int getActualInFlight() {
public boolean doesSubscriptionIdentifierExist(int subscriptionIdentifier) {
return this.callback.doesSubscriptionIdentifierExist(subscriptionIdentifier);

public void resizeReceiverQueueSize(int size){
callback.resizeQueueSize(size);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class CommsCallback implements Runnable {
private static final String CLASS_NAME = CommsCallback.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);

private static final int INBOUND_QUEUE_SIZE = 10;
private static int inboundQueueSize = 10;
private MqttCallback mqttCallback;
private MqttCallback reconnectInternalCallback;
private HashMap<Integer, IMqttMessageListener> callbackMap; // Map of message handler callbacks to internal IDs
Expand Down Expand Up @@ -81,8 +81,8 @@ private enum State {STOPPED, RUNNING, QUIESCING}

CommsCallback(ClientComms clientComms) {
this.clientComms = clientComms;
this.messageQueue = new ArrayList<>(INBOUND_QUEUE_SIZE);
this.completeQueue = new ArrayList<>(INBOUND_QUEUE_SIZE);
this.messageQueue = new ArrayList<>(inboundQueueSize);
this.completeQueue = new ArrayList<>(inboundQueueSize);
this.callbackMap = new HashMap<>();
this.callbackTopicMap = new HashMap<>();
this.subscriptionIdMap = new HashMap<>();
Expand Down Expand Up @@ -380,7 +380,7 @@ public void messageArrived(MqttPublish sendMessage) {
// the client protect itself from getting flooded by messages
// from the server.
synchronized (spaceAvailable) {
while (isRunning() && !isQuiescing() && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
while (isRunning() && !isQuiescing() && messageQueue.size() >= inboundQueueSize) {
try {
// @TRACE 709=wait for spaceAvailable
log.fine(CLASS_NAME, methodName, "709");
Expand Down Expand Up @@ -666,5 +666,17 @@ public boolean isQuiescing() {
}
return result;
}

public void resizeQueueSize(int size) {
if (isRunning()) {
throw new IllegalStateException("It cannot be resized after startup.");
}
inboundQueueSize = size;
messageQueue.ensureCapacity(size);
}

public int getNumberOfMsgsInQueue(){
return messageQueue.size();
}

}

0 comments on commit 4112e66

Please sign in to comment.