-
Notifications
You must be signed in to change notification settings - Fork 242
Allow sending a message with a token transfer #245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The message hash will be recorded with the transfer, and the message will not be considered confirmed until the transfer is also confirmed. Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
| } | ||
| switch in.Header.Type { | ||
| case fftypes.MessageTypeTransferBroadcast: | ||
| return am.broadcast.BroadcastMessage(ctx, ns, in, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that even in the waitConfirm=true case, the message is dispatched async. This is because dispatching the message synchronously would block forever waiting for the transfer to arrive.
Technically we could build and seal the message without sending it in this case, then record the hash, then send the transfer synchronously, then send the message synchronously. I spent some time going down that road and it requires some non-trivial refactoring of the message sending logic, so I tabled it for the moment.
This means that the current behavior of waitConfirm=true guarantees the token transfer has occurred, but does not totally guarantee the message data has been received (although it should have been in most cases).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding of where we were aiming with this feature, was that the transaction object would be the thing we would generate an event on, and as such the sync/async code would need to have a path where the thing in-flight wasn't a message, but instead a higher-level transaction.
... going to look through the rest of the changes to see if we've learned more that means this approach didn't work out, or if we're still on that path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a transaction in flight, but it's possible I'm resolving that prematurely. We discussed holding the message confirmation until the transfer completes, which is what I tackled in the event aggregation - but I think I am still missing a few pieces of the puzzle here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to close the loop on the comments here, per the summary #245 (comment) when we're done with this, we will block until the message confirms.
However, that's not a blocker on merging this PR as we've done the architectural work to know where we're up to
internal/events/aggregator.go
Outdated
| if err != nil { | ||
| return false, err | ||
| } else if len(transfers) == 0 { | ||
| return false, fmt.Errorf("transfer for message '%s' not available", msg.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It felt like maybe I should be able to return false, nil here - but that didn't seem to trigger retries. Perhaps there's an alternate path that I'm supposed to trigger explicitly if the transfer comes in after the message? @peterbroadhurst maybe you can offer some insight?
On the other hand, if returning an error is the correct/optimal route here, I guess maybe I should add it to i18n - this was essentially just a placeholder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to spend some time looking at the private message correlation case. This certainly doesn't seem like an Errorf condition to me - it's simply that things arrived in the other of two potential orders, which given the use of two independent blockchains we need to be able to process in either order with deterministic outcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do also need rules on which order we dispatch the events in. Is it the order of the messages, or the order of the transfers. That needs to be deterministic. Given transfers could be happening for lots of different reasons, I believe it's impossible for it to be the order of the transfers, so that means we have to block further messages on a given topic until a transfer has been completed associated with an earlier message.
Or we need to have some other rule that allows applications to function, without needing to have their own stateful message aggregation capability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we deal with a blockchain transaction arriving before the message, within the correlation of a private pinned message (note the logging is Debug level - which I think is appropriate):
firefly/internal/events/aggregator.go
Line 177 in 06dedc5
| l.Debugf("Batch %s not available - pin %s is parked", pin.Batch, pin.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can discuss it, but my current opinion as I review things here is that:
The blockchain event for a transfer that contains a message reference, needs to be added to the pins table, and be processed by the aggregator sequentially with other pins.
The object structure I think we discussed is:

- Messages of type
transfer*contain an extra data array element that is the Transaction object - The Transaction object contains a reference to the UUID of a token transfer
This means we can have:
- Pins that refer to indexes of a message within a batch
- Pins that refer to the UUID of a transfer
Note this means some tweaking to the pins table. Specifically I think we need a type field (which maybe this is performance sensitive enough we want to make it a numeric), and to rename batch to ref - as it might not be a batch.
I'd thought we could avoid that, and consider transfers themselves to be pins. But the problem is how you get a single deterministic order of processing, without making the aggregator listen to two tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I was struggling with the charter of what is considered a pin.
We did discuss sharing the transaction details as part of the message, but I didn't include that yet, because I wasn't sure if it was needed. In the case of pool creation, the transaction ID had to be shared via message, because there was no guarantee of a blockchain transaction happening (which is the "normal" way that transaction IDs propagate currently). As of #239, I am assuming that all transfers include a blockchain transaction, and the FireFly transaction ID is shared as part of that blockchain transaction - therefore it was not necessary to also share the transaction ID in the message. But sounds like it may still be needed in both pieces (the transfer and the message) to tie them together.
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @awrichar - I've probably raised more questions than answers here sorry.
The code proposal is awesome in moving the ball forwards, but I think we need a bit more consideration to the points above before we've got the determinism we're hoping to promise apps on these coordinated transfers.
| } | ||
| switch in.Header.Type { | ||
| case fftypes.MessageTypeTransferBroadcast: | ||
| return am.broadcast.BroadcastMessage(ctx, ns, in, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding of where we were aiming with this feature, was that the transaction object would be the thing we would generate an event on, and as such the sync/async code would need to have a path where the thing in-flight wasn't a message, but instead a higher-level transaction.
... going to look through the rest of the changes to see if we've learned more that means this approach didn't work out, or if we're still on that path.
internal/events/aggregator.go
Outdated
| if err != nil { | ||
| return false, err | ||
| } else if len(transfers) == 0 { | ||
| return false, fmt.Errorf("transfer for message '%s' not available", msg.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to spend some time looking at the private message correlation case. This certainly doesn't seem like an Errorf condition to me - it's simply that things arrived in the other of two potential orders, which given the use of two independent blockchains we need to be able to process in either order with deterministic outcome.
internal/events/aggregator.go
Outdated
| if err != nil { | ||
| return false, err | ||
| } else if len(transfers) == 0 { | ||
| return false, fmt.Errorf("transfer for message '%s' not available", msg.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do also need rules on which order we dispatch the events in. Is it the order of the messages, or the order of the transfers. That needs to be deterministic. Given transfers could be happening for lots of different reasons, I believe it's impossible for it to be the order of the transfers, so that means we have to block further messages on a given topic until a transfer has been completed associated with an earlier message.
Or we need to have some other rule that allows applications to function, without needing to have their own stateful message aggregation capability.
internal/events/aggregator.go
Outdated
| if err != nil { | ||
| return false, err | ||
| } else if len(transfers) == 0 { | ||
| return false, fmt.Errorf("transfer for message '%s' not available", msg.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we deal with a blockchain transaction arriving before the message, within the correlation of a private pinned message (note the logging is Debug level - which I think is appropriate):
firefly/internal/events/aggregator.go
Line 177 in 06dedc5
| l.Debugf("Batch %s not available - pin %s is parked", pin.Batch, pin.Hash) |
internal/events/aggregator.go
Outdated
| if err != nil { | ||
| return false, err | ||
| } else if len(transfers) == 0 { | ||
| return false, fmt.Errorf("transfer for message '%s' not available", msg.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can discuss it, but my current opinion as I review things here is that:
The blockchain event for a transfer that contains a message reference, needs to be added to the pins table, and be processed by the aggregator sequentially with other pins.
The object structure I think we discussed is:

- Messages of type
transfer*contain an extra data array element that is the Transaction object - The Transaction object contains a reference to the UUID of a token transfer
This means we can have:
- Pins that refer to indexes of a message within a batch
- Pins that refer to the UUID of a transfer
Note this means some tweaking to the pins table. Specifically I think we need a type field (which maybe this is performance sensitive enough we want to make it a numeric), and to rename batch to ref - as it might not be a batch.
I'd thought we could avoid that, and consider transfers themselves to be pins. But the problem is how you get a single deterministic order of processing, without making the aggregator listen to two tables.
|
I've started working on a flow chart, to help flush out the challenges here. |
|
So a bit more thinking (thanks @awrichar), and actually there is not a big gap from how the code is in this PR and what we need. So instead, we just need to to perform a rewind when transfers come in, and (as the code does already in this PR) add an additional check on transfer messges that the transfer is complete. Summary
As the above flowchart was helpful in describing the aggregator core function, I've updated it as follows: |
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Codecov Report
@@ Coverage Diff @@
## main #245 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 214 214
Lines 11820 11860 +40
=========================================
+ Hits 11820 11860 +40
Continue to review full report at Codecov.
|
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
|
I've pushed a few more commits that hopefully address the majority of the concerns here. Some highlights of the behavior in the context of pins and ordering (reiterating some of what @peterbroadhurst stated above):
Some items that are still outstanding:
|
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last question on LocalID @awrichar, and a couple of comments to state my understanding of where we'll be when this PR drops, and what's left.
| } | ||
| switch in.Header.Type { | ||
| case fftypes.MessageTypeTransferBroadcast: | ||
| return am.broadcast.BroadcastMessage(ctx, ns, in, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to close the loop on the comments here, per the summary #245 (comment) when we're done with this, we will block until the message confirms.
However, that's not a blocker on merging this PR as we've done the architectural work to know where we're up to
| transfer.MessageHash = msg.Hash | ||
| } | ||
|
|
||
| result, err := am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, &transfer.TokenTransfer, waitConfirm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's my understanding @awrichar that this will switch around, so that in the case of a message+transfer we'll no longer use syncasync.SendConfirmTokenTransfer to block until the transfer completes, but instead will restructure the code to allow the transfer to be fired off with the message hash, but then to block on the message returning. But that's going to be in a follow-on PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#249 is my parallel attempt to restructure the messaging code in order to have a hook at the point that the message is sealed but not sent. Hopefully I can leverage that to fire off the token transfer just before firing the message, and then wait for the message to be confirmed. Once both of these PRs are merged, I'll tackle that follow-on change.
| if err != nil { | ||
| return false, err | ||
| } | ||
| if len(operations) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@awrichar - I need a bit of help understanding why we're using an operation here to find the LocalID of the transfer. I had thought this was something we were passing through the data of the transfer, so that it could be consistent on all nodes in the case of a FireFly initiated transfer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short answer - either route is definitely possible.
I only ended up here because I felt it was easier to explain - in this case, each node always assigns a different LocalID for each transfer, and that ID is never shared with other nodes or written to the chain.
We could easily stay with the original route - where the ID is shared on the chain for FireFly-initiated transfers, but assigned randomly by each node for non-FireFly-initiated transfers. It just felt a little more confusing. I foresee cases of "why does this transfer ID match across nodes but this other one does not?"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 - going with "local means local" sounds good to me.
I understand now why we're looking at the operation here, because the transfer ID needs to be correlated with an ID, and that ID needs to be known at the beginning of the process by the submitter (only) - because they might be blocking with confirm for it to complete.
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍


The message hash will be recorded with the transfer, and the message will
not be considered confirmed until the transfer is also confirmed.