From 3060445d675779fad6ccfede38c3efdd1dd20afc Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Jun 2022 12:07:26 -0400 Subject: [PATCH 1/5] Track blockchain callback handlers per namespace Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum.go | 7 ++-- internal/blockchain/ethereum/ethereum_test.go | 19 +++++---- internal/blockchain/fabric/fabric.go | 7 ++-- internal/blockchain/fabric/fabric_test.go | 41 ++++++++++--------- internal/orchestrator/orchestrator.go | 2 +- internal/orchestrator/orchestrator_test.go | 6 +-- mocks/blockchainmocks/plugin.go | 6 +-- pkg/blockchain/plugin.go | 3 +- 8 files changed, 48 insertions(+), 43 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 9449a8ffcb..d9cefaf6cb 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -72,7 +72,7 @@ type Ethereum struct { } type callbacks struct { - handlers []blockchain.Callbacks + handlers map[string]blockchain.Callbacks } func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { @@ -190,6 +190,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr e.ctx = log.WithLogField(ctx, "proto", "ethereum") e.metrics = metrics e.capabilities = &blockchain.Capabilities{} + e.callbacks.handlers = make(map[string]blockchain.Callbacks) if addressResolverConf.GetString(AddressResolverURLTemplate) != "" { if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil { @@ -239,8 +240,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return nil } -func (e *Ethereum) SetHandler(handler blockchain.Callbacks) { - e.callbacks.handlers = append(e.callbacks.handlers, handler) +func (e *Ethereum) SetHandler(namespace string, handler blockchain.Callbacks) { + e.callbacks.handlers[namespace] = handler } func (e *Ethereum) Start() (err error) { diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 9ae9304a5b..705e591e34 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -92,6 +92,7 @@ func newTestEthereum() (*Ethereum, func()) { wsconn: wsm, metrics: mm, } + e.callbacks.handlers = make(map[string]blockchain.Callbacks) return e, func() { cancel() if e.closed != nil { @@ -811,7 +812,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} @@ -936,7 +937,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} @@ -999,7 +1000,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -1229,7 +1230,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Ethereum{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1299,7 +1300,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { }`) em := &blockchainmocks.Callbacks{} - e.SetHandler(em) + e.SetHandler("ns1", em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1544,7 +1545,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -1606,7 +1607,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -2851,7 +2852,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -2898,7 +2899,7 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index a0436face1..31ed567bb2 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -63,7 +63,7 @@ type Fabric struct { } type callbacks struct { - handlers []blockchain.Callbacks + handlers map[string]blockchain.Callbacks } func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { @@ -197,6 +197,7 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric f.idCache = make(map[string]*fabIdentity) f.metrics = metrics f.capabilities = &blockchain.Capabilities{} + f.callbacks.handlers = make(map[string]blockchain.Callbacks) if fabconnectConf.GetString(ffresty.HTTPConfigURL) == "" { return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.fabric.fabconnect") @@ -239,8 +240,8 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric return nil } -func (f *Fabric) SetHandler(handler blockchain.Callbacks) { - f.callbacks.handlers = append(f.callbacks.handlers, handler) +func (f *Fabric) SetHandler(namespace string, handler blockchain.Callbacks) { + f.callbacks.handlers[namespace] = handler } func (f *Fabric) Start() (err error) { diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 8958e2f0b6..0d0d242eef 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -63,6 +63,7 @@ func newTestFabric() (*Fabric, func()) { prefixLong: defaultPrefixLong, wsconn: wsm, } + e.callbacks.handlers = make(map[string]blockchain.Callbacks) return e, func() { cancel() if e.closed != nil { @@ -733,7 +734,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -781,7 +782,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -829,7 +830,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -863,7 +864,7 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -890,7 +891,7 @@ func TestHandleMessageUnknownEventName(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -906,7 +907,7 @@ func TestHandleMessageUnknownEventName(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -930,7 +931,7 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -953,7 +954,7 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -977,7 +978,7 @@ func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" @@ -1000,7 +1001,7 @@ func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { func TestHandleMessageBatchBadJSON(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } err := e.handleMessageBatch(context.Background(), []interface{}{10, 20}) assert.NoError(t, err) @@ -1071,7 +1072,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { "requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}" }`) em := &blockchainmocks.Callbacks{} - e.SetHandler(em) + e.SetHandler("ns1", em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1097,7 +1098,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1137,7 +1138,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1154,7 +1155,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1346,7 +1347,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -1405,7 +1406,7 @@ func TestHandleMessageContractEventNoPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -1432,7 +1433,7 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-cb37cc07-e873-4f58-44ab-55add6bba320"] = "ns1" @@ -1461,7 +1462,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" @@ -1836,7 +1837,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} @@ -1876,7 +1877,7 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } e.subs = map[string]string{} e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 5957c1d30f..3417a34c5f 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -352,7 +352,7 @@ func (or *orchestrator) MultiParty() multiparty.Manager { func (or *orchestrator) initPlugins(ctx context.Context) (err error) { or.plugins.Database.Plugin.SetHandler(or.namespace, or) - or.plugins.Blockchain.Plugin.SetHandler(&or.bc) + or.plugins.Blockchain.Plugin.SetHandler(or.namespace, &or.bc) or.plugins.SharedStorage.Plugin.SetHandler(or.namespace, &or.bc) fb := database.IdentityQueryFactory.NewFilter(ctx) diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 5a255f9ce6..c9f93fe3ed 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -187,7 +187,7 @@ func TestInitOK(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() @@ -212,7 +212,7 @@ func TestInitTokenListenerFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() @@ -226,7 +226,7 @@ func TestInitDataexchangeNodesFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mps.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) ctx := context.Background() diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index e90c538821..ca4c4ae705 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -326,9 +326,9 @@ func (_m *Plugin) RemoveFireflySubscription(ctx context.Context, subID string) e return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler blockchain.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler blockchain.Callbacks) { + _m.Called(namespace, handler) } // Start provides a mock function with given fields: diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 90580c45f2..42e129a146 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -36,7 +36,8 @@ type Plugin interface { Init(ctx context.Context, config config.Section, metrics metrics.Manager) error // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Blockchain interface must not deliver any events until start is called Start() error From f30fba1385bbadac319e63bd64f37eef023fa8a3 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Jun 2022 15:02:37 -0400 Subject: [PATCH 2/5] Send batch pin and network action events to the correct namespaces Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum.go | 131 ++++--- internal/blockchain/ethereum/ethereum_test.go | 328 +++++++++++++++-- internal/blockchain/fabric/fabric.go | 126 +++++-- internal/blockchain/fabric/fabric_test.go | 338 ++++++++++++++++-- internal/events/event_manager.go | 2 +- internal/events/network_action.go | 10 +- internal/events/network_action_test.go | 36 +- internal/multiparty/manager.go | 17 +- internal/multiparty/manager_test.go | 53 ++- internal/namespace/manager_test.go | 2 +- internal/orchestrator/bound_callbacks.go | 4 +- internal/orchestrator/bound_callbacks_test.go | 5 +- internal/tokens/fftokens/fftokens.go | 29 +- mocks/blockchainmocks/callbacks.go | 10 +- mocks/eventmocks/event_manager.go | 10 +- mocks/multipartymocks/manager.go | 10 +- pkg/blockchain/plugin.go | 2 +- 17 files changed, 876 insertions(+), 237 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index d9cefaf6cb..2b86a400d0 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -68,39 +68,56 @@ type Ethereum struct { addressResolver *addressResolver metrics metrics.Manager ethconnectConf config.Section - subs map[string]string + subs map[string]subscriptionInfo +} + +type subscriptionInfo struct { + namespace string + version int } type callbacks struct { handlers map[string]blockchain.Callbacks } -func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + return } + log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID) } -func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BatchPinComplete(batch, signingKey); err != nil { - return err - } +func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + if handler, ok := cb.handlers[batch.Namespace]; ok { + return handler.BatchPinComplete(batch, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace) return nil } -func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { - return err +func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + if namespace == "" { + // V1 networks don't populate namespace, so deliver the event to every handler + for _, handler := range cb.handlers { + if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil { + return err + } + } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainNetworkAction(action, location, event, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace) } return nil } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + // TODO: can we push more listener/namespace knowledge down to this layer? if err := cb.BlockchainEvent(event); err != nil { return err } @@ -231,7 +248,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return err } e.streamID = stream.ID - e.subs = make(map[string]string) + e.subs = make(map[string]subscriptionInfo) log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic) e.closed = make(chan struct{}) @@ -269,11 +286,16 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string, if err != nil { return "", err } - // TODO: We will probably need to save the namespace AND network version here - // Ultimately there needs to be a logic branch in the event handling, where for "V1" we expect to receive a namespace in every - // BatchPin event, but for "V2" we infer the namespace based on which subscription ID produced it. - e.subs[sub.ID] = namespace + version, err := e.GetNetworkVersion(ctx, location) + if err != nil { + return "", err + } + + e.subs[sub.ID] = subscriptionInfo{ + namespace: namespace, + version: version, + } return sub.ID, nil } @@ -347,7 +369,7 @@ func (e *Ethereum) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSO } } -func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { +func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) { event := e.parseBlockchainEvent(ctx, msgJSON) if event == nil { return nil // move on @@ -378,7 +400,24 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON // Check if this is actually an operator action if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) { action := nsOrAction[len(blockchain.FireFlyActionPrefix):] - return e.callbacks.BlockchainNetworkAction(action, event, verifier) + + // For V1 of the FireFly contract, action is sent to all namespaces + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version > 1 { + namespace = subInfo.namespace + } + + return e.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier) + } + + // For V1 of the FireFly contract, namespace is passed explicitly + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version == 1 { + namespace = nsOrAction + } else { + namespace = subInfo.namespace } hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x")) @@ -410,7 +449,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON } batch := &blockchain.BatchPin{ - Namespace: nsOrAction, + Namespace: namespace, TransactionID: &txnID, BatchID: &batchID, BatchHash: &batchHash, @@ -420,7 +459,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON } // If there's an error dispatching the event, we must return the error and shutdown - return e.callbacks.BatchPinComplete(batch, verifier) + return e.callbacks.BatchPinComplete(ctx, batch, verifier) } func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { @@ -451,7 +490,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) updateType = core.OpStatusFailed } l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message) - e.callbacks.BlockchainOpUpdate(e, requestID, updateType, txHash, message, reply) + e.callbacks.BlockchainOpUpdate(ctx, e, requestID, updateType, txHash, message, reply) } func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string { @@ -477,10 +516,17 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{ l1.Tracef("Message: %+v", msgJSON) // Matches one of the active FireFly BatchPin subscriptions - if _, ok := e.subs[sub]; ok { + if subInfo, ok := e.subs[sub]; ok { + location, err := encodeContractLocation(ctx, &Location{ + Address: msgJSON.GetString("address"), + }) + if err != nil { + return err + } + switch signature { case broadcastBatchEventSignature: - if err := e.handleBatchPinEvent(ctx1, msgJSON); err != nil { + if err := e.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil { return err } default: @@ -720,15 +766,7 @@ func (e *Ethereum) NormalizeContractLocation(ctx context.Context, location *ffty if err != nil { return nil, err } - parsed.Address, err = validateEthAddress(ctx, parsed.Address) - if err != nil { - return nil, err - } - normalized, err := json.Marshal(parsed) - if err == nil { - result = fftypes.JSONAnyPtrBytes(normalized) - } - return result, err + return encodeContractLocation(ctx, parsed) } func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { @@ -742,6 +780,18 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc return ðLocation, nil } +func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) { + location.Address, err = validateEthAddress(ctx, location.Address) + if err != nil { + return nil, err + } + normalized, err := json.Marshal(location) + if err == nil { + result = fftypes.JSONAnyPtrBytes(normalized) + } + return result, err +} + func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -1158,16 +1208,9 @@ func (e *Ethereum) GetAndConvertDeprecatedContractConfig(ctx context.Context) (l } else if strings.HasPrefix(address, "/instances/") { address = strings.Replace(address, "/instances/", "", 1) } - address, err = validateEthAddress(ctx, address) - if err != nil { - return nil, "", err - } - ethLocation := &Location{ + location, err = encodeContractLocation(ctx, &Location{ Address: address, - } - normalized, _ := json.Marshal(ethLocation) - location = fftypes.JSONAnyPtrBytes(normalized) - - return location, fromBlock, nil + }) + return location, fromBlock, err } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 705e591e34..a97ebef878 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -489,7 +489,7 @@ func TestInitAllExistingStreams(t *testing.T) { _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 3, httpmock.GetTotalCallCount()) + assert.Equal(t, 4, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) } @@ -815,8 +815,11 @@ func TestHandleMessageBatchPinOK(t *testing.T) { callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -871,6 +874,127 @@ func TestHandleMessageBatchPinOK(t *testing.T) { } +func TestHandleMessageBatchPinV2(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": [ + "0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", + "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771" + ] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 2, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeEthAddress, + Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", + } + + em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef, mock.Anything).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + + b := em.Calls[0].Arguments[0].(*blockchain.BatchPin) + assert.Equal(t, "ns1", b.Namespace) + assert.Equal(t, "e19af8b3-9060-4051-812d-7597d19adfb9", b.TransactionID.String()) + assert.Equal(t, "847d3bfd-0742-49ef-b65d-3fed15f5b0a6", b.BatchID.String()) + assert.Equal(t, "d71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", b.BatchHash.String()) + assert.Equal(t, "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", b.BatchPayloadRef) + assert.Equal(t, expectedSigningKeyRef, em.Calls[0].Arguments[1]) + assert.Len(t, b.Contexts, 2) + assert.Equal(t, "68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", b.Contexts[0].String()) + assert.Equal(t, "19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771", b.Contexts[1].String()) + + info1 := fftypes.JSONObject{ + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "logIndex": "50", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "transactionIndex": "0x0", + "timestamp": "1620576488", + } + assert.Equal(t, info1, b.Event.Info) + +} + +func TestHandleMessageBatchPinMissingAddress(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "ns1", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": [ + "0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", + "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771" + ] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10141", err) + +} + func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { data := fftypes.JSONAnyPtr(` [ @@ -898,8 +1022,11 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { ]`) e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -940,8 +1067,11 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -1002,8 +1132,11 @@ func TestHandleMessageBatchPinExit(t *testing.T) { e := &Ethereum{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef, mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} @@ -1017,13 +1150,17 @@ func TestHandleMessageBatchPinExit(t *testing.T) { func TestHandleMessageBatchPinEmpty(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal([]byte(` [ { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])" } @@ -1035,13 +1172,17 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { func TestHandleMessageBatchMissingData(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal([]byte(` [ { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", "timestamp": "1620576488" @@ -1054,8 +1195,12 @@ func TestHandleMessageBatchMissingData(t *testing.T) { func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1085,8 +1230,12 @@ func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { func TestHandleMessageBatchPinBadIDentity(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1116,8 +1265,12 @@ func TestHandleMessageBatchPinBadIDentity(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1147,8 +1300,12 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1547,8 +1704,12 @@ func TestHandleMessageContractEvent(t *testing.T) { e := &Ethereum{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID) assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID) @@ -1609,8 +1770,12 @@ func TestHandleMessageContractEventError(t *testing.T) { e := &Ethereum{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} @@ -2854,15 +3019,68 @@ func TestHandleNetworkAction(t *testing.T) { e := &Ethereum{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeEthAddress, + Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", + } + + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + +} + +func TestHandleNetworkActionV2(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "firefly:terminate", + "uuids": "0x0000000000000000000000000000000000000000000000000000000000000000", + "batchHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "payloadRef": "", + "contexts": [] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 2, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(nil) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -2901,15 +3119,18 @@ func TestHandleNetworkActionFail(t *testing.T) { e := &Ethereum{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(fmt.Errorf("pop")) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -3316,6 +3537,43 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { httpmock.ActivateNonDefault(mockedClient) defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", + httpmock.NewJsonResponderOrPanic(500, `pop`)) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + utConfig.AddKnownKey(FireFlyContractConfigKey+".0."+FireFlyContractAddress, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10111", err) +} + +func TestAddFireflySubscriptionGetVersionError(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + resetConf(e) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", httpmock.NewJsonResponderOrPanic(200, []eventStream{})) httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", @@ -3349,3 +3607,11 @@ func TestRemoveInvalidSubscription(t *testing.T) { err := e.RemoveFireflySubscription(e.ctx, "bad") assert.Regexp(t, "FF10412", err) } + +func TestCallbacksWrongNamespace(t *testing.T) { + e, _ := newTestEthereum() + nsOpID := "ns1:" + fftypes.NewUUID().String() + e.callbacks.BlockchainOpUpdate(context.Background(), e, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + e.callbacks.BatchPinComplete(context.Background(), &blockchain.BatchPin{Namespace: "ns1"}, nil) + e.callbacks.BlockchainNetworkAction(context.Background(), "ns1", "terminate", nil, nil, nil) +} diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 31ed567bb2..d926f93740 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -59,39 +59,57 @@ type Fabric struct { closed chan struct{} metrics metrics.Manager fabconnectConf config.Section - subs map[string]string + subs map[string]subscriptionInfo +} + +type subscriptionInfo struct { + namespace string + channel string + version int } type callbacks struct { handlers map[string]blockchain.Callbacks } -func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } else { + log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID) } } -func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BatchPinComplete(batch, signingKey); err != nil { - return err - } +func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + if handler, ok := cb.handlers[batch.Namespace]; ok { + return handler.BatchPinComplete(batch, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace) return nil } -func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { - return err +func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + if namespace == "" { + // Older networks don't populate namespace, so deliver the event to every handler + for _, handler := range cb.handlers { + if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil { + return err + } } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainNetworkAction(action, location, event, signingKey) + } + log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace) } return nil } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + // TODO: can we push more listener/namespace knowledge down to this layer? if err := cb.BlockchainEvent(event); err != nil { return err } @@ -231,7 +249,7 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric return err } f.streamID = stream.ID - f.subs = make(map[string]string) + f.subs = make(map[string]subscriptionInfo) log.L(f.ctx).Infof("Event stream: %s", f.streamID) f.closed = make(chan struct{}) @@ -312,7 +330,7 @@ func (f *Fabric) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSONO } } -func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { +func (f *Fabric) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) { event := f.parseBlockchainEvent(ctx, msgJSON) if event == nil { return nil // move on @@ -333,7 +351,24 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb // Check if this is actually an operator action if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) { action := nsOrAction[len(blockchain.FireFlyActionPrefix):] - return f.callbacks.BlockchainNetworkAction(action, event, verifier) + + // For V1 of the FireFly contract, action is sent to all namespaces + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version > 1 { + namespace = subInfo.namespace + } + + return f.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier) + } + + // For V1 of the FireFly contract, namespace is passed explicitly + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version == 1 { + namespace = nsOrAction + } else { + namespace = subInfo.namespace } hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x")) @@ -365,7 +400,7 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb } batch := &blockchain.BatchPin{ - Namespace: nsOrAction, + Namespace: namespace, TransactionID: &txnID, BatchID: &batchID, BatchHash: &batchHash, @@ -375,7 +410,7 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb } // If there's an error dispatching the event, we must return the error and shutdown - return f.callbacks.BatchPinComplete(batch, verifier) + return f.callbacks.BatchPinComplete(ctx, batch, verifier) } func (f *Fabric) buildEventLocationString(chaincode string) string { @@ -410,7 +445,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) { updateType = core.OpStatusFailed } l.Infof("Fabconnect '%s' reply tx=%s (request=%s) %s", replyType, txHash, requestID, message) - f.callbacks.BlockchainOpUpdate(f, requestID, updateType, txHash, message, reply) + f.callbacks.BlockchainOpUpdate(ctx, f, requestID, updateType, txHash, message, reply) } func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (string, error) { @@ -423,8 +458,17 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, l if err != nil { return "", err } - f.subs[sub.ID] = namespace + version, err := f.GetNetworkVersion(ctx, location) + if err != nil { + return "", err + } + + f.subs[sub.ID] = subscriptionInfo{ + namespace: namespace, + channel: fabricOnChainLocation.Channel, + version: version, + } return sub.ID, nil } @@ -459,10 +503,18 @@ func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) l1.Tracef("Message: %+v", msgJSON) // Matches one of the active FireFly BatchPin subscriptions - if _, ok := f.subs[sub]; ok { + if subInfo, ok := f.subs[sub]; ok { + location, err := encodeContractLocation(ctx, &Location{ + Chaincode: msgJSON.GetString("chaincodeId"), + Channel: subInfo.channel, + }) + if err != nil { + return err + } + switch eventName { case broadcastBatchEventName: - if err := f.handleBatchPinEvent(ctx1, msgJSON); err != nil { + if err := f.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil { return err } default: @@ -760,11 +812,7 @@ func (f *Fabric) NormalizeContractLocation(ctx context.Context, location *fftype if err != nil { return nil, err } - normalized, err := json.Marshal(parsed) - if err == nil { - result = fftypes.JSONAnyPtrBytes(normalized) - } - return result, err + return encodeContractLocation(ctx, parsed) } func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { @@ -781,6 +829,20 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc return &fabricLocation, nil } +func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) { + if location.Channel == "" { + return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set") + } + if location.Chaincode == "" { + return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'chaincode' not set") + } + normalized, err := json.Marshal(location) + if err == nil { + result = fftypes.JSONAnyPtrBytes(normalized) + } + return result, err +} + func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -844,13 +906,9 @@ func (f *Fabric) GetAndConvertDeprecatedContractConfig(ctx context.Context) (loc } fromBlock = string(core.SubOptsFirstEventOldest) - fabricOnChainLocation := &Location{ + location, err = encodeContractLocation(ctx, &Location{ Chaincode: chaincode, Channel: f.defaultChannel, - } - - normalized, _ := json.Marshal(fabricOnChainLocation) - location = fftypes.JSONAnyPtrBytes(normalized) - - return location, fromBlock, nil + }) + return location, fromBlock, err } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 0d0d242eef..01bb34718c 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -220,6 +220,8 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) resetConf(e) utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -238,7 +240,7 @@ func TestInitAllExistingStreams(t *testing.T) { _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.Equal(t, 3, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) } @@ -273,6 +275,41 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { assert.Regexp(t, "pop", err) } +func TestAddFireflySubscriptionGetVersionError(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + resetConf(e) + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + httpmock.NewJsonResponderOrPanic(500, "pop")) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "newest") + assert.Regexp(t, "pop", err) +} + func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { e, cancel := newTestFabric() defer cancel() @@ -287,6 +324,8 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) resetConf(e) utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -305,7 +344,7 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { subID, err := e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.Equal(t, 3, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) err = e.RemoveFireflySubscription(e.ctx, subID) @@ -736,8 +775,12 @@ func TestHandleMessageBatchPinOK(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -767,6 +810,110 @@ func TestHandleMessageBatchPinOK(t *testing.T) { } +func TestHandleMessageBatchPinV2(t *testing.T) { + payload := base64.StdEncoding.EncodeToString([]byte(fftypes.JSONObject{ + "signer": "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + "timestamp": fftypes.JSONObject{"seconds": 1630031667, "nanos": 791499000}, + "namespace": "", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": []string{"0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771"}, + }.String())) + data := []byte(` +[ + { + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "` + payload + `", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 2, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeMSPIdentity, + Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + } + + em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + b := em.Calls[0].Arguments[0].(*blockchain.BatchPin) + assert.Equal(t, "ns1", b.Namespace) + assert.Equal(t, "e19af8b3-9060-4051-812d-7597d19adfb9", b.TransactionID.String()) + assert.Equal(t, "847d3bfd-0742-49ef-b65d-3fed15f5b0a6", b.BatchID.String()) + assert.Equal(t, "d71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", b.BatchHash.String()) + assert.Equal(t, "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", b.BatchPayloadRef) + assert.Len(t, b.Contexts, 2) + assert.Equal(t, "68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", b.Contexts[0].String()) + assert.Equal(t, "19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771", b.Contexts[1].String()) + + em.AssertExpectations(t) + +} + +func TestHandleMessageBatchPinMissingChaincodeID(t *testing.T) { + payload := base64.StdEncoding.EncodeToString([]byte(fftypes.JSONObject{ + "signer": "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + "timestamp": fftypes.JSONObject{"seconds": 1630031667, "nanos": 791499000}, + "namespace": "ns1", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": []string{"0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771"}, + }.String())) + data := []byte(` +[ + { + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "` + payload + `", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10310", err) + +} + func TestHandleMessageEmptyPayloadRef(t *testing.T) { data := []byte(` [ @@ -784,8 +931,12 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -832,8 +983,12 @@ func TestHandleMessageBatchPinExit(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -866,8 +1021,12 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } var events []interface{} err := json.Unmarshal(data, &events) @@ -893,8 +1052,12 @@ func TestHandleMessageUnknownEventName(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } var events []interface{} err := json.Unmarshal(data, &events) @@ -909,8 +1072,12 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } data := []byte(`[{ "chaincodeId": "firefly", @@ -933,8 +1100,13 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + data := []byte(`[{ "chaincodeId": "firefly", "blockNumber": 91, @@ -956,8 +1128,12 @@ func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } data := []byte(`[{ "chaincodeId": "firefly", @@ -980,8 +1156,13 @@ func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + data := []byte(`[{ "chaincodeId": "firefly", "blockNumber": 91, @@ -1349,8 +1530,13 @@ func TestHandleMessageContractEvent(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { assert.Equal(t, "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", e.BlockchainTXID) assert.Equal(t, "000000000010/000020/000030", e.Event.ProtocolID) @@ -1408,8 +1594,12 @@ func TestHandleMessageContractEventNoPayload(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } var events []interface{} err := json.Unmarshal(data, &events) @@ -1435,8 +1625,12 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-cb37cc07-e873-4f58-44ab-55add6bba320"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-cb37cc07-e873-4f58-44ab-55add6bba320"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } var events []interface{} err := json.Unmarshal(data, &events) @@ -1464,8 +1658,12 @@ func TestHandleMessageContractEventError(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) @@ -1840,15 +2038,63 @@ func TestHandleNetworkAction(t *testing.T) { callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeMSPIdentity, + Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + } + + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + +} + +func TestHandleNetworkActionV2(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoiZmlyZWZseTp0ZXJtaW5hdGUiLCJ1dWlkcyI6IjB4MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMCIsImJhdGNoSGFzaCI6IjB4MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMCIsInBheWxvYWRSZWYiOiIiLCJjb250ZXh0cyI6W119", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 2, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(nil) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) var events []interface{} err := json.Unmarshal(data, &events) @@ -1879,15 +2125,19 @@ func TestHandleNetworkActionFail(t *testing.T) { e := &Fabric{ callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(fmt.Errorf("pop")) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data, &events) @@ -2019,6 +2269,18 @@ func TestConvertDeprecatedContractConfigNoChaincode(t *testing.T) { assert.Regexp(t, "F10138", err) } +func TestConvertDeprecatedContractConfigNoChannel(t *testing.T) { + e, _ := newTestFabric() + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "Firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + + e.defaultChannel = "" + _, _, err := e.GetAndConvertDeprecatedContractConfig(e.ctx) + assert.Regexp(t, "FF10310", err) +} + func TestSubmitNetworkAction(t *testing.T) { e, cancel := newTestFabric() @@ -2074,3 +2336,11 @@ func TestSubmitNetworkActionBadLocation(t *testing.T) { err := e.SubmitNetworkAction(context.Background(), "", signer, core.NetworkActionTerminate, location) assert.Regexp(t, "FF10310", err) } + +func TestCallbacksWrongNamespace(t *testing.T) { + e, _ := newTestFabric() + nsOpID := "ns1:" + fftypes.NewUUID().String() + e.callbacks.BlockchainOpUpdate(context.Background(), e, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + e.callbacks.BatchPinComplete(context.Background(), &blockchain.BatchPin{Namespace: "ns1"}, nil) + e.callbacks.BlockchainNetworkAction(context.Background(), "ns1", "terminate", nil, nil, nil) +} diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 56a7c40012..48fd0b3557 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -67,7 +67,7 @@ type EventManager interface { // Bound blockchain callbacks BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error BlockchainEvent(event *blockchain.EventWithSubscription) error - BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error + BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error // Bound dataexchange callbacks DXEvent(dx dataexchange.Plugin, event dataexchange.DXEvent) diff --git a/internal/events/network_action.go b/internal/events/network_action.go index 8337a48c8c..756bea86ec 100644 --- a/internal/events/network_action.go +++ b/internal/events/network_action.go @@ -19,18 +19,18 @@ package events import ( "context" + "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/log" - "github.com/hyperledger/firefly/internal/multiparty" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/core" ) -func (em *eventManager) actionTerminate(mm multiparty.Manager, event *blockchain.Event) error { +func (em *eventManager) actionTerminate(location *fftypes.JSONAny, event *blockchain.Event) error { namespace, err := em.database.GetNamespace(em.ctx, em.namespace) if err != nil { return err } - if err := mm.TerminateContract(em.ctx, &namespace.Contracts, event); err != nil { + if err := em.multiparty.TerminateContract(em.ctx, &namespace.Contracts, location, event); err != nil { return err } // Currently, a termination event is implied to apply to ALL namespaces @@ -42,7 +42,7 @@ func (em *eventManager) actionTerminate(mm multiparty.Manager, event *blockchain }) } -func (em *eventManager) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { +func (em *eventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { return em.retry.Do(em.ctx, "handle network action", func(attempt int) (retry bool, err error) { // Verify that the action came from a registered root org resolvedAuthor, err := em.identity.FindIdentityForVerifier(em.ctx, []core.IdentityType{core.IdentityTypeOrg}, signingKey) @@ -59,7 +59,7 @@ func (em *eventManager) BlockchainNetworkAction(action string, event *blockchain } if action == core.NetworkActionTerminate.String() { - err = em.actionTerminate(em.multiparty, event) + err = em.actionTerminate(location, event) } else { log.L(em.ctx).Errorf("Ignoring unrecognized network action: %s", action) return false, nil diff --git a/internal/events/network_action_test.go b/internal/events/network_action_test.go index b9b3fbab45..079a192b04 100644 --- a/internal/events/network_action_test.go +++ b/internal/events/network_action_test.go @@ -35,6 +35,7 @@ func TestNetworkAction(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") event := &blockchain.Event{ProtocolID: "0001"} verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -54,9 +55,9 @@ func TestNetworkAction(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(nil) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(nil) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil) - err := em.BlockchainNetworkAction("terminate", event, verifier) + err := em.BlockchainNetworkAction("terminate", location, event, verifier) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -68,6 +69,7 @@ func TestNetworkActionUnknownIdentity(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -78,7 +80,7 @@ func TestNetworkActionUnknownIdentity(t *testing.T) { mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(nil, fmt.Errorf("pop")).Once() mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(nil, nil).Once() - err := em.BlockchainNetworkAction("terminate", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("terminate", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -88,6 +90,7 @@ func TestNetworkActionNonRootIdentity(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -101,7 +104,7 @@ func TestNetworkActionNonRootIdentity(t *testing.T) { }, }, nil) - err := em.BlockchainNetworkAction("terminate", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("terminate", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -111,6 +114,7 @@ func TestNetworkActionUnknown(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -120,7 +124,7 @@ func TestNetworkActionUnknown(t *testing.T) { mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil) - err := em.BlockchainNetworkAction("bad", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("bad", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -130,15 +134,15 @@ func TestActionTerminateQueryFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(nil, fmt.Errorf("pop")) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") - mmp.AssertExpectations(t) mdi.AssertExpectations(t) } @@ -146,13 +150,15 @@ func TestActionTerminateFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + + mmp := em.multiparty.(*multipartymocks.Manager) mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop")) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop")) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") mmp.AssertExpectations(t) @@ -163,14 +169,16 @@ func TestActionTerminateUpsertFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + + mmp := em.multiparty.(*multipartymocks.Manager) mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(fmt.Errorf("pop")) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(nil) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") mmp.AssertExpectations(t) diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index c05200a558..b0ad7c8ade 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -47,7 +47,7 @@ type Manager interface { // - Validates that the event came from the currently active FireFly contract // - Re-initializes the plugin against the next configured FireFly contract // - Updates the provided contract info to record the point of termination and the newly active contract - TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) (err error) + TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error) // GetNetworkVersion returns the network version of the active FireFly contract GetNetworkVersion() int @@ -170,9 +170,13 @@ func (mm *multipartyManager) resolveFireFlyContract(ctx context.Context, contrac return location, firstEvent, err } -func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) (err error) { +func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error) { // TODO: Investigate if it better to consolidate DB termination here - log.L(ctx).Infof("Processing termination of contract at index %d", contracts.Active.Index) + if contracts.Active.Location.String() != location.String() { + log.L(ctx).Warnf("Ignoring termination event from contract at '%s', which does not match active '%s'", location, contracts.Active.Location) + return nil + } + log.L(ctx).Infof("Processing termination of contract #%d at '%s'", contracts.Active.Index, contracts.Active.Location) contracts.Active.FinalEvent = termination.ProtocolID contracts.Terminated = append(contracts.Terminated, contracts.Active) contracts.Active = core.FireFlyContractInfo{Index: contracts.Active.Index + 1} @@ -188,12 +192,7 @@ func (mm *multipartyManager) GetNetworkVersion() int { } func (mm *multipartyManager) SubmitNetworkAction(ctx context.Context, signingKey string, action *core.NetworkAction) error { - if action.Type == core.NetworkActionTerminate { - if mm.namespace != core.LegacySystemNamespace { - // For now, "terminate" only works on ff_system - return i18n.NewError(ctx, coremsgs.MsgTerminateNotSupported, mm.namespace) - } - } else { + if action.Type != core.NetworkActionTerminate { return i18n.NewError(ctx, coremsgs.MsgUnrecognizedNetworkAction, action.Type) } diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index db23501775..c1c417108a 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -401,34 +401,6 @@ func TestSubmitNetworkActionBadType(t *testing.T) { mp.mbi.AssertExpectations(t) } -func TestSubmitNetworkActionBadNS(t *testing.T) { - contracts := make([]Contract, 1) - location := fftypes.JSONAnyPtr(fftypes.JSONObject{ - "address": "0x123", - }.String()) - contract := Contract{ - FirstEvent: "0", - Location: location, - } - - contracts[0] = contract - mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) - mp.multipartyManager.config.Contracts = contracts - - cf := &core.FireFlyContracts{ - Active: core.FireFlyContractInfo{Index: 0}, - } - - err := mp.ConfigureContract(context.Background(), cf) - assert.NoError(t, err) - err = mp.SubmitNetworkAction(context.Background(), "0x123", &core.NetworkAction{Type: core.NetworkActionTerminate}) - assert.Regexp(t, "FF10399", err) - - mp.mbi.AssertExpectations(t) -} - func TestSubmitBatchPinOk(t *testing.T) { mp := newTestMultipartyManager() defer mp.cleanup(t) @@ -584,7 +556,7 @@ func TestConfgureAndTerminateContract(t *testing.T) { err := mp.ConfigureContract(context.Background(), cf) assert.NoError(t, err) - err = mp.TerminateContract(context.Background(), cf, &blockchain.Event{}) + err = mp.TerminateContract(context.Background(), cf, location, &blockchain.Event{}) assert.NoError(t, err) } @@ -593,10 +565,29 @@ func TestTerminateContractError(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) cf := &core.FireFlyContracts{ - Active: core.FireFlyContractInfo{Index: 0}, + Active: core.FireFlyContractInfo{Index: 0, Location: location}, } - err := mp.TerminateContract(context.Background(), cf, &blockchain.Event{}) + err := mp.TerminateContract(context.Background(), cf, location, &blockchain.Event{}) assert.Regexp(t, "pop", err) } + +func TestTerminateContractWrongAddress(t *testing.T) { + mp := newTestMultipartyManager() + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) + mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + cf := &core.FireFlyContracts{ + Active: core.FireFlyContractInfo{Index: 0, Location: location}, + } + + err := mp.TerminateContract(context.Background(), cf, fftypes.JSONAnyPtr("{}"), &blockchain.Event{}) + assert.NoError(t, err) +} diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 9ce16f3598..87907ae6ad 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -261,7 +261,7 @@ func TestInitOrchestratorFail(t *testing.T) { nm.mdi.On("SetHandler", "default", mock.Anything).Return() nm.mdi.On("GetIdentities", mock.Anything, "default", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) - nm.mbi.On("SetHandler", mock.Anything).Return() + nm.mbi.On("SetHandler", "default", mock.Anything).Return() nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("SetHandler", "default", mock.Anything).Return() diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 9acaa17014..ffa7916c77 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -60,8 +60,8 @@ func (bc *boundCallbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKe return bc.ei.BatchPinComplete(batch, signingKey) } -func (bc *boundCallbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - return bc.ei.BlockchainNetworkAction(action, event, signingKey) +func (bc *boundCallbacks) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + return bc.ei.BlockchainNetworkAction(action, location, event, signingKey) } func (bc *boundCallbacks) DXEvent(event dataexchange.DXEvent) { diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 187027a676..0d52a6bf4c 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -52,6 +52,7 @@ func TestBoundCallbacks(t *testing.T) { pool := &tokens.TokenPool{} transfer := &tokens.TokenTransfer{} approval := &tokens.TokenApproval{} + location := fftypes.JSONAnyPtr("{}") event := &blockchain.Event{} hash := fftypes.NewRandB32() opID := fftypes.NewUUID() @@ -60,8 +61,8 @@ func TestBoundCallbacks(t *testing.T) { err := bc.BatchPinComplete(batch, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) assert.EqualError(t, err, "pop") - mei.On("BlockchainNetworkAction", "terminate", event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}).Return(fmt.Errorf("pop")) - err = bc.BlockchainNetworkAction("terminate", event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) + mei.On("BlockchainNetworkAction", "terminate", location, event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}).Return(fmt.Errorf("pop")) + err = bc.BlockchainNetworkAction("terminate", location, event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) assert.EqualError(t, err, "pop") nsOpID := "ns1:" + opID.String() diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index ec423492ee..72c6f0260d 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -46,23 +46,26 @@ type callbacks struct { handlers map[string]tokens.Callbacks } -func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) TokenOpUpdate(ctx context.Context, plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } else { + log.L(ctx).Errorf("No handler found for token operation '%s'", nsOpID) } } func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokenPoolCreated(plugin, pool); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokenPoolCreated(plugin, pool) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokenPoolCreated(plugin, pool) } } return nil @@ -70,15 +73,15 @@ func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, po func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokensTransferred(plugin, transfer); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokensTransferred(plugin, transfer) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokensTransferred(plugin, transfer) } } return nil @@ -86,15 +89,15 @@ func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, t func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokensApproved(plugin, approval); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokensApproved(plugin, approval) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokensApproved(plugin, approval) } } return nil @@ -260,7 +263,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) replyType = core.OpStatusFailed } l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message) - ft.callbacks.TokenOpUpdate(ft, requestID, replyType, transactionHash, message, data) + ft.callbacks.TokenOpUpdate(ctx, ft, requestID, replyType, transactionHash, message, data) } func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index 3230242117..75d48a403d 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -44,13 +44,13 @@ func (_m *Callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) er return r0 } -// BlockchainNetworkAction provides a mock function with given fields: action, event, signingKey -func (_m *Callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - ret := _m.Called(action, event, signingKey) +// BlockchainNetworkAction provides a mock function with given fields: action, location, event, signingKey +func (_m *Callbacks) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + ret := _m.Called(action, location, event, signingKey) var r0 error - if rf, ok := ret.Get(0).(func(string, *blockchain.Event, *core.VerifierRef) error); ok { - r0 = rf(action, event, signingKey) + if rf, ok := ret.Get(0).(func(string, *fftypes.JSONAny, *blockchain.Event, *core.VerifierRef) error); ok { + r0 = rf(action, location, event, signingKey) } else { r0 = ret.Error(0) } diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index da4e2f52bb..05706a61e7 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -69,13 +69,13 @@ func (_m *EventManager) BlockchainEvent(event *blockchain.EventWithSubscription) return r0 } -// BlockchainNetworkAction provides a mock function with given fields: action, event, signingKey -func (_m *EventManager) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - ret := _m.Called(action, event, signingKey) +// BlockchainNetworkAction provides a mock function with given fields: action, location, event, signingKey +func (_m *EventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + ret := _m.Called(action, location, event, signingKey) var r0 error - if rf, ok := ret.Get(0).(func(string, *blockchain.Event, *core.VerifierRef) error); ok { - r0 = rf(action, event, signingKey) + if rf, ok := ret.Get(0).(func(string, *fftypes.JSONAny, *blockchain.Event, *core.VerifierRef) error); ok { + r0 = rf(action, location, event, signingKey) } else { r0 = ret.Error(0) } diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 23ba3ac3ee..91ecd050a6 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -158,13 +158,13 @@ func (_m *Manager) SubmitNetworkAction(ctx context.Context, signingKey string, a return r0 } -// TerminateContract provides a mock function with given fields: ctx, contracts, termination -func (_m *Manager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) error { - ret := _m.Called(ctx, contracts, termination) +// TerminateContract provides a mock function with given fields: ctx, contracts, location, termination +func (_m *Manager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) error { + ret := _m.Called(ctx, contracts, location, termination) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.FireFlyContracts, *blockchain.Event) error); ok { - r0 = rf(ctx, contracts, termination) + if rf, ok := ret.Get(0).(func(context.Context, *core.FireFlyContracts, *fftypes.JSONAny, *blockchain.Event) error); ok { + r0 = rf(ctx, contracts, location, termination) } else { r0 = ret.Error(0) } diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 42e129a146..14c8680b1a 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -120,7 +120,7 @@ type Callbacks interface { // BlockchainNetworkAction notifies on the arrival of a network operator action // // Error should only be returned in shutdown scenarios - BlockchainNetworkAction(action string, event *Event, signingKey *core.VerifierRef) error + BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *Event, signingKey *core.VerifierRef) error // BlockchainEvent notifies on the arrival of any event from a user-created subscription. BlockchainEvent(event *EventWithSubscription) error From 91a7c8c07bc4cc5cfe911c8ae338717ce564e576 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Jun 2022 15:15:07 -0400 Subject: [PATCH 3/5] Only send namespace on V1 batch pin actions Signed-off-by: Andrew Richardson --- internal/multiparty/operations.go | 9 +++++++- internal/multiparty/operations_test.go | 32 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/internal/multiparty/operations.go b/internal/multiparty/operations.go index 5c569225ef..e80bd0ecca 100644 --- a/internal/multiparty/operations.go +++ b/internal/multiparty/operations.go @@ -108,8 +108,15 @@ func (mm *multipartyManager) RunOperation(ctx context.Context, op *core.Prepared switch data := op.Data.(type) { case batchPinData: batch := data.Batch + + // Only include namespace for V1 networks + var namespace string + if mm.activeContract.networkVersion == 1 { + namespace = batch.Namespace + } + return nil, false, mm.blockchain.SubmitBatchPin(ctx, op.NamespacedIDString(), batch.Key, &blockchain.BatchPin{ - Namespace: batch.Namespace, + Namespace: namespace, TransactionID: batch.TX.ID, BatchID: batch.ID, BatchHash: batch.Hash, diff --git a/internal/multiparty/operations_test.go b/internal/multiparty/operations_test.go index b2a5393856..406bec72ee 100644 --- a/internal/multiparty/operations_test.go +++ b/internal/multiparty/operations_test.go @@ -175,6 +175,38 @@ func TestPrepareOperationBatchPinNotFound(t *testing.T) { assert.Regexp(t, "FF10109", err) } +func TestRunBatchPinV1(t *testing.T) { + mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.activeContract.networkVersion = 1 + + op := &core.Operation{ + Type: core.OpTypeBlockchainPinBatch, + ID: fftypes.NewUUID(), + Namespace: "ns1", + } + batch := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: fftypes.NewUUID(), + SignerRef: core.SignerRef{ + Key: "0x123", + }, + }, + } + contexts := []*fftypes.Bytes32{ + fftypes.NewRandB32(), + fftypes.NewRandB32(), + } + addBatchPinInputs(op, batch.ID, contexts, "payload1") + + mp.mbi.On("SubmitBatchPin", context.Background(), "ns1:"+op.ID.String(), "0x123", mock.Anything, mock.Anything).Return(nil) + + _, complete, err := mp.RunOperation(context.Background(), opBatchPin(op, batch, contexts, "payload1")) + + assert.False(t, complete) + assert.NoError(t, err) +} + func TestOperationUpdate(t *testing.T) { mp := newTestMultipartyManager() defer mp.cleanup(t) From 5d31178c72cd0749df64b7639f40889fdadf7501 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Jun 2022 15:23:41 -0400 Subject: [PATCH 4/5] Add error logs for unhandled token events Signed-off-by: Andrew Richardson --- internal/tokens/fftokens/fftokens.go | 15 +++++++++------ internal/tokens/fftokens/fftokens_test.go | 10 ++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 72c6f0260d..81b3ebc5b0 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -55,7 +55,7 @@ func (cb *callbacks) TokenOpUpdate(ctx context.Context, plugin tokens.Plugin, ns } } -func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { +func (cb *callbacks) TokenPoolCreated(ctx context.Context, namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { @@ -67,11 +67,12 @@ func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, po if handler, ok := cb.handlers[namespace]; ok { return handler.TokenPoolCreated(plugin, pool) } + log.L(ctx).Errorf("No handler found for token pool event on namespace '%s'", namespace) } return nil } -func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { +func (cb *callbacks) TokensTransferred(ctx context.Context, namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { @@ -83,11 +84,12 @@ func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, t if handler, ok := cb.handlers[namespace]; ok { return handler.TokensTransferred(plugin, transfer) } + log.L(ctx).Errorf("No handler found for token transfer event on namespace '%s'", namespace) } return nil } -func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { +func (cb *callbacks) TokensApproved(ctx context.Context, namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { @@ -99,6 +101,7 @@ func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, appr if handler, ok := cb.handlers[namespace]; ok { return handler.TokensApproved(plugin, approval) } + log.L(ctx).Errorf("No handler found for token approval event on namespace '%s'", namespace) } return nil } @@ -336,7 +339,7 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokenPoolCreated(namespace, ft, pool) + return ft.callbacks.TokenPoolCreated(ctx, namespace, ft, pool) } func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransferType, data fftypes.JSONObject) (err error) { @@ -425,7 +428,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokensTransferred(namespace, ft, transfer) + return ft.callbacks.TokensTransferred(ctx, namespace, ft, transfer) } func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONObject) (err error) { @@ -500,7 +503,7 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb }, } - return ft.callbacks.TokensApproved(namespace, ft, approval) + return ft.callbacks.TokensApproved(ctx, namespace, ft, approval) } func (ft *FFTokens) handleMessage(ctx context.Context, msgBytes []byte) (err error) { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 4081672df4..5f06b3cfe7 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -1291,3 +1291,13 @@ func TestEventLoopClosedContext(t *testing.T) { wsm.On("Receive").Return((<-chan []byte)(r)) h.eventLoop() // we're simply looking for it exiting } + +func TestCallbacksWrongNamespace(t *testing.T) { + h, _, _, _, done := newTestFFTokens(t) + defer done() + nsOpID := "ns1:" + fftypes.NewUUID().String() + h.callbacks.TokenOpUpdate(context.Background(), h, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + h.callbacks.TokenPoolCreated(context.Background(), "ns1", h, nil) + h.callbacks.TokensTransferred(context.Background(), "ns1", h, nil) + h.callbacks.TokensApproved(context.Background(), "ns1", h, nil) +} From 6b89dd0e158048c91014ba9d0ba07bca290e39b6 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Sun, 3 Jul 2022 13:18:53 -0400 Subject: [PATCH 5/5] Clean up blockchain subscription creation Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum.go | 8 +- internal/blockchain/ethereum/ethereum_test.go | 75 ++++++++++++ internal/blockchain/ethereum/eventstream.go | 31 +++-- internal/blockchain/fabric/eventstream.go | 26 ++-- internal/blockchain/fabric/fabric.go | 8 +- internal/blockchain/fabric/fabric_test.go | 113 ++++++++++++++++++ internal/coremsgs/en_error_messages.go | 1 + internal/multiparty/manager.go | 1 + internal/multiparty/manager_test.go | 34 +++++- pkg/blockchain/plugin.go | 2 +- 10 files changed, 265 insertions(+), 34 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 2b86a400d0..9b301a657c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -282,7 +282,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string, firstEvent = "latest" } - sub, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI) + sub, subNS, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI) if err != nil { return "", err } @@ -292,8 +292,12 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string, return "", err } + if version > 1 && subNS == "" { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version) + } + e.subs[sub.ID] = subscriptionInfo{ - namespace: namespace, + namespace: subNS, version: version, } return sub.ID, nil diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index a97ebef878..cfa76e1bc6 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -462,6 +462,45 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.ActivateNonDefault(mockedClient) defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, + })) + httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1)) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x71C7656EC7ab88b098defB751B7401B5f6d8976F", + }.String()) + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + + assert.Equal(t, 4, httpmock.GetTotalCallCount()) + assert.Equal(t, "es12345", e.streamID) +} + +func TestInitAllExistingStreamsOld(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", @@ -493,6 +532,42 @@ func TestInitAllExistingStreams(t *testing.T) { assert.Equal(t, "es12345", e.streamID) } +func TestInitAllExistingStreamsInvalidName(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, + })) + httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2)) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x71C7656EC7ab88b098defB751B7401B5f6d8976F", + }.String()) + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10413", err) +} + func TestSubQueryError(t *testing.T) { e, cancel := newTestEthereum() diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 1f687a3e0d..a88ca159e7 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -171,7 +171,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string) er return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, instancePath, fromBlock, stream string, abi *abi.Entry) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, instancePath, fromBlock, stream string, abi *abi.Entry) (sub *subscription, subNS string, err error) { // Include a hash of the instance path in the subscription, so if we ever point at a different // contract configuration, we re-subscribe from block 0. // We don't need full strength hashing, so just use the first 16 chars for readability. @@ -179,20 +179,24 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace existingSubs, err := s.getSubscriptions(ctx) if err != nil { - return nil, err + return nil, "", err } - subName := fmt.Sprintf("%s_%s_%s", namespace, abi.Name, instanceUniqueHash) + oldName1 := abi.Name + oldName2 := fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash) + currentName := fmt.Sprintf("%s_%s_%s", namespace, abi.Name, instanceUniqueHash) for _, s := range existingSubs { - if s.Stream == stream && (s.Name == subName || - /* Check for the deprecates names, before adding namespace uniqueness qualifier. - NOTE: If one of these very early environments needed a new subscription, the existing one would need to + if s.Stream == stream { + /* Check for the deprecated names, before adding namespace uniqueness qualifier. + NOTE: If one of these early environments needed a new subscription, the existing one would need to be deleted manually. */ - s.Name == abi.Name || s.Name == fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash)) { - sub = s - if s.Name == abi.Name || s.Name == fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash) { - log.L(ctx).Warnf("Subscription %s uses deprecated functionality, please upgrade to utilize multiple namespaces.", s.Name) + if s.Name == oldName1 || s.Name == oldName2 { + log.L(ctx).Warnf("Subscription %s uses a legacy name format '%s' and may not support multiple namespaces. Expected '%s' instead.", s.ID, s.Name, currentName) + sub = s + } else if s.Name == currentName { + sub = s + subNS = namespace } } } @@ -202,11 +206,12 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace } if sub == nil { - if sub, err = s.createSubscription(ctx, location, stream, subName, fromBlock, abi); err != nil { - return nil, err + if sub, err = s.createSubscription(ctx, location, stream, currentName, fromBlock, abi); err != nil { + return nil, "", err } + subNS = namespace } log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) - return sub, nil + return sub, subNS, nil } diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index ccc6ba194c..2109f38632 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -151,28 +151,34 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string) er return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, location *Location, fromBlock, stream, event string) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, location *Location, fromBlock, stream, event string) (sub *subscription, subNS string, err error) { existingSubs, err := s.getSubscriptions(ctx) if err != nil { - return nil, err + return nil, "", err } - subName := fmt.Sprintf("%s_%s", namespace, event) + oldName := event + currentName := fmt.Sprintf("%s_%s", namespace, event) + for _, s := range existingSubs { - if s.Stream == stream && (s.Name == subName || s.Name == event) { - sub = s - if s.Name == event { - log.L(ctx).Warnf("Subscription %s uses deprecated functionality, please upgrade to utilize multiple namespaces.", s.Name) + if s.Stream == stream { + if s.Name == oldName { + log.L(ctx).Warnf("Subscription %s uses a legacy name format '%s' and may not support multiple namespaces. Expected '%s' instead.", s.ID, s.Name, currentName) + sub = s + } else if s.Name == currentName { + sub = s + subNS = namespace } } } if sub == nil { - if sub, err = s.createSubscription(ctx, location, stream, subName, event, fromBlock); err != nil { - return nil, err + if sub, err = s.createSubscription(ctx, location, stream, currentName, event, fromBlock); err != nil { + return nil, "", err } + subNS = namespace } log.L(ctx).Infof("%s subscription: %s", event, sub.ID) - return sub, nil + return sub, subNS, nil } diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index d926f93740..3bbd3a0e1d 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -454,7 +454,7 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, l return "", err } - sub, err := f.streams.ensureFireFlySubscription(ctx, namespace, fabricOnChainLocation, firstEvent, f.streamID, batchPinEvent) + sub, subNS, err := f.streams.ensureFireFlySubscription(ctx, namespace, fabricOnChainLocation, firstEvent, f.streamID, batchPinEvent) if err != nil { return "", err } @@ -464,8 +464,12 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, l return "", err } + if version > 1 && subNS == "" { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version) + } + f.subs[sub.ID] = subscriptionInfo{ - namespace: namespace, + namespace: subNS, channel: fabricOnChainLocation.Channel, version: version, } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 01bb34718c..ac2dd967d6 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -244,6 +244,44 @@ func TestInitAllExistingStreams(t *testing.T) { assert.Equal(t, "es12345", e.streamID) } +func TestInitAllExistingStreamsOld(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + + assert.Equal(t, 3, httpmock.GetTotalCallCount()) + assert.Equal(t, "es12345", e.streamID) +} + func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { e, cancel := newTestFabric() defer cancel() @@ -351,6 +389,41 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { assert.NoError(t, err) } +func TestAddFireflySubscriptionInvalidSubName(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 2)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10413", err) +} + func TestRemoveUnknownFireflySubscription(t *testing.T) { e, _ := newTestFabric() @@ -482,6 +555,46 @@ func TestSubQueryCreateError(t *testing.T) { } +func TestSubQueryCreate(t *testing.T) { + + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{ID: "sb-123"})) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPConfigRetryEnabled, false) + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + +} + func TestSubmitBatchPinOK(t *testing.T) { e, cancel := newTestFabric() diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 64dfd5840a..11cdd76f77 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -252,4 +252,5 @@ var ( MsgDefRejectedHashMismatch = ffe("FF10410", "Rejected %s '%s' - hash mismatch: %s != %s") MsgInvalidNamespaceUUID = ffe("FF10411", "Expected 'namespace:' prefix on ID '%s'", 400) MsgSubscriptionIDInvalid = ffe("FF10412", "Invalid subscription ID: %s") + MsgInvalidSubscriptionForNetwork = ffe("FF10413", "Subscription name '%s' is invalid according to multiparty network rules in effect (network version=%d)") ) diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index b0ad7c8ade..ccb473b2d4 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -131,6 +131,7 @@ func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *c if err != nil { return err } + version, err := mm.blockchain.GetNetworkVersion(ctx, location) if err != nil { return err diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index c1c417108a..3535d892a8 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -105,6 +105,8 @@ func TestConfigureContract(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -129,6 +131,8 @@ func TestConfigureContractOldestBlock(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -155,6 +159,8 @@ func TestConfigureContractNewestBlock(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -171,6 +177,7 @@ func TestConfigureContractNewestBlock(t *testing.T) { func TestResolveContractDeprecatedConfig(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", @@ -188,6 +195,7 @@ func TestResolveContractDeprecatedConfig(t *testing.T) { func TestResolveContractDeprecatedConfigError(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(nil, "", fmt.Errorf("pop")) @@ -197,6 +205,7 @@ func TestResolveContractDeprecatedConfigError(t *testing.T) { func TestResolveContractDeprecatedConfigNewestBlock(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", @@ -218,8 +227,8 @@ func TestConfigureContractBadIndex(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts cf := &core.FireFlyContracts{ @@ -242,8 +251,9 @@ func TestConfigureContractNetworkVersionFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(0, fmt.Errorf("pop")) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts cf := &core.FireFlyContracts{ @@ -267,6 +277,8 @@ func TestSubmitNetworkAction(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -313,6 +325,8 @@ func TestSubmitNetworkActionTXFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -347,6 +361,8 @@ func TestSubmitNetworkActionOpFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -383,6 +399,8 @@ func TestSubmitNetworkActionBadType(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -513,6 +531,8 @@ func TestGetNetworkVersion(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -545,6 +565,8 @@ func TestConfgureAndTerminateContract(t *testing.T) { contracts[0] = contract contracts[1] = contract2 mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(nil) @@ -562,7 +584,8 @@ func TestConfgureAndTerminateContract(t *testing.T) { func TestTerminateContractError(t *testing.T) { mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) + defer mp.cleanup(t) + mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) location := fftypes.JSONAnyPtr(fftypes.JSONObject{ @@ -578,8 +601,7 @@ func TestTerminateContractError(t *testing.T) { func TestTerminateContractWrongAddress(t *testing.T) { mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + defer mp.cleanup(t) location := fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 14c8680b1a..46e8858214 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -90,7 +90,7 @@ type Plugin interface { GetAndConvertDeprecatedContractConfig(ctx context.Context) (location *fftypes.JSONAny, fromBlock string, err error) // AddFireflySubscription creates a FireFly BatchPin subscription for the provided location - AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (string, error) + AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (subID string, err error) // RemoveFireFlySubscription removes the provided FireFly subscription RemoveFireflySubscription(ctx context.Context, subID string) error