Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Manager interface {
GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error)

// Bound token callbacks
TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error
TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error

Start() error
WaitStop()
Expand Down
10 changes: 2 additions & 8 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,11 @@ import (

"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)

// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package
func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) {
op.Input = fftypes.JSONObject{
"id": transfer.LocalID.String(),
}
}

func (am *assetManager) GetTokenTransfers(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) {
return am.database.GetTokenTransfers(ctx, filter)
}
Expand Down Expand Up @@ -225,7 +219,7 @@ func (s *transferSender) resolveAndSend(ctx context.Context, waitConfirm bool) (
fftypes.OpTypeTokenTransfer,
fftypes.OpStatusPending,
"")
addTokenTransferInputs(op, &s.transfer.TokenTransfer)
txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer)
if err := s.mgr.database.UpsertOperation(ctx, op, false); err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions internal/events/operation_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,13 @@ func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftyp
if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil {
return err
}

// Special handling for OpTypeTokenTransfer, which writes an event when it fails
if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed {
event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID)
if err := em.database.InsertEvent(em.ctx, event); err != nil {
return err
}
}
return nil
}
12 changes: 2 additions & 10 deletions internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,12 @@ import (
"context"

"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/hyperledger/firefly/pkg/tokens"
)

func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) {
input := &op.Input
transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id"))
if err != nil {
return err
}
return nil
}

