feat: add reliability manager #6
Conversation
Thanks for the great work! I do have some comments below. Most importantly, I think we can find a way to simplify how we view the incoming buffer and process missing dependencies. I've also suggested not using the bloomFilter for anything other than checking ACK status.
src/reliability.nim
Outdated
if channelId.len == 0:
  return err(ReliabilityError.reInvalidArgument)
This would depend on how channelId is set elsewhere, but could this rather be an Option[SdsChannelID] which you test with isSome()?
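A minimal sketch of that suggestion (the SdsChannelID alias and the proc name are illustrative, not from the PR):

```nim
import std/options

type SdsChannelID = string  # assumed alias, for illustration only

# Hypothetical sketch: an absent channel ID is none(SdsChannelID) rather
# than an empty string, so "no channel set" becomes a type-level test.
proc hasChannel(channelId: Option[SdsChannelID]): bool =
  channelId.isSome()
```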
src/reliability.nim
Outdated
let outMsg = rm.outgoingBuffer[i]

# Check if message is in causal history
for msgID in msg.causalHistory:
I think you can use in or contains here to replace the block:
if outMsg.message.messageId in msg.causalHistory:
src/reliability.nim
Outdated
if bfResult.isOk():
  var rbf = RollingBloomFilter(
    filter: bfResult.get(),
Why parse the RollingBloomFilter for each message being read in the outgoingBuffer loop? I think you can parse and create the bloom filter once before entering the while loop?
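Something like the following shape, hoisting the parse out of the loop (identifiers are taken from the quoted snippet; this is a sketch, not the actual implementation):

```nim
# Hypothetical sketch: deserialize the sender's bloom filter once,
# before the while loop, and reuse it for every buffered message.
let bfResult = deserializeBloomFilter(msg.bloomFilter)
var rbf: Option[RollingBloomFilter]
if bfResult.isOk():
  rbf = some(RollingBloomFilter(filter: bfResult.get()))

var i = 0
while i < rm.outgoingBuffer.len:
  let outMsg = rm.outgoingBuffer[i]
  # ... check ACK status against rbf here, with no per-message parsing ...
  inc i
```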
while i < rm.outgoingBuffer.len:
  var acknowledged = false
  let outMsg = rm.outgoingBuffer[i]
To me this block of code will read a bit easier if you introduce an isAcknowledged function that tests the ack status of each message in the outgoingBuffer. Assuming you parse the rollingBloomFilter once before entering the while loop, it could look something like:
if outMsg.isAcknowledged(msg.causalHistory, rollingBloomFilter):
  if not rm.onMessageSent.isNil():
    rm.onMessageSent(outMsg.message.messageId)
etc.
src/reliability.nim
Outdated
if acknowledged:
  if not rm.onMessageSent.isNil():
    rm.onMessageSent(outMsg.message.messageId)
  rm.outgoingBuffer.delete(i)
I don't think it's good practice to remove items from the buffer you are iterating over. I would suggest building a list of items that should be cleaned and then deleting them all at once after exiting the iteration loop. @Ivansete-status any suggestions here on general best practice in Nim?
Yes, I think the approach you suggest is better, @jm-clius.
I'd use something like: let newFilteredSeq = sequence.filterIt( ...)
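Combining this with the isAcknowledged predicate suggested earlier, the delete-while-iterating block could become a callback pass followed by a single filterIt. A sketch only (the predicate, rbf, and callback names are assumed from the surrounding discussion):

```nim
import std/sequtils

# Hypothetical sketch: fire callbacks for acknowledged messages first,
# then rebuild the buffer in one pass instead of deleting in-place.
for outMsg in rm.outgoingBuffer:
  if outMsg.isAcknowledged(msg.causalHistory, rbf) and
      not rm.onMessageSent.isNil():
    rm.onMessageSent(outMsg.message.messageId)
rm.outgoingBuffer = rm.outgoingBuffer.filterIt(
  not it.isAcknowledged(msg.causalHistory, rbf))
```

This evaluates the predicate twice per message; if that matters, the first pass could instead collect the acknowledged IDs into a set and filter against it.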
src/reliability.nim
Outdated
for msg in rm.incomingBuffer:
  var hasMissingDeps = false
  for depId in msg.causalHistory:
    if not rm.bloomFilter.contains(depId):
Note that I didn't initially consider the bloomFilter a good source for checking the dependencies of received messages, as we already have a more reliable source (local history itself). Wouldn't it be possible to have a dependency check callback, perhaps implemented by the application, that takes a list of dependencies as argument and returns the sublist of resolved dependencies after checking local history?
As a first step, we can check the rm.messageHistory and only call the application-level callback if we require longer-term history. The application can also at that point try to retrieve still-unresolved dependencies from Store nodes.
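A sketch of that two-step check: exact local history first, an application-level callback only for the remainder. The callback name (onDependencyCheck) and the resolveDependencies helper are hypothetical, not part of the PR:

```nim
import std/sequtils

# Hypothetical sketch: returns the dependencies that are still missing
# after consulting local history and, if needed, the application.
proc resolveDependencies(rm: ReliabilityManager,
                         deps: seq[SdsMessageID]): seq[SdsMessageID] =
  var unresolved: seq[SdsMessageID] = @[]
  for depId in deps:
    if depId notin rm.messageHistory:
      unresolved.add(depId)
  if unresolved.len > 0 and not rm.onDependencyCheck.isNil():
    # the callback returns the sublist it could resolve,
    # e.g. by querying Store nodes
    let resolved = rm.onDependencyCheck(unresolved)
    unresolved = unresolved.filterIt(it notin resolved)
  unresolved
```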
src/reliability.nim
Outdated
if rm.bloomFilter.contains(msg.messageId):
  return ok((msg.content, @[]))
Presumably this is a duplicate check? I wouldn't rely on bloomFilter unnecessarily here, as false positives could mean that we accidentally believe that we've seen this message before. I'd only use messageHistory here.
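The suggested change amounts to swapping the probabilistic lookup for an exact one (a sketch against the snippet above):

```nim
# Hypothetical sketch: exact-membership duplicate check against
# messageHistory, avoiding bloom-filter false positives.
if msg.messageId in rm.messageHistory:
  return ok((msg.content, @[]))
```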
src/reliability.nim
Outdated
var missingDeps: seq[SdsMessageID] = @[]
for depId in msg.causalHistory:
  if not rm.bloomFilter.contains(depId):
See point elsewhere. I don't think we should use the bloomFilter (where false positives are possible) if we have access to reliable information in local messageHistory.
src/reliability.nim
Outdated
if missingDeps.len == 0:
  # Check if any dependencies are still in incoming buffer
  var depsInBuffer = false
  for bufferedMsg in rm.incomingBuffer:
    if bufferedMsg.messageId in msg.causalHistory:
      depsInBuffer = true
      break

  if depsInBuffer:
    rm.incomingBuffer.add(msg)
I don't think this step is necessary if we're going to add the message to the incomingBuffer in any case. Presumably, you'd like to avoid the onMissingDependencies call? What is expected to happen within the onMissingDependencies call?
for msgId in messageIds:
  if not rm.bloomFilter.contains(msgId):
    rm.bloomFilter.add(msgId)
I'm thinking that if we simplify the incomingBuffer to be a map of messageId to a seq[SdsMessageID] of missing dependencies (we can cache the full message contents elsewhere), we could directly mark these dependencies as met by removing them from the missingDependencies seq of each message in the incoming buffer. Processing the incoming buffer then becomes simply finding and processing the messages for which we now have an empty missingDependencies seq. WDYT?
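A sketch of the proposed structure and the mark-as-met step (the Table shape and the markDependencyMet helper are illustrative):

```nim
import std/[tables, sequtils]

# Hypothetical sketch: messageId mapped to the dependencies that are
# still missing for that message.
var incomingBuffer: Table[SdsMessageID, seq[SdsMessageID]]

proc markDependencyMet(buf: var Table[SdsMessageID, seq[SdsMessageID]],
                       depId: SdsMessageID): seq[SdsMessageID] =
  ## Removes depId from every entry and returns the message IDs that are
  ## now fully resolved (empty missing-dependency seq), ready to process.
  for msgId, missing in buf.mpairs:
    missing.keepItIf(it != depId)
    if missing.len == 0:
      result.add(msgId)
```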
Some comments so far :)
I'd also suggest renaming the message.nim module to sds_message.nim
src/reliability.nim
Outdated
proc newReliabilityManager*(
    channelId: SdsChannelID, config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
I would recommend creating a new reliability_manager.nim module and then moving all the ReliabilityManager funcs/procs defined in reliability_utils.nim into that new reliability_manager.nim.
Also, for ref object types (heap allocated and GC'ed), the idiomatic way to create them is through a new proc. I think it is fine to return a Result, but better to rename it to new. See the following as an example: https://github.com/waku-org/nwaku/blob/addce8dc3338ea1ca7d6ce1dd525f6f50ccbf467/waku/node/delivery_monitor/recv_monitor.nim#L153-L157
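A sketch of that constructor shape, keeping the PR's Result return while adopting the new-on-the-type idiom (field names assumed from the quoted signature):

```nim
# Hypothetical sketch: idiomatic `new` constructor for the ref object,
# still returning a Result as in the PR.
proc new*(T: type ReliabilityManager,
          channelId: SdsChannelID,
          config: ReliabilityConfig = defaultConfig()
         ): Result[ReliabilityManager, ReliabilityError] =
  if channelId.len == 0:
    return err(ReliabilityError.reInvalidArgument)
  ok(ReliabilityManager(channelId: channelId, config: config))
```

Call sites then read as ReliabilityManager.new(channelId), mirroring the linked nwaku example.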
src/reliability.nim
Outdated
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():
It is better to use the valueOr approach:
let bloomFilter = deserializeBloomFilter(msg.bloomFilter).valueOr:
  error "Failed to deserialize bloom filter", error = $error
let rbf = RollingBloomFilter(
  filter: bloomFilter,
  ...
src/reliability.nim
Outdated
if not acknowledged and msg.bloomFilter.len > 0:
  let bfResult = deserializeBloomFilter(msg.bloomFilter)
  if bfResult.isOk():
    var rbf = RollingBloomFilter(
We should avoid creating instances of RollingBloomFilter outside the rolling_bloom_filter.nim module. Then we need a special init proc, which will be in charge of creating such objects from a seq[byte] bloom filter.
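A sketch of such an init proc, to live in rolling_bloom_filter.nim (the signature is illustrative, and the `?` propagation assumes deserializeBloomFilter's error type matches or is converted):

```nim
# Hypothetical sketch: construction stays inside rolling_bloom_filter.nim,
# behind an init proc that accepts the serialized filter bytes.
proc init*(T: type RollingBloomFilter,
           serialized: seq[byte]): Result[RollingBloomFilter, ReliabilityError] =
  let filter = ?deserializeBloomFilter(serialized)
  ok(RollingBloomFilter(filter: filter))
```

Callers outside the module would then never touch the RollingBloomFilter fields directly.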
This PR adds the ReliabilityManager, which has all the core logic for SDS. Would greatly appreciate Nim-related review and comments from the nwaku team!
This PR is a part of the work towards waku-org/pm#194