-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mqtt gateway refactoring due to OOM #10804
mqtt gateway refactoring due to OOM #10804
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, the code becomes much more readable with less overhead. A few points to work on:
- check device - single method, single logic
- How does the client will get an error if some conversion goes wrong? Currently it
swallowed. - Make sure that the netty workers does not occupied for a long time . Otherwise TCP clients will fell off
} | ||
|
||
String deviceName = deviceEntry.getKey(); | ||
T deviceCtx = devices.get(deviceName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should use a single method
checkDeviceConnected(deviceName)
That returns future (immediate or real future)
Then check if the future is completed and successful - process now or add a callback
Having a custom device check (accessing internal map) looks like we have duplicated the logic
|
||
deviceMsgList.forEach(telemetryMsg -> { | ||
String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); | ||
T deviceCtx = devices.get(deviceName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same as above with checkDeviceConnected
if (deviceCtx != null) { | ||
processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); | ||
} else { | ||
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (deviceCtx != null) { | ||
processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId); | ||
} else { | ||
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use the util DonAsynchron to simplify code
continue; | ||
} | ||
String deviceName = deviceEntry.getKey(); | ||
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here there is no immediate logic implemented. please, check the future result . it might be already done
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); | ||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); | ||
} catch (Throwable e) { | ||
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will the client on the remote end get an error if some conversion does not go well? Logs are for the ThingsBoard sysadmin purposes only.
Pull Request description
MQTT transport OOM - AbstractGatewaySessionHandler
https://thingsboard-portal.atlassian.net/browse/PROD-3598
General checklist
Front-End feature checklist
Back-End feature checklist