Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5102,14 +5102,120 @@ paths:
schema:
properties:
amount: {}
created: {}
from:
type: string
key:
type: string
localId: {}
message:
properties:
batch: {}
confirmed: {}
data:
items:
properties:
blob:
properties:
hash: {}
public:
type: string
type: object
datatype:
properties:
name:
type: string
version:
type: string
type: object
hash: {}
id: {}
validator:
type: string
value:
format: byte
type: string
type: object
type: array
group:
properties:
ledger: {}
members:
items:
properties:
identity:
type: string
node:
type: string
type: object
type: array
name:
type: string
type: object
hash: {}
header:
properties:
author:
type: string
cid: {}
created: {}
datahash: {}
group: {}
id: {}
key:
type: string
namespace:
type: string
tag:
type: string
topics:
items:
type: string
type: array
txtype:
type: string
type:
enum:
- definition
- broadcast
- private
- groupinit
- transfer_broadcast
- transfer_private
type: string
type: object
local:
type: boolean
pending:
type: boolean
pins:
items:
type: string
type: array
rejected:
type: boolean
type: object
messageHash: {}
poolProtocolId:
type: string
protocolId:
type: string
to:
type: string
tokenIndex:
type: string
tx:
properties:
id: {}
type:
type: string
type: object
type:
enum:
- mint
- burn
- transfer
type: string
type: object
responses:
"200":
Expand Down
4 changes: 2 additions & 2 deletions internal/apiserver/route_post_token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ var postTokenTransfer = &oapispec.Route{
},
FilterFactory: nil,
Description: i18n.MsgTBD,
JSONInputValue: func() interface{} { return &fftypes.TokenTransfer{} },
JSONInputValue: func() interface{} { return &fftypes.TokenTransferInput{} },
JSONInputMask: []string{"Type", "LocalID", "PoolProtocolID", "ProtocolID", "MessageHash", "TX", "Created"},
JSONOutputValue: func() interface{} { return &fftypes.TokenTransfer{} },
JSONOutputCodes: []int{http.StatusAccepted, http.StatusOK},
JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) {
waitConfirm := strings.EqualFold(r.QP["confirm"], "true")
r.SuccessStatus = syncRetcode(waitConfirm)
return r.Or.Assets().TransferTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransfer), waitConfirm)
return r.Or.Assets().TransferTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransferInput), waitConfirm)
},
}
4 changes: 2 additions & 2 deletions internal/apiserver/route_post_token_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func TestPostTokenTransfer(t *testing.T) {
o, r := newTestAPIServer()
mam := &assetmocks.Manager{}
o.On("Assets").Return(mam)
input := fftypes.TokenTransfer{}
input := fftypes.TokenTransferInput{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/tokens/tok1/pools/pool1/transfers", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mam.On("TransferTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransfer"), false).
mam.On("TransferTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransferInput"), false).
Return(&fftypes.TokenTransfer{}, nil)
r.ServeHTTP(res, req)

Expand Down
52 changes: 37 additions & 15 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/identity"
"github.com/hyperledger/firefly/internal/privatemessaging"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/internal/txcommon"
Expand All @@ -42,11 +43,10 @@ type Manager interface {
GetTokenTransfers(ctx context.Context, ns, typeName, poolName string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error)
MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error)
BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error)
TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error)
TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error)

// Bound token callbacks
TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error
TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error

Start() error
WaitStop()
Expand All @@ -59,13 +59,14 @@ type assetManager struct {
data data.Manager
syncasync syncasync.Bridge
broadcast broadcast.Manager
messaging privatemessaging.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
txhelper txcommon.Helper
}

func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) {
if di == nil || im == nil || sa == nil || bm == nil || ti == nil {
func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) {
if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
am := &assetManager{
Expand All @@ -75,6 +76,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manage
data: dm,
syncasync: sa,
broadcast: bm,
messaging: pm,
tokens: ti,
retry: retry.Retry{
InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay),
Expand Down Expand Up @@ -119,21 +121,13 @@ func retrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, p
return nil
}

// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package
func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) {
op.Input = fftypes.JSONObject{
"id": transfer.LocalID.String(),
}
}

func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) {
input := &op.Input
transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id"))
if err != nil {
return err
}
return nil
}

func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) {
return am.createTokenPoolWithID(ctx, fftypes.NewUUID(), ns, typeName, pool, waitConfirm)
}
Expand Down Expand Up @@ -291,7 +285,25 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns, typeName, poolName s
return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm)
}

func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) {
func (am *assetManager) sendTransferMessage(ctx context.Context, ns string, in *fftypes.MessageInOut) (*fftypes.Message, error) {
allowedTypes := []fftypes.FFEnum{
fftypes.MessageTypeTransferBroadcast,
fftypes.MessageTypeTransferPrivate,
}
if in.Header.Type == "" {
in.Header.Type = fftypes.MessageTypeTransferBroadcast
}
switch in.Header.Type {
case fftypes.MessageTypeTransferBroadcast:
return am.broadcast.BroadcastMessage(ctx, ns, in, false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that even in the waitConfirm=true case, the message is dispatched async. This is because dispatching the message synchronously would block forever waiting for the transfer to arrive.

Technically we could build and seal the message without sending it in this case, then record the hash, then send the transfer synchronously, then send the message synchronously. I spent some time going down that road and it requires some non-trivial refactoring of the message sending logic, so I tabled it for the moment.

This means that the current behavior of waitConfirm=true guarantees the token transfer has occurred, but does not totally guarantee the message data has been received (although it should have been in most cases).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of where we were aiming with this feature, was that the transaction object would be the thing we would generate an event on, and as such the sync/async code would need to have a path where the thing in-flight wasn't a message, but instead a higher-level transaction.

... going to look through the rest of the changes to see if we've learned more that means this approach didn't work out, or if we're still on that path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a transaction in flight, but it's possible I'm resolving that prematurely. We discussed holding the message confirmation until the transfer completes, which is what I tackled in the event aggregation - but I think I am still missing a few pieces of the puzzle here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to close the loop on the comments here, per the summary #245 (comment) when we're done with this, we will block until the message confirms.

However, that's not a blocker on merging this PR as we've done the architectural work to know where we're up to

case fftypes.MessageTypeTransferPrivate:
return am.messaging.SendMessage(ctx, ns, in, false)
default:
return nil, i18n.NewError(ctx, i18n.MsgInvalidMessageType, allowedTypes)
}
}

func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) {
transfer.Type = fftypes.TokenTransferTypeTransfer
if transfer.Key == "" {
org, err := am.identity.GetLocalOrganization(ctx)
Expand All @@ -309,7 +321,17 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolNa
if transfer.From == transfer.To {
return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf)
}
return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm)

if transfer.Message != nil {
msg, err := am.sendTransferMessage(ctx, ns, transfer.Message)
if err != nil {
return nil, err
}
transfer.MessageHash = msg.Hash
}

result, err := am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, &transfer.TokenTransfer, waitConfirm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's my understanding @awrichar that this will switch around, so that in the case of a message+transfer we'll no longer use syncasync.SendConfirmTokenTransfer to block until the transfer completes, but instead will restructure the code to allow the transfer to be fired off with the message hash, but then to block on the message returning. But that's going to be in a follow-on PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#249 is my parallel attempt to restructure the messaging code in order to have a hook at the point that the message is sealed but not sent. Hopefully I can leverage that to fire off the token transfer just before firing the message, and then wait for the message to be confirmed. Once both of these PRs are merged, I'll tackle that follow-on change.

return result, err
}

func (am *assetManager) transferTokensWithID(ctx context.Context, id *fftypes.UUID, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) {
Expand Down
Loading