func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) {
transfer.LocalID = nil

Expand All @@ -48,7 +40,7 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string,
return false, err
}
if len(operations) > 0 {
err = retrieveTokenTransferInputs(ctx, operations[0], transfer)
err = txcommon.RetrieveTokenTransferInputs(ctx, operations[0], transfer)
if err != nil {
log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/i18n/en_translations.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,5 @@ var (
MsgFailedToDecodeCertificate = ffm("FF10286", "Failed to decode certificate: %s", 500)
MsgInvalidMessageType = ffm("FF10287", "Invalid message type - allowed types are %s", 400)
MsgNoUUID = ffm("FF10288", "Field '%s' must not be a UUID", 400)
MsgTokenTransferFailed = ffm("FF10289", "Token transfer with ID '%s' failed. Please check the FireFly logs for more information")
)
2 changes: 1 addition & 1 deletion internal/orchestrator/bound_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (bc *boundCallbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState
return bc.ei.OperationUpdate(bc.bi, operationID, txState, errorMessage, opOutput)
}

func (bc *boundCallbacks) TokensOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error {
func (bc *boundCallbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error {
return bc.ei.OperationUpdate(plugin, operationID, txState, errorMessage, opOutput)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/bound_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBoundCallbacks(t *testing.T) {
assert.EqualError(t, err, "pop")

mei.On("OperationUpdate", mti, opID, fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop"))
err = bc.TokensOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info)
err = bc.TokenOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info)
assert.EqualError(t, err, "pop")

mei.On("TransferResult", mdx, "tracking12345", fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop"))
Expand Down
61 changes: 46 additions & 15 deletions internal/syncasync/sync_async_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)
Expand Down Expand Up @@ -146,9 +147,8 @@ func (inflight *inflightRequest) msInflight() float64 {
return float64(dur) / float64(time.Millisecond)
}

func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*fftypes.Message, error) {
msg, err := sa.database.GetMessageByID(sa.ctx, event.Reference)
if err != nil {
func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (msg *fftypes.Message, err error) {
if msg, err = sa.database.GetMessageByID(sa.ctx, event.Reference); err != nil {
return nil, err
}
if msg == nil {
Expand All @@ -158,9 +158,8 @@ func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*f
return msg, nil
}

func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenPool, error) {
pool, err := sa.database.GetTokenPoolByID(sa.ctx, event.Reference)
if err != nil {
func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (pool *fftypes.TokenPool, err error) {
if pool, err = sa.database.GetTokenPoolByID(sa.ctx, event.Reference); err != nil {
return nil, err
}
if pool == nil {
Expand All @@ -170,9 +169,8 @@ func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*ffty
return pool, nil
}

func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenTransfer, error) {
transfer, err := sa.database.GetTokenTransfer(sa.ctx, event.Reference)
if err != nil {
func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (transfer *fftypes.TokenTransfer, err error) {
if transfer, err = sa.database.GetTokenTransfer(sa.ctx, event.Reference); err != nil {
return nil, err
}
if transfer == nil {
Expand All @@ -182,6 +180,17 @@ func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (*
return transfer, nil
}

func (sa *syncAsyncBridge) getOperationFromEvent(event *fftypes.EventDelivery) (op *fftypes.Operation, err error) {
if op, err = sa.database.GetOperationByID(sa.ctx, event.Reference); err != nil {
return nil, err
}
if op == nil {
// This should not happen (but we need to move on)
log.L(sa.ctx).Errorf("Unable to resolve operation '%s' for %s event '%s'", event.Reference, event.Type, event.ID)
}
return op, nil
}

func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
sa.inflightMux.Lock()
defer sa.inflightMux.Unlock()
Expand Down Expand Up @@ -218,7 +227,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
// See if this is a rejection of an inflight message
inflight := sa.getInFlight(event.Namespace, messageConfirm, msg.Header.ID)
if inflight != nil {
go sa.resolveRejected(inflight, msg)
go sa.resolveRejected(inflight, msg.Header.ID)
}

case fftypes.EventTypePoolConfirmed:
Expand All @@ -240,7 +249,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
// See if this is a rejection of an inflight token pool
inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID)
if inflight != nil {
go sa.resolveRejectedTokenPool(inflight, pool)
go sa.resolveRejectedTokenPool(inflight, pool.ID)
}

case fftypes.EventTypeTransferConfirmed:
Expand All @@ -253,6 +262,22 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
if inflight != nil {
go sa.resolveConfirmedTokenTransfer(inflight, transfer)
}

case fftypes.EventTypeTransferOpFailed:
op, err := sa.getOperationFromEvent(event)
if err != nil || op == nil {
return err
}
// Extract the LocalID of the transfer
var transfer fftypes.TokenTransfer
if err := txcommon.RetrieveTokenTransferInputs(sa.ctx, op, &transfer); err != nil {
log.L(sa.ctx).Warnf("Failed to extract token transfer inputs for operation '%s': %s", op.ID, err)
}
// See if this is a failure of an inflight token transfer operation
inflight := sa.getInFlight(event.Namespace, tokenTransferConfirm, transfer.LocalID)
if inflight != nil {
go sa.resolveFailedTokenTransfer(inflight, transfer.LocalID)
}
}

return nil
Expand All @@ -276,8 +301,8 @@ func (sa *syncAsyncBridge) resolveConfirmed(inflight *inflightRequest, msg *ffty
inflight.response <- inflightResponse{id: msg.Header.ID, data: msg}
}

func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msg *fftypes.Message) {
err := i18n.NewError(sa.ctx, i18n.MsgRejected, msg.Header.ID)
func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msgID *fftypes.UUID) {
err := i18n.NewError(sa.ctx, i18n.MsgRejected, msgID)
log.L(sa.ctx).Errorf("Resolving message confirmation request '%s' with error: %s", inflight.id, err)
inflight.response <- inflightResponse{err: err}
}
Expand All @@ -287,8 +312,8 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenPool(inflight *inflightRequest,
inflight.response <- inflightResponse{id: pool.ID, data: pool}
}

func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, pool *fftypes.TokenPool) {
err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, pool.ID)
func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, poolID *fftypes.UUID) {
err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, poolID)
log.L(sa.ctx).Errorf("Resolving token pool confirmation request '%s' with error '%s'", inflight.id, err)
inflight.response <- inflightResponse{err: err}
}
Expand All @@ -298,6 +323,12 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenTransfer(inflight *inflightReque
inflight.response <- inflightResponse{id: transfer.LocalID, data: transfer}
}

func (sa *syncAsyncBridge) resolveFailedTokenTransfer(inflight *inflightRequest, transferID *fftypes.UUID) {
err := i18n.NewError(sa.ctx, i18n.MsgTokenTransferFailed, transferID)
log.L(sa.ctx).Debugf("Resolving token transfer confirmation request '%s' with error '%s'", inflight.id, err)
inflight.response <- inflightResponse{err: err}
}

func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, id *fftypes.UUID, reqType requestType, send RequestSender) (interface{}, error) {
inflight, err := sa.addInFlight(ns, id, reqType)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/tokens/fftokens/fftokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject)
replyType = fftypes.OpStatusFailed
}
l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message)
return ft.callbacks.TokensOpUpdate(ft, operationID, replyType, message, data)
return ft.callbacks.TokenOpUpdate(ft, operationID, replyType, message, data)
}

func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/tokens/fftokens/fftokens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ func TestEvents(t *testing.T) {
fromServer <- `{"id":"3","event":"receipt","data":{"id":"abc"}}`

// receipt: success
mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once()
mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once()
fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}`

// receipt: failure
mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once()
mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once()
fromServer <- `{"id":"5","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}`

// token-pool: missing data
Expand Down
38 changes: 38 additions & 0 deletions internal/txcommon/token_inputs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright © 2021 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txcommon

import (
"context"

"github.com/hyperledger/firefly/pkg/fftypes"
)

func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) {
op.Input = fftypes.JSONObject{
"id": transfer.LocalID.String(),
}
}

func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) {
input := &op.Input
transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id"))
if err != nil {
return err
}
return nil
}
8 changes: 4 additions & 4 deletions mocks/assetmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions mocks/tokenmocks/callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/fftypes/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
EventTypePoolRejected EventType = ffEnum("eventtype", "token_pool_rejected")
// EventTypeTransferConfirmed occurs when a token transfer has been confirmed
EventTypeTransferConfirmed EventType = ffEnum("eventtype", "token_transfer_confirmed")
// EventTypeTransferOpFailed occurs when a token transfer submitted by this node has failed (based on feedback from connector)
EventTypeTransferOpFailed EventType = ffEnum("eventtype", "token_transfer_op_failed")
)

// Event is an activity in the system, delivered reliably to applications, that indicates something has happened in the network
Expand Down
2 changes: 1 addition & 1 deletion pkg/tokens/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Callbacks interface {
// Only the party submitting the transaction will see this data.
//
// Error should will only be returned in shutdown scenarios
TokensOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error
TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error

// TokenPoolCreated notifies on the creation of a new token pool, which might have been
// submitted by us, or by any other authorized party in the network.
Expand Down