Skip to content

feat: add reliability manager #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

feat: add reliability manager #6

wants to merge 3 commits into from

Conversation

shash256
Copy link
Collaborator

This PR adds ReliabilityManager which has all the core logic for SDS. Would greatly appreciate nim related review and comments from the nwaku team !!

This PR is a part of the work towards waku-org/pm#194

Copy link

@jm-clius jm-clius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for great work! I do have some comments below. Most importantly, I think we can find a way to simplify how we view the incoming buffer and process missing dependencies. I've also suggested not using the bloomFilter for anything other than checking ACK status.

Comment on lines 16 to 17
if channelId.len == 0:
return err(ReliabilityError.reInvalidArgument)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would depend on how channelId is set elsewhere, but could this rather be an Option[SdsChannelID] which you test with isSome()?

let outMsg = rm.outgoingBuffer[i]

# Check if message is in causal history
for msgID in msg.causalHistory:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use in or contains here to replace the block:

if outMsg.message.messagId in msg.causalHistory:

Comment on lines 53 to 55
if bfResult.isOk():
var rbf = RollingBloomFilter(
filter: bfResult.get(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why parse the RollingBloomFilter for each message being read in the outgoingBuffer loop? I think you can parse and create the bloom filter once before entering the while loop?

while i < rm.outgoingBuffer.len:
var acknowledged = false
let outMsg = rm.outgoingBuffer[i]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this block of code will read a bit easier if you introduce a isAcknowledged function that tests the ack status of each message in the outgoingBuffer.

Assuming you parse the rollingBloomFilter once before entering the while loop, it could look something like:

Suggested change
if outMsg.isAcknowledged(msg.causalHistory, rollingBloomFilter):
if not rm.onMessageSent.isNil():
rm.onMessageSent(outMsg.message.messageId)

etc.

if acknowledged:
if not rm.onMessageSent.isNil():
rm.onMessageSent(outMsg.message.messageId)
rm.outgoingBuffer.delete(i)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's good practice to remove items from the buffer you are iterating over. I would suggest building a list of items that should be cleaned and then deleting them all at once after exiting the iteration loop. @Ivansete-status any suggestions here on general best practice in Nim?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think is better the approach you suggest @jm-clius.
I'd use something like: let newFilteredSeq = sequence.filterIt( ...)

for msg in rm.incomingBuffer:
var hasMissingDeps = false
for depId in msg.causalHistory:
if not rm.bloomFilter.contains(depId):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I didn't initially consider the bloomFilter a good source for checking the dependencies of received messages as we already have a more reliable source (local history itself). Wouldn't it be possible to have a dependency check callback, perhaps implemented by the application, that takes a list of dependencies as argument and return the sublist of resolved dependencies after checking local history?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a first step, we can check the rm.messageHistory and only call the application-level callback if we require longer-term history. The application can also at that point try to retrieve still unresolved dependencies from Store nodes.

Comment on lines 182 to 183
if rm.bloomFilter.contains(msg.messageId):
return ok((msg.content, @[]))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably this is a duplicate check? I wouldn't rely on bloomFilter unnecessarily here, as false positives could mean that we accidentally believe that we've seen this message before. I'd only use messageHistory here.


var missingDeps: seq[SdsMessageID] = @[]
for depId in msg.causalHistory:
if not rm.bloomFilter.contains(depId):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See point elsewhere. I don't think we should use the bloomFilter (where false positives are possible) if we have access to reliable information in local messageHistory.

Comment on lines 198 to 207
if missingDeps.len == 0:
# Check if any dependencies are still in incoming buffer
var depsInBuffer = false
for bufferedMsg in rm.incomingBuffer:
if bufferedMsg.messageId in msg.causalHistory:
depsInBuffer = true
break

if depsInBuffer:
rm.incomingBuffer.add(msg)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this step is necessary if we're going to add the message to the incomingBuffer in any case. Presumably, you'd like to avoid the onMissingDependencies call? What is expected to happen within the onMissingDependencies call?

Comment on lines +236 to +238
for msgId in messageIds:
if not rm.bloomFilter.contains(msgId):
rm.bloomFilter.add(msgId)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking that if we simplify the incomingBuffer to be a map of messageId to a seq[SdsMessageID] of missing dependencies (we can cache the full message contents elsewhere), we could directly mark these dependencies as met by removing them from the missingDependencies seq of each message in the incoming buffer. Processing the incoming buffer then becomes simply finding and processing the messages for which we now have an empty missingDependencies seq. WDYT?

Copy link

@Ivansete-status Ivansete-status left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments so far :)
I'd also suggest renaming message.nim module to sds_message.nim

Comment on lines 5 to 7
proc newReliabilityManager*(
channelId: SdsChannelID, config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend to create a new reliability_manager.nim module and then all the ReliabilityManager funcs/procs defined in reliability_utils.nim, to be moved into that new reliability_manager.nim.

Also, for ref object types (heap allocated and GC'ed), the idiomatic way to create them is through a new proc. I think it is fine returning a Result but better rename it to new. See the following as an example: https://github.com/waku-org/nwaku/blob/addce8dc3338ea1ca7d6ce1dd525f6f50ccbf467/waku/node/delivery_monitor/recv_monitor.nim#L153-L157

Comment on lines 52 to 53
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use the valueOr approach.

Suggested change
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():
let bloomFilter = deserializeBloomFilter(msg.bloomFilter).valueOr:
error "Failed to deserialize bloom filter", error = $error
let rbf = RollingBloomFilter(
filter: bloomFilter,
...

if not acknowledged and msg.bloomFilter.len > 0:
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():
var rbf = RollingBloomFilter(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid creating instances of RollingBloomFilter outside the rolling_bloom_filter.nim module.
Then, we need a special init proc, which will be in charge of creating such objects from a seq[byte] bloom filter.

if acknowledged:
if not rm.onMessageSent.isNil():
rm.onMessageSent(outMsg.message.messageId)
rm.outgoingBuffer.delete(i)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think is better the approach you suggest @jm-clius.
I'd use something like: let newFilteredSeq = sequence.filterIt( ...)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create a nim library for SDS implementation from the API specification
3 participants