Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

受信したメッセージのキューイングに関する調査 #8

Closed
TodorokiKohei opened this issue Jun 6, 2023 · 5 comments
Closed

Comments

@TodorokiKohei
Copy link
Owner

動的な負荷分散アルゴリズムの中で、クライアントの保留中のメッセージ数から動的に重みを計算するアルゴリズムがあった。それを実現するための調査を行う。

@TodorokiKohei
Copy link
Owner Author

メッセージの受信はCommsReceiverrunメソッドで行われている。受信したメッセージはClientStatenotifyReceivedMsgメソッドに渡される。

public void run() {
recThread = Thread.currentThread();
recThread.setName(threadName);
final String methodName = "run";
MqttToken token = null;
synchronized (lifecycle) {
current_state = State.RUNNING;
}
try {
State my_target;
synchronized (lifecycle) {
my_target = target_state;
}
while (my_target == State.RUNNING && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME,methodName,"852");
if (in.available() > 0) {
synchronized (lifecycle) {
current_state = State.RECEIVING;
}
}
MqttWireMessage message = in.readMqttWireMessage();
synchronized (lifecycle) {
current_state = State.RUNNING;
}
// instanceof checks if message is null
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
// Ensure the notify processing is done under a lock on the token
// This ensures that the send processing can complete before the
// receive processing starts! ( request and ack and ack processing
// can occur before request processing is complete if not!
clientState.notifyReceivedAck((MqttAck) message);
}
} else {
// This is an ack for a message we no longer have a ticket for.
log.fine(CLASS_NAME, methodName, "857");
clientState.handleOrphanedAcks((MqttAck) message);
}
} else if (message != null && message instanceof MqttDisconnect) {
// This is a Disconnect Message
clientComms.shutdownConnection(null, new MqttException(MqttClientException.REASON_CODE_SERVER_DISCONNECTED, (MqttDisconnect) message), (MqttDisconnect) message);
} else {
if (message != null) {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
else {
if (!clientComms.isConnected() && !clientComms.isConnecting()) {
throw new IOException("Connection is lost.");
}
}
}
}
catch (MqttException ex) {
// @TRACE 856=Stopping, MQttException
log.fine(CLASS_NAME, methodName, "856", null, ex);
synchronized (lifecycle) {
target_state = State.STOPPED;
}
// Token maybe null but that is handled in shutdown
clientComms.shutdownConnection(token, ex, null);
}
catch (IOException ioe) {
// @TRACE 853=Stopping due to IOException
log.fine(CLASS_NAME, methodName, "853");
if (target_state != State.STOPPED) {
synchronized (lifecycle) {
target_state = State.STOPPED;
}
// An EOFException could be raised if the broker processes the
// DISCONNECT and ends the socket before we complete. As such,
// only shutdown the connection if we're not already shutting down.
if (!clientComms.isDisconnecting()) {
clientComms.shutdownConnection(token,
new MqttException(MqttClientException.REASON_CODE_CONNECTION_LOST, ioe), null);
}
}
}
finally {
synchronized (lifecycle) {
current_state = State.RUNNING;
}
}
synchronized (lifecycle) {
my_target = target_state;
}
} // end while
} finally {
synchronized (lifecycle) {
current_state = State.STOPPED;
}
} // end try
recThread = null;
//@TRACE 854=<
log.fine(CLASS_NAME,methodName,"854");
}

@TodorokiKohei
Copy link
Owner Author

ClientSatenotifyReceivedMsgは受け取ったメッセージをCommsCallbackmessageArrivedに渡す。

switch (send.getMessage().getQos()) {
case 0:
case 1:
if (callback != null) {
callback.messageArrived(send);
}
break;
case 2:
persistence.put(getReceivedPersistenceKey(message), (MqttPublish) message);
inboundQoS2.put(Integer.valueOf(send.getMessageId()), send);
if (callback != null) {
callback.messageArrived(send);
}
// Currently this client has no need of the properties, so this is left empty.

@TodorokiKohei
Copy link
Owner Author

TodorokiKohei commented Jun 6, 2023

CommsCallbackmessageArrivedは受け取ったメッセージをmessageQueueに追加する。このmessageQueueは別スレッドで動いているCommsCallback.run()によって処理されている。

if (!isQuiescing()) {
// Notify the CommsCallback thread that there's work to do...
synchronized (workAvailable) {
messageQueue.add(sendMessage);
// @TRACE 710=new msg avail, notify workAvailable
log.fine(CLASS_NAME, methodName, "710");
workAvailable.notifyAll();
}
}

// Check for messageArrived callbacks...
MqttPublish message = null;
synchronized (workAvailable) {
if (!messageQueue.isEmpty()) {
// Note, there is a window on connect where a publish
// could arrive before we've
// finished the connect logic.
message = messageQueue.get(0);
messageQueue.remove(0);
}
}
if (null != message) {
handleMessage(message);
}

@TodorokiKohei
Copy link
Owner Author

デフォルトだとサブスクライバのキューサイズはINBOUND_QUEUE_SIZEで10しかない。この値を可変にする必要がある。この値をどのように設定するかは設計段階で考える必要がある。

synchronized (spaceAvailable) {
while (isRunning() && !isQuiescing() && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
try {
// @TRACE 709=wait for spaceAvailable
log.fine(CLASS_NAME, methodName, "709");
spaceAvailable.wait(200);
} catch (InterruptedException ex) {
}
}
}

@TodorokiKohei
Copy link
Owner Author

TodorokiKohei commented Jun 19, 2023

キューのサイズを小さくし、サブスクライバのmessageArrived処理にスリープを導入した。その場合、実行中にエラーが発生し、コネクションが強制終了されてしまう。エラーメッセージとパケットキャプチャを以下に示す。

6月 19, 2023 6:20:28 午後 org.eclipse.paho.mqttv5.client.internal.ClientState checkForActivity 重大: subscriber: Timed out as no activity, keepAlive=2,000,000,000 lastOutboundActivity=3,377,282,055,425,600 lastInboundActivity=3,377,281,852,554,600 time=3,377,284,060,795,900 lastPing=3,377,282,055,441,700

image

エラー内容だとサーバからKeepAliveの時間内にPingReqに対するPingRespが受信できていないようである。しかし、パケットキャプチャを見てみると全てのPingReqに対するPingRespは受信できている。どうやらサーバからのMQTTパケットはクライアントのソケットのバッファに格納されているが、アプリケーションがバッファ内のデータを受信できていないようである。ソケットのバッファサイズのデフォルトは64KBであった(TCPNetworkModuleでSocketクラスが作成されており、これが使われているようである)。

問題の原因として、PingRespはクライアントに到着しているが、アプリケーションの処理(messsageArrived)が詰まっているため、CommsReceiverのキューイング処理がロックされてしまい、lastInboundActivityの更新が行われているないことだと考えられる。ローカルのキューサイズを大きくするとCommsReceiverのキューイング処理がロックされなくなるため、改善可能である。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Archived in project
Development

No branches or pull requests

1 participant