diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 9449a8ffcb..9b301a657c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -68,39 +68,56 @@ type Ethereum struct { addressResolver *addressResolver metrics metrics.Manager ethconnectConf config.Section - subs map[string]string + subs map[string]subscriptionInfo +} + +type subscriptionInfo struct { + namespace string + version int } type callbacks struct { - handlers []blockchain.Callbacks + handlers map[string]blockchain.Callbacks } -func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + return } + log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID) } -func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BatchPinComplete(batch, signingKey); err != nil { - return err - } +func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + if handler, ok := cb.handlers[batch.Namespace]; ok { + return handler.BatchPinComplete(batch, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace) return nil } -func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { - return err +func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + if namespace == "" { + // V1 networks don't populate namespace, so deliver the event to every handler + for _, handler := range cb.handlers { + if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil { + return err + } + } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainNetworkAction(action, location, event, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace) } return nil } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + // TODO: can we push more listener/namespace knowledge down to this layer? if err := cb.BlockchainEvent(event); err != nil { return err } @@ -190,6 +207,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr e.ctx = log.WithLogField(ctx, "proto", "ethereum") e.metrics = metrics e.capabilities = &blockchain.Capabilities{} + e.callbacks.handlers = make(map[string]blockchain.Callbacks) if addressResolverConf.GetString(AddressResolverURLTemplate) != "" { if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil { @@ -230,7 +248,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return err } e.streamID = stream.ID - e.subs = make(map[string]string) + e.subs = make(map[string]subscriptionInfo) log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic) e.closed = make(chan struct{}) @@ -239,8 +257,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return nil } -func (e *Ethereum) SetHandler(handler blockchain.Callbacks) { - e.callbacks.handlers = append(e.callbacks.handlers, handler) +func (e *Ethereum) SetHandler(namespace string, handler blockchain.Callbacks) { + e.callbacks.handlers[namespace] = handler } func (e *Ethereum) Start() (err error) { @@ -264,15 +282,24 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string, firstEvent = "latest" } - sub, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI) + sub, subNS, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI) if err != nil { return "", err } - // TODO: We will probably need to save the namespace AND network version here - // Ultimately there needs to be a logic branch in the event handling, where for "V1" we expect to receive a namespace in every - // BatchPin event, but for "V2" we infer the namespace based on which subscription ID produced it. - e.subs[sub.ID] = namespace + version, err := e.GetNetworkVersion(ctx, location) + if err != nil { + return "", err + } + + if version > 1 && subNS == "" { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version) + } + + e.subs[sub.ID] = subscriptionInfo{ + namespace: subNS, + version: version, + } return sub.ID, nil } @@ -346,7 +373,7 @@ func (e *Ethereum) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSO } } -func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { +func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) { event := e.parseBlockchainEvent(ctx, msgJSON) if event == nil { return nil // move on @@ -377,7 +404,24 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON // Check if this is actually an operator action if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) { action := nsOrAction[len(blockchain.FireFlyActionPrefix):] - return e.callbacks.BlockchainNetworkAction(action, event, verifier) + + // For V1 of the FireFly contract, action is sent to all namespaces + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version > 1 { + namespace = subInfo.namespace + } + + return e.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier) + } + + // For V1 of the FireFly contract, namespace is passed explicitly + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version == 1 { + namespace = nsOrAction + } else { + namespace = subInfo.namespace } hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x")) @@ -409,7 +453,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON } batch := &blockchain.BatchPin{ - Namespace: nsOrAction, + Namespace: namespace, TransactionID: &txnID, BatchID: &batchID, BatchHash: &batchHash, @@ -419,7 +463,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON } // If there's an error dispatching the event, we must return the error and shutdown - return e.callbacks.BatchPinComplete(batch, verifier) + return e.callbacks.BatchPinComplete(ctx, batch, verifier) } func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { @@ -450,7 +494,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) updateType = core.OpStatusFailed } l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message) - e.callbacks.BlockchainOpUpdate(e, requestID, updateType, txHash, message, reply) + e.callbacks.BlockchainOpUpdate(ctx, e, requestID, updateType, txHash, message, reply) } func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string { @@ -476,10 +520,17 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{ l1.Tracef("Message: %+v", msgJSON) // Matches one of the active FireFly BatchPin subscriptions - if _, ok := e.subs[sub]; ok { + if subInfo, ok := e.subs[sub]; ok { + location, err := encodeContractLocation(ctx, &Location{ + Address: msgJSON.GetString("address"), + }) + if err != nil { + return err + } + switch signature { case broadcastBatchEventSignature: - if err := e.handleBatchPinEvent(ctx1, msgJSON); err != nil { + if err := e.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil { return err } default: @@ -719,15 +770,7 @@ func (e *Ethereum) NormalizeContractLocation(ctx context.Context, location *ffty if err != nil { return nil, err } - parsed.Address, err = validateEthAddress(ctx, parsed.Address) - if err != nil { - return nil, err - } - normalized, err := json.Marshal(parsed) - if err == nil { - result = fftypes.JSONAnyPtrBytes(normalized) - } - return result, err + return encodeContractLocation(ctx, parsed) } func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { @@ -741,6 +784,18 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc return ðLocation, nil } +func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) { + location.Address, err = validateEthAddress(ctx, location.Address) + if err != nil { + return nil, err + } + normalized, err := json.Marshal(location) + if err == nil { + result = fftypes.JSONAnyPtrBytes(normalized) + } + return result, err +} + func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -1157,16 +1212,9 @@ func (e *Ethereum) GetAndConvertDeprecatedContractConfig(ctx context.Context) (l } else if strings.HasPrefix(address, "/instances/") { address = strings.Replace(address, "/instances/", "", 1) } - address, err = validateEthAddress(ctx, address) - if err != nil { - return nil, "", err - } - ethLocation := &Location{ + location, err = encodeContractLocation(ctx, &Location{ Address: address, - } - normalized, _ := json.Marshal(ethLocation) - location = fftypes.JSONAnyPtrBytes(normalized) - - return location, fromBlock, nil + }) + return location, fromBlock, err } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 9ae9304a5b..86f5629a74 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -92,6 +92,7 @@ func newTestEthereum() (*Ethereum, func()) { wsconn: wsm, metrics: mm, } + e.callbacks.handlers = make(map[string]blockchain.Callbacks) return e, func() { cancel() if e.closed != nil { @@ -461,6 +462,45 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.ActivateNonDefault(mockedClient) defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, + })) + httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1)) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x71C7656EC7ab88b098defB751B7401B5f6d8976F", + }.String()) + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + + assert.Equal(t, 4, httpmock.GetTotalCallCount()) + assert.Equal(t, "es12345", e.streamID) +} + +func TestInitAllExistingStreamsOld(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", @@ -488,10 +528,46 @@ func TestInitAllExistingStreams(t *testing.T) { _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 3, httpmock.GetTotalCallCount()) + assert.Equal(t, 4, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) } +func TestInitAllExistingStreamsInvalidName(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */}, + })) + httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", + httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2)) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x71C7656EC7ab88b098defB751B7401B5f6d8976F", + }.String()) + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10416", err) +} + func TestSubQueryError(t *testing.T) { e, cancel := newTestEthereum() @@ -811,11 +887,14 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -870,6 +949,127 @@ func TestHandleMessageBatchPinOK(t *testing.T) { } +func TestHandleMessageBatchPinV2(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": [ + "0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", + "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771" + ] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 2, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeEthAddress, + Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", + } + + em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef, mock.Anything).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + + b := em.Calls[0].Arguments[0].(*blockchain.BatchPin) + assert.Equal(t, "ns1", b.Namespace) + assert.Equal(t, "e19af8b3-9060-4051-812d-7597d19adfb9", b.TransactionID.String()) + assert.Equal(t, "847d3bfd-0742-49ef-b65d-3fed15f5b0a6", b.BatchID.String()) + assert.Equal(t, "d71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", b.BatchHash.String()) + assert.Equal(t, "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", b.BatchPayloadRef) + assert.Equal(t, expectedSigningKeyRef, em.Calls[0].Arguments[1]) + assert.Len(t, b.Contexts, 2) + assert.Equal(t, "68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", b.Contexts[0].String()) + assert.Equal(t, "19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771", b.Contexts[1].String()) + + info1 := fftypes.JSONObject{ + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "logIndex": "50", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "transactionIndex": "0x0", + "timestamp": "1620576488", + } + assert.Equal(t, info1, b.Event.Info) + +} + +func TestHandleMessageBatchPinMissingAddress(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "ns1", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": [ + "0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", + "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771" + ] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10141", err) + +} + func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { data := fftypes.JSONAnyPtr(` [ @@ -897,8 +1097,11 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { ]`) e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -936,11 +1139,14 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -999,10 +1205,13 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef, mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} @@ -1016,13 +1225,17 @@ func TestHandleMessageBatchPinExit(t *testing.T) { func TestHandleMessageBatchPinEmpty(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal([]byte(` [ { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])" } @@ -1034,13 +1247,17 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { func TestHandleMessageBatchMissingData(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } var events []interface{} err := json.Unmarshal([]byte(` [ { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", "timestamp": "1620576488" @@ -1053,8 +1270,12 @@ func TestHandleMessageBatchMissingData(t *testing.T) { func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1084,8 +1305,12 @@ func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { func TestHandleMessageBatchPinBadIDentity(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1115,8 +1340,12 @@ func TestHandleMessageBatchPinBadIDentity(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1146,8 +1375,12 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { e := &Ethereum{} - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", @@ -1229,7 +1462,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Ethereum{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1299,7 +1532,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { }`) em := &blockchainmocks.Callbacks{} - e.SetHandler(em) + e.SetHandler("ns1", em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1544,10 +1777,14 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID) assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID) @@ -1606,10 +1843,14 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) var events []interface{} @@ -2851,17 +3092,70 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeEthAddress, + Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", + } + + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + +} + +func TestHandleNetworkActionV2(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "namespace": "firefly:terminate", + "uuids": "0x0000000000000000000000000000000000000000000000000000000000000000", + "batchHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "payloadRef": "", + "contexts": [] + }, + "subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5", + "signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])", + "logIndex": "50", + "timestamp": "1620576488" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Ethereum{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 2, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(nil) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -2898,17 +3192,20 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(fmt.Errorf("pop")) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data.Bytes(), &events) @@ -3315,6 +3612,43 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { httpmock.ActivateNonDefault(mockedClient) defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/", + httpmock.NewJsonResponderOrPanic(500, `pop`)) + + resetConf(e) + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + utConfig.AddKnownKey(FireFlyContractConfigKey+".0."+FireFlyContractAddress, "0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + + err := e.Init(e.ctx, utConfig, e.metrics) + assert.NoError(t, err) + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10111", err) +} + +func TestAddFireflySubscriptionGetVersionError(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + resetConf(e) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", httpmock.NewJsonResponderOrPanic(200, []eventStream{})) httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", @@ -3348,3 +3682,11 @@ func TestRemoveInvalidSubscription(t *testing.T) { err := e.RemoveFireflySubscription(e.ctx, "bad") assert.Regexp(t, "FF10412", err) } + +func TestCallbacksWrongNamespace(t *testing.T) { + e, _ := newTestEthereum() + nsOpID := "ns1:" + fftypes.NewUUID().String() + e.callbacks.BlockchainOpUpdate(context.Background(), e, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + e.callbacks.BatchPinComplete(context.Background(), &blockchain.BatchPin{Namespace: "ns1"}, nil) + e.callbacks.BlockchainNetworkAction(context.Background(), "ns1", "terminate", nil, nil, nil) +} diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 1f687a3e0d..a88ca159e7 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -171,7 +171,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string) er return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, instancePath, fromBlock, stream string, abi *abi.Entry) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, instancePath, fromBlock, stream string, abi *abi.Entry) (sub *subscription, subNS string, err error) { // Include a hash of the instance path in the subscription, so if we ever point at a different // contract configuration, we re-subscribe from block 0. // We don't need full strength hashing, so just use the first 16 chars for readability. @@ -179,20 +179,24 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace existingSubs, err := s.getSubscriptions(ctx) if err != nil { - return nil, err + return nil, "", err } - subName := fmt.Sprintf("%s_%s_%s", namespace, abi.Name, instanceUniqueHash) + oldName1 := abi.Name + oldName2 := fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash) + currentName := fmt.Sprintf("%s_%s_%s", namespace, abi.Name, instanceUniqueHash) for _, s := range existingSubs { - if s.Stream == stream && (s.Name == subName || - /* Check for the deprecates names, before adding namespace uniqueness qualifier. - NOTE: If one of these very early environments needed a new subscription, the existing one would need to + if s.Stream == stream { + /* Check for the deprecated names, before adding namespace uniqueness qualifier. + NOTE: If one of these early environments needed a new subscription, the existing one would need to be deleted manually. */ - s.Name == abi.Name || s.Name == fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash)) { - sub = s - if s.Name == abi.Name || s.Name == fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash) { - log.L(ctx).Warnf("Subscription %s uses deprecated functionality, please upgrade to utilize multiple namespaces.", s.Name) + if s.Name == oldName1 || s.Name == oldName2 { + log.L(ctx).Warnf("Subscription %s uses a legacy name format '%s' and may not support multiple namespaces. Expected '%s' instead.", s.ID, s.Name, currentName) + sub = s + } else if s.Name == currentName { + sub = s + subNS = namespace } } } @@ -202,11 +206,12 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace } if sub == nil { - if sub, err = s.createSubscription(ctx, location, stream, subName, fromBlock, abi); err != nil { - return nil, err + if sub, err = s.createSubscription(ctx, location, stream, currentName, fromBlock, abi); err != nil { + return nil, "", err } + subNS = namespace } log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) - return sub, nil + return sub, subNS, nil } diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index ccc6ba194c..2109f38632 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -151,28 +151,34 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string) er return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, location *Location, fromBlock, stream, event string) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, location *Location, fromBlock, stream, event string) (sub *subscription, subNS string, err error) { existingSubs, err := s.getSubscriptions(ctx) if err != nil { - return nil, err + return nil, "", err } - subName := fmt.Sprintf("%s_%s", namespace, event) + oldName := event + currentName := fmt.Sprintf("%s_%s", namespace, event) + for _, s := range existingSubs { - if s.Stream == stream && (s.Name == subName || s.Name == event) { - sub = s - if s.Name == event { - log.L(ctx).Warnf("Subscription %s uses deprecated functionality, please upgrade to utilize multiple namespaces.", s.Name) + if s.Stream == stream { + if s.Name == oldName { + log.L(ctx).Warnf("Subscription %s uses a legacy name format '%s' and may not support multiple namespaces. Expected '%s' instead.", s.ID, s.Name, currentName) + sub = s + } else if s.Name == currentName { + sub = s + subNS = namespace } } } if sub == nil { - if sub, err = s.createSubscription(ctx, location, stream, subName, event, fromBlock); err != nil { - return nil, err + if sub, err = s.createSubscription(ctx, location, stream, currentName, event, fromBlock); err != nil { + return nil, "", err } + subNS = namespace } log.L(ctx).Infof("%s subscription: %s", event, sub.ID) - return sub, nil + return sub, subNS, nil } diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index a0436face1..3bbd3a0e1d 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -59,39 +59,57 @@ type Fabric struct { closed chan struct{} metrics metrics.Manager fabconnectConf config.Section - subs map[string]string + subs map[string]subscriptionInfo +} + +type subscriptionInfo struct { + namespace string + channel string + version int } type callbacks struct { - handlers []blockchain.Callbacks + handlers map[string]blockchain.Callbacks } -func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } else { + log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID) } } -func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BatchPinComplete(batch, signingKey); err != nil { - return err - } +func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + if handler, ok := cb.handlers[batch.Namespace]; ok { + return handler.BatchPinComplete(batch, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace) return nil } -func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.handlers { - if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { - return err +func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + if namespace == "" { + // Older networks don't populate namespace, so deliver the event to every handler + for _, handler := range cb.handlers { + if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil { + return err + } + } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainNetworkAction(action, location, event, signingKey) } + log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace) } return nil } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + // TODO: can we push more listener/namespace knowledge down to this layer? if err := cb.BlockchainEvent(event); err != nil { return err } @@ -197,6 +215,7 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric f.idCache = make(map[string]*fabIdentity) f.metrics = metrics f.capabilities = &blockchain.Capabilities{} + f.callbacks.handlers = make(map[string]blockchain.Callbacks) if fabconnectConf.GetString(ffresty.HTTPConfigURL) == "" { return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.fabric.fabconnect") @@ -230,7 +249,7 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric return err } f.streamID = stream.ID - f.subs = make(map[string]string) + f.subs = make(map[string]subscriptionInfo) log.L(f.ctx).Infof("Event stream: %s", f.streamID) f.closed = make(chan struct{}) @@ -239,8 +258,8 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric return nil } -func (f *Fabric) SetHandler(handler blockchain.Callbacks) { - f.callbacks.handlers = append(f.callbacks.handlers, handler) +func (f *Fabric) SetHandler(namespace string, handler blockchain.Callbacks) { + f.callbacks.handlers[namespace] = handler } func (f *Fabric) Start() (err error) { @@ -311,7 +330,7 @@ func (f *Fabric) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSONO } } -func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { +func (f *Fabric) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) { event := f.parseBlockchainEvent(ctx, msgJSON) if event == nil { return nil // move on @@ -332,7 +351,24 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb // Check if this is actually an operator action if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) { action := nsOrAction[len(blockchain.FireFlyActionPrefix):] - return f.callbacks.BlockchainNetworkAction(action, event, verifier) + + // For V1 of the FireFly contract, action is sent to all namespaces + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version > 1 { + namespace = subInfo.namespace + } + + return f.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier) + } + + // For V1 of the FireFly contract, namespace is passed explicitly + // For V2+, namespace is inferred from the subscription + var namespace string + if subInfo.version == 1 { + namespace = nsOrAction + } else { + namespace = subInfo.namespace } hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x")) @@ -364,7 +400,7 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb } batch := &blockchain.BatchPin{ - Namespace: nsOrAction, + Namespace: namespace, TransactionID: &txnID, BatchID: &batchID, BatchHash: &batchHash, @@ -374,7 +410,7 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb } // If there's an error dispatching the event, we must return the error and shutdown - return f.callbacks.BatchPinComplete(batch, verifier) + return f.callbacks.BatchPinComplete(ctx, batch, verifier) } func (f *Fabric) buildEventLocationString(chaincode string) string { @@ -409,7 +445,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) { updateType = core.OpStatusFailed } l.Infof("Fabconnect '%s' reply tx=%s (request=%s) %s", replyType, txHash, requestID, message) - f.callbacks.BlockchainOpUpdate(f, requestID, updateType, txHash, message, reply) + f.callbacks.BlockchainOpUpdate(ctx, f, requestID, updateType, txHash, message, reply) } func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (string, error) { @@ -418,12 +454,25 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace string, l return "", err } - sub, err := f.streams.ensureFireFlySubscription(ctx, namespace, fabricOnChainLocation, firstEvent, f.streamID, batchPinEvent) + sub, subNS, err := f.streams.ensureFireFlySubscription(ctx, namespace, fabricOnChainLocation, firstEvent, f.streamID, batchPinEvent) + if err != nil { + return "", err + } + + version, err := f.GetNetworkVersion(ctx, location) if err != nil { return "", err } - f.subs[sub.ID] = namespace + if version > 1 && subNS == "" { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version) + } + + f.subs[sub.ID] = subscriptionInfo{ + namespace: subNS, + channel: fabricOnChainLocation.Channel, + version: version, + } return sub.ID, nil } @@ -458,10 +507,18 @@ func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) l1.Tracef("Message: %+v", msgJSON) // Matches one of the active FireFly BatchPin subscriptions - if _, ok := f.subs[sub]; ok { + if subInfo, ok := f.subs[sub]; ok { + location, err := encodeContractLocation(ctx, &Location{ + Chaincode: msgJSON.GetString("chaincodeId"), + Channel: subInfo.channel, + }) + if err != nil { + return err + } + switch eventName { case broadcastBatchEventName: - if err := f.handleBatchPinEvent(ctx1, msgJSON); err != nil { + if err := f.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil { return err } default: @@ -759,11 +816,7 @@ func (f *Fabric) NormalizeContractLocation(ctx context.Context, location *fftype if err != nil { return nil, err } - normalized, err := json.Marshal(parsed) - if err == nil { - result = fftypes.JSONAnyPtrBytes(normalized) - } - return result, err + return encodeContractLocation(ctx, parsed) } func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { @@ -780,6 +833,20 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc return &fabricLocation, nil } +func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) { + if location.Channel == "" { + return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'channel' not set") + } + if location.Chaincode == "" { + return nil, i18n.NewError(ctx, coremsgs.MsgContractLocationInvalid, "'chaincode' not set") + } + normalized, err := json.Marshal(location) + if err == nil { + result = fftypes.JSONAnyPtrBytes(normalized) + } + return result, err +} + func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -843,13 +910,9 @@ func (f *Fabric) GetAndConvertDeprecatedContractConfig(ctx context.Context) (loc } fromBlock = string(core.SubOptsFirstEventOldest) - fabricOnChainLocation := &Location{ + location, err = encodeContractLocation(ctx, &Location{ Chaincode: chaincode, Channel: f.defaultChannel, - } - - normalized, _ := json.Marshal(fabricOnChainLocation) - location = fftypes.JSONAnyPtrBytes(normalized) - - return location, fromBlock, nil + }) + return location, fromBlock, err } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 8958e2f0b6..903b87e20f 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -63,6 +63,7 @@ func newTestFabric() (*Fabric, func()) { prefixLong: defaultPrefixLong, wsconn: wsm, } + e.callbacks.handlers = make(map[string]blockchain.Callbacks) return e, func() { cancel() if e.closed != nil { @@ -219,6 +220,8 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) resetConf(e) utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -237,7 +240,45 @@ func TestInitAllExistingStreams(t *testing.T) { _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.Equal(t, 3, httpmock.GetTotalCallCount()) + assert.Equal(t, "es12345", e.streamID) +} + +func TestInitAllExistingStreamsOld(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + + assert.Equal(t, 3, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) } @@ -272,6 +313,41 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { assert.Regexp(t, "pop", err) } +func TestAddFireflySubscriptionGetVersionError(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + resetConf(e) + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + httpmock.NewJsonResponderOrPanic(500, "pop")) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "newest") + assert.Regexp(t, "pop", err) +} + func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { e, cancel := newTestFabric() defer cancel() @@ -286,6 +362,8 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []subscription{ {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) resetConf(e) utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -304,13 +382,48 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { subID, err := e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") assert.NoError(t, err) - assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.Equal(t, 3, httpmock.GetTotalCallCount()) assert.Equal(t, "es12345", e.streamID) err = e.RemoveFireflySubscription(e.ctx, subID) assert.NoError(t, err) } +func TestAddFireflySubscriptionInvalidSubName(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{ + {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, + })) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 2)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.Regexp(t, "FF10416", err) +} + func TestRemoveUnknownFireflySubscription(t *testing.T) { e, _ := newTestFabric() @@ -442,6 +555,46 @@ func TestSubQueryCreateError(t *testing.T) { } +func TestSubQueryCreate(t *testing.T) { + + e, cancel := newTestFabric() + defer cancel() + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{ID: "sb-123"})) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) + + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPConfigRetryEnabled, false) + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + + err := e.Init(e.ctx, utConfig, &metricsmocks.Manager{}) + assert.NoError(t, err) + _, err = e.AddFireflySubscription(e.ctx, "ns1", location, "oldest") + assert.NoError(t, err) + +} + func TestSubmitBatchPinOK(t *testing.T) { e, cancel := newTestFabric() @@ -733,10 +886,14 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -766,6 +923,110 @@ func TestHandleMessageBatchPinOK(t *testing.T) { } +func TestHandleMessageBatchPinV2(t *testing.T) { + payload := base64.StdEncoding.EncodeToString([]byte(fftypes.JSONObject{ + "signer": "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + "timestamp": fftypes.JSONObject{"seconds": 1630031667, "nanos": 791499000}, + "namespace": "", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": []string{"0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771"}, + }.String())) + data := []byte(` +[ + { + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "` + payload + `", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 2, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeMSPIdentity, + Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + } + + em.On("BatchPinComplete", mock.Anything, expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + b := em.Calls[0].Arguments[0].(*blockchain.BatchPin) + assert.Equal(t, "ns1", b.Namespace) + assert.Equal(t, "e19af8b3-9060-4051-812d-7597d19adfb9", b.TransactionID.String()) + assert.Equal(t, "847d3bfd-0742-49ef-b65d-3fed15f5b0a6", b.BatchID.String()) + assert.Equal(t, "d71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", b.BatchHash.String()) + assert.Equal(t, "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", b.BatchPayloadRef) + assert.Len(t, b.Contexts, 2) + assert.Equal(t, "68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", b.Contexts[0].String()) + assert.Equal(t, "19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771", b.Contexts[1].String()) + + em.AssertExpectations(t) + +} + +func TestHandleMessageBatchPinMissingChaincodeID(t *testing.T) { + payload := base64.StdEncoding.EncodeToString([]byte(fftypes.JSONObject{ + "signer": "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + "timestamp": fftypes.JSONObject{"seconds": 1630031667, "nanos": 791499000}, + "namespace": "ns1", + "uuids": "0xe19af8b390604051812d7597d19adfb9847d3bfd074249efb65d3fed15f5b0a6", + "batchHash": "0xd71eb138d74c229a388eb0e1abc03f4c7cbb21d4fc4b839fbf0ec73e4263f6be", + "payloadRef": "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", + "contexts": []string{"0x68e4da79f805bca5b912bcda9c63d03e6e867108dabb9b944109aea541ef522a", "0x19b82093de5ce92a01e333048e877e2374354bf846dd034864ef6ffbd6438771"}, + }.String())) + data := []byte(` +[ + { + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "` + payload + `", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10310", err) + +} + func TestHandleMessageEmptyPayloadRef(t *testing.T) { data := []byte(` [ @@ -781,10 +1042,14 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -829,10 +1094,14 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, @@ -863,10 +1132,14 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" var events []interface{} err := json.Unmarshal(data, &events) @@ -890,10 +1163,14 @@ func TestHandleMessageUnknownEventName(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" var events []interface{} err := json.Unmarshal(data, &events) @@ -906,10 +1183,14 @@ func TestHandleMessageUnknownEventName(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" data := []byte(`[{ "chaincodeId": "firefly", @@ -930,10 +1211,15 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + data := []byte(`[{ "chaincodeId": "firefly", "blockNumber": 91, @@ -953,10 +1239,14 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" data := []byte(`[{ "chaincodeId": "firefly", @@ -977,10 +1267,15 @@ func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + data := []byte(`[{ "chaincodeId": "firefly", "blockNumber": 91, @@ -1000,7 +1295,7 @@ func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { func TestHandleMessageBatchBadJSON(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } err := e.handleMessageBatch(context.Background(), []interface{}{10, 20}) assert.NoError(t, err) @@ -1071,7 +1366,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { "requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}" }`) em := &blockchainmocks.Callbacks{} - e.SetHandler(em) + e.SetHandler("ns1", em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1097,7 +1392,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1137,7 +1432,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1154,7 +1449,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, wsconn: wsm, } @@ -1346,10 +1641,15 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { assert.Equal(t, "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", e.BlockchainTXID) assert.Equal(t, "000000000010/000020/000030", e.Event.ProtocolID) @@ -1405,10 +1705,14 @@ func TestHandleMessageContractEventNoPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" var events []interface{} err := json.Unmarshal(data, &events) @@ -1432,10 +1736,14 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-cb37cc07-e873-4f58-44ab-55add6bba320"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-cb37cc07-e873-4f58-44ab-55add6bba320"] = "ns1" var events []interface{} err := json.Unmarshal(data, &events) @@ -1461,10 +1769,14 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = "ns1" em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) @@ -1836,18 +2148,66 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + expectedSigningKeyRef := &core.VerifierRef{ + Type: core.VerifierTypeMSPIdentity, + Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", + } + + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + em.AssertExpectations(t) + +} + +func TestHandleNetworkActionV2(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "firefly", + "blockNumber": 91, + "transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2", + "transactionIndex": 2, + "eventIndex": 50, + "eventName": "BatchPin", + "payload": "eyJzaWduZXIiOiJ1MHZnd3U5czAwLXg1MDk6OkNOPXVzZXIyLE9VPWNsaWVudDo6Q049ZmFicmljLWNhLXNlcnZlciIsInRpbWVzdGFtcCI6eyJzZWNvbmRzIjoxNjMwMDMxNjY3LCJuYW5vcyI6NzkxNDk5MDAwfSwibmFtZXNwYWNlIjoiZmlyZWZseTp0ZXJtaW5hdGUiLCJ1dWlkcyI6IjB4MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMCIsImJhdGNoSGFzaCI6IjB4MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMCIsInBheWxvYWRSZWYiOiIiLCJjb250ZXh0cyI6W119", + "subId": "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" + } +]`) + + em := &blockchainmocks.Callbacks{} + e := &Fabric{ + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 2, + } expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(nil) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(nil) var events []interface{} err := json.Unmarshal(data, &events) @@ -1876,17 +2236,21 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + } + e.subs = map[string]subscriptionInfo{} + e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } - e.subs = map[string]string{} - e.subs["sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e"] = "ns1" expectedSigningKeyRef := &core.VerifierRef{ Type: core.VerifierTypeMSPIdentity, Value: "u0vgwu9s00-x509::CN=user2,OU=client::CN=fabric-ca-server", } - em.On("BlockchainNetworkAction", "terminate", mock.Anything, expectedSigningKeyRef).Return(fmt.Errorf("pop")) + em.On("BlockchainNetworkAction", "terminate", mock.AnythingOfType("*fftypes.JSONAny"), mock.AnythingOfType("*blockchain.Event"), expectedSigningKeyRef).Return(fmt.Errorf("pop")) var events []interface{} err := json.Unmarshal(data, &events) @@ -2018,6 +2382,18 @@ func TestConvertDeprecatedContractConfigNoChaincode(t *testing.T) { assert.Regexp(t, "F10138", err) } +func TestConvertDeprecatedContractConfigNoChannel(t *testing.T) { + e, _ := newTestFabric() + resetConf(e) + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "Firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + + e.defaultChannel = "" + _, _, err := e.GetAndConvertDeprecatedContractConfig(e.ctx) + assert.Regexp(t, "FF10310", err) +} + func TestSubmitNetworkAction(t *testing.T) { e, cancel := newTestFabric() @@ -2073,3 +2449,11 @@ func TestSubmitNetworkActionBadLocation(t *testing.T) { err := e.SubmitNetworkAction(context.Background(), "", signer, core.NetworkActionTerminate, location) assert.Regexp(t, "FF10310", err) } + +func TestCallbacksWrongNamespace(t *testing.T) { + e, _ := newTestFabric() + nsOpID := "ns1:" + fftypes.NewUUID().String() + e.callbacks.BlockchainOpUpdate(context.Background(), e, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + e.callbacks.BatchPinComplete(context.Background(), &blockchain.BatchPin{Namespace: "ns1"}, nil) + e.callbacks.BlockchainNetworkAction(context.Background(), "ns1", "terminate", nil, nil, nil) +} diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 313846a16a..8da0bcfe23 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -255,4 +255,5 @@ var ( MsgDefinitionRejected = ffe("FF10413", "Definition rejected") MsgActionNotSupported = ffe("FF10414", "This action is not supported in this namespace", 400) MsgMessagesNotSupported = ffe("FF10415", "Messages are not supported in this namespace", 400) + MsgInvalidSubscriptionForNetwork = ffe("FF10416", "Subscription name '%s' is invalid according to multiparty network rules in effect (network version=%d)") ) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index d7fa66e55c..cae7efb1ad 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -67,7 +67,7 @@ type EventManager interface { // Bound blockchain callbacks BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error BlockchainEvent(event *blockchain.EventWithSubscription) error - BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error + BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error // Bound dataexchange callbacks DXEvent(dx dataexchange.Plugin, event dataexchange.DXEvent) diff --git a/internal/events/network_action.go b/internal/events/network_action.go index 35f826f5bd..dee23aa3b5 100644 --- a/internal/events/network_action.go +++ b/internal/events/network_action.go @@ -19,18 +19,18 @@ package events import ( "context" + "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/log" - "github.com/hyperledger/firefly/internal/multiparty" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/core" ) -func (em *eventManager) actionTerminate(mm multiparty.Manager, event *blockchain.Event) error { +func (em *eventManager) actionTerminate(location *fftypes.JSONAny, event *blockchain.Event) error { namespace, err := em.database.GetNamespace(em.ctx, em.namespace) if err != nil { return err } - if err := mm.TerminateContract(em.ctx, &namespace.Contracts, event); err != nil { + if err := em.multiparty.TerminateContract(em.ctx, &namespace.Contracts, location, event); err != nil { return err } // Currently, a termination event is implied to apply to ALL namespaces @@ -42,7 +42,7 @@ func (em *eventManager) actionTerminate(mm multiparty.Manager, event *blockchain }) } -func (em *eventManager) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { +func (em *eventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { if em.multiparty == nil { log.L(em.ctx).Errorf("Ignoring network action from non-multiparty network!") return nil @@ -64,7 +64,7 @@ func (em *eventManager) BlockchainNetworkAction(action string, event *blockchain } if action == core.NetworkActionTerminate.String() { - err = em.actionTerminate(em.multiparty, event) + err = em.actionTerminate(location, event) } else { log.L(em.ctx).Errorf("Ignoring unrecognized network action: %s", action) return false, nil diff --git a/internal/events/network_action_test.go b/internal/events/network_action_test.go index 671eba4462..dd78a90c51 100644 --- a/internal/events/network_action_test.go +++ b/internal/events/network_action_test.go @@ -35,6 +35,7 @@ func TestNetworkAction(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") event := &blockchain.Event{ProtocolID: "0001"} verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, @@ -54,9 +55,9 @@ func TestNetworkAction(t *testing.T) { mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(nil) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(nil) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil) - err := em.BlockchainNetworkAction("terminate", event, verifier) + err := em.BlockchainNetworkAction("terminate", location, event, verifier) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -68,6 +69,7 @@ func TestNetworkActionUnknownIdentity(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -78,7 +80,7 @@ func TestNetworkActionUnknownIdentity(t *testing.T) { mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(nil, fmt.Errorf("pop")).Once() mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(nil, nil).Once() - err := em.BlockchainNetworkAction("terminate", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("terminate", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -88,6 +90,7 @@ func TestNetworkActionNonRootIdentity(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -101,7 +104,7 @@ func TestNetworkActionNonRootIdentity(t *testing.T) { }, }, nil) - err := em.BlockchainNetworkAction("terminate", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("terminate", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -112,12 +115,13 @@ func TestNetworkActionNonMultiparty(t *testing.T) { defer cancel() em.multiparty = nil + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", } - err := em.BlockchainNetworkAction("terminate", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("terminate", location, &blockchain.Event{}, verifier) assert.NoError(t, err) } @@ -125,6 +129,7 @@ func TestNetworkActionUnknown(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() + location := fftypes.JSONAnyPtr("{}") verifier := &core.VerifierRef{ Type: core.VerifierTypeEthAddress, Value: "0x1234", @@ -134,7 +139,7 @@ func TestNetworkActionUnknown(t *testing.T) { mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil) - err := em.BlockchainNetworkAction("bad", &blockchain.Event{}, verifier) + err := em.BlockchainNetworkAction("bad", location, &blockchain.Event{}, verifier) assert.NoError(t, err) mii.AssertExpectations(t) @@ -144,15 +149,15 @@ func TestActionTerminateQueryFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(nil, fmt.Errorf("pop")) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") - mmp.AssertExpectations(t) mdi.AssertExpectations(t) } @@ -160,13 +165,15 @@ func TestActionTerminateFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + + mmp := em.multiparty.(*multipartymocks.Manager) mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop")) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop")) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") mmp.AssertExpectations(t) @@ -177,14 +184,16 @@ func TestActionTerminateUpsertFail(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() - mmp := &multipartymocks.Manager{} + location := fftypes.JSONAnyPtr("{}") + + mmp := em.multiparty.(*multipartymocks.Manager) mdi := em.database.(*databasemocks.Plugin) mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil) mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(fmt.Errorf("pop")) - mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), mock.AnythingOfType("*blockchain.Event")).Return(nil) + mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.FireFlyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil) - err := em.actionTerminate(mmp, &blockchain.Event{}) + err := em.actionTerminate(location, &blockchain.Event{}) assert.EqualError(t, err, "pop") mmp.AssertExpectations(t) diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index c05200a558..ccb473b2d4 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -47,7 +47,7 @@ type Manager interface { // - Validates that the event came from the currently active FireFly contract // - Re-initializes the plugin against the next configured FireFly contract // - Updates the provided contract info to record the point of termination and the newly active contract - TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) (err error) + TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error) // GetNetworkVersion returns the network version of the active FireFly contract GetNetworkVersion() int @@ -131,6 +131,7 @@ func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *c if err != nil { return err } + version, err := mm.blockchain.GetNetworkVersion(ctx, location) if err != nil { return err @@ -170,9 +171,13 @@ func (mm *multipartyManager) resolveFireFlyContract(ctx context.Context, contrac return location, firstEvent, err } -func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) (err error) { +func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error) { // TODO: Investigate if it better to consolidate DB termination here - log.L(ctx).Infof("Processing termination of contract at index %d", contracts.Active.Index) + if contracts.Active.Location.String() != location.String() { + log.L(ctx).Warnf("Ignoring termination event from contract at '%s', which does not match active '%s'", location, contracts.Active.Location) + return nil + } + log.L(ctx).Infof("Processing termination of contract #%d at '%s'", contracts.Active.Index, contracts.Active.Location) contracts.Active.FinalEvent = termination.ProtocolID contracts.Terminated = append(contracts.Terminated, contracts.Active) contracts.Active = core.FireFlyContractInfo{Index: contracts.Active.Index + 1} @@ -188,12 +193,7 @@ func (mm *multipartyManager) GetNetworkVersion() int { } func (mm *multipartyManager) SubmitNetworkAction(ctx context.Context, signingKey string, action *core.NetworkAction) error { - if action.Type == core.NetworkActionTerminate { - if mm.namespace != core.LegacySystemNamespace { - // For now, "terminate" only works on ff_system - return i18n.NewError(ctx, coremsgs.MsgTerminateNotSupported, mm.namespace) - } - } else { + if action.Type != core.NetworkActionTerminate { return i18n.NewError(ctx, coremsgs.MsgUnrecognizedNetworkAction, action.Type) } diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index db23501775..3535d892a8 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -105,6 +105,8 @@ func TestConfigureContract(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -129,6 +131,8 @@ func TestConfigureContractOldestBlock(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -155,6 +159,8 @@ func TestConfigureContractNewestBlock(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -171,6 +177,7 @@ func TestConfigureContractNewestBlock(t *testing.T) { func TestResolveContractDeprecatedConfig(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", @@ -188,6 +195,7 @@ func TestResolveContractDeprecatedConfig(t *testing.T) { func TestResolveContractDeprecatedConfigError(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(nil, "", fmt.Errorf("pop")) @@ -197,6 +205,7 @@ func TestResolveContractDeprecatedConfigError(t *testing.T) { func TestResolveContractDeprecatedConfigNewestBlock(t *testing.T) { mp := newTestMultipartyManager() + defer mp.cleanup(t) mp.mbi.On("GetAndConvertDeprecatedContractConfig", context.Background()).Return(fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", @@ -218,8 +227,8 @@ func TestConfigureContractBadIndex(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts cf := &core.FireFlyContracts{ @@ -242,8 +251,9 @@ func TestConfigureContractNetworkVersionFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(0, fmt.Errorf("pop")) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts cf := &core.FireFlyContracts{ @@ -267,6 +277,8 @@ func TestSubmitNetworkAction(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -313,6 +325,8 @@ func TestSubmitNetworkActionTXFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -347,6 +361,8 @@ func TestSubmitNetworkActionOpFail(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.multipartyManager.config.Contracts = contracts mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) @@ -383,6 +399,8 @@ func TestSubmitNetworkActionBadType(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -401,34 +419,6 @@ func TestSubmitNetworkActionBadType(t *testing.T) { mp.mbi.AssertExpectations(t) } -func TestSubmitNetworkActionBadNS(t *testing.T) { - contracts := make([]Contract, 1) - location := fftypes.JSONAnyPtr(fftypes.JSONObject{ - "address": "0x123", - }.String()) - contract := Contract{ - FirstEvent: "0", - Location: location, - } - - contracts[0] = contract - mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) - mp.multipartyManager.config.Contracts = contracts - - cf := &core.FireFlyContracts{ - Active: core.FireFlyContractInfo{Index: 0}, - } - - err := mp.ConfigureContract(context.Background(), cf) - assert.NoError(t, err) - err = mp.SubmitNetworkAction(context.Background(), "0x123", &core.NetworkAction{Type: core.NetworkActionTerminate}) - assert.Regexp(t, "FF10399", err) - - mp.mbi.AssertExpectations(t) -} - func TestSubmitBatchPinOk(t *testing.T) { mp := newTestMultipartyManager() defer mp.cleanup(t) @@ -541,6 +531,8 @@ func TestGetNetworkVersion(t *testing.T) { contracts[0] = contract mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.multipartyManager.config.Contracts = contracts @@ -573,6 +565,8 @@ func TestConfgureAndTerminateContract(t *testing.T) { contracts[0] = contract contracts[1] = contract2 mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(nil) @@ -584,19 +578,38 @@ func TestConfgureAndTerminateContract(t *testing.T) { err := mp.ConfigureContract(context.Background(), cf) assert.NoError(t, err) - err = mp.TerminateContract(context.Background(), cf, &blockchain.Event{}) + err = mp.TerminateContract(context.Background(), cf, location, &blockchain.Event{}) assert.NoError(t, err) } func TestTerminateContractError(t *testing.T) { mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) + defer mp.cleanup(t) + mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) cf := &core.FireFlyContracts{ - Active: core.FireFlyContractInfo{Index: 0}, + Active: core.FireFlyContractInfo{Index: 0, Location: location}, } - err := mp.TerminateContract(context.Background(), cf, &blockchain.Event{}) + err := mp.TerminateContract(context.Background(), cf, location, &blockchain.Event{}) assert.Regexp(t, "pop", err) } + +func TestTerminateContractWrongAddress(t *testing.T) { + mp := newTestMultipartyManager() + defer mp.cleanup(t) + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + cf := &core.FireFlyContracts{ + Active: core.FireFlyContractInfo{Index: 0, Location: location}, + } + + err := mp.TerminateContract(context.Background(), cf, fftypes.JSONAnyPtr("{}"), &blockchain.Event{}) + assert.NoError(t, err) +} diff --git a/internal/multiparty/operations.go b/internal/multiparty/operations.go index 5c569225ef..e80bd0ecca 100644 --- a/internal/multiparty/operations.go +++ b/internal/multiparty/operations.go @@ -108,8 +108,15 @@ func (mm *multipartyManager) RunOperation(ctx context.Context, op *core.Prepared switch data := op.Data.(type) { case batchPinData: batch := data.Batch + + // Only include namespace for V1 networks + var namespace string + if mm.activeContract.networkVersion == 1 { + namespace = batch.Namespace + } + return nil, false, mm.blockchain.SubmitBatchPin(ctx, op.NamespacedIDString(), batch.Key, &blockchain.BatchPin{ - Namespace: batch.Namespace, + Namespace: namespace, TransactionID: batch.TX.ID, BatchID: batch.ID, BatchHash: batch.Hash, diff --git a/internal/multiparty/operations_test.go b/internal/multiparty/operations_test.go index b2a5393856..406bec72ee 100644 --- a/internal/multiparty/operations_test.go +++ b/internal/multiparty/operations_test.go @@ -175,6 +175,38 @@ func TestPrepareOperationBatchPinNotFound(t *testing.T) { assert.Regexp(t, "FF10109", err) } +func TestRunBatchPinV1(t *testing.T) { + mp := newTestMultipartyManager() + defer mp.cleanup(t) + mp.activeContract.networkVersion = 1 + + op := &core.Operation{ + Type: core.OpTypeBlockchainPinBatch, + ID: fftypes.NewUUID(), + Namespace: "ns1", + } + batch := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: fftypes.NewUUID(), + SignerRef: core.SignerRef{ + Key: "0x123", + }, + }, + } + contexts := []*fftypes.Bytes32{ + fftypes.NewRandB32(), + fftypes.NewRandB32(), + } + addBatchPinInputs(op, batch.ID, contexts, "payload1") + + mp.mbi.On("SubmitBatchPin", context.Background(), "ns1:"+op.ID.String(), "0x123", mock.Anything, mock.Anything).Return(nil) + + _, complete, err := mp.RunOperation(context.Background(), opBatchPin(op, batch, contexts, "payload1")) + + assert.False(t, complete) + assert.NoError(t, err) +} + func TestOperationUpdate(t *testing.T) { mp := newTestMultipartyManager() defer mp.cleanup(t) diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 1ca9242126..037d540d7f 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -262,7 +262,7 @@ func TestInitOrchestratorFail(t *testing.T) { nm.mdi.On("SetHandler", "default", mock.Anything).Return() nm.mdi.On("GetIdentities", mock.Anything, "default", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) - nm.mbi.On("SetHandler", mock.Anything).Return() + nm.mbi.On("SetHandler", "default", mock.Anything).Return() nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("SetHandler", "default", mock.Anything).Return() diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index fdc9689464..291bdb87ed 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -58,8 +58,8 @@ func (bc *boundCallbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKe return bc.ei.BatchPinComplete(batch, signingKey) } -func (bc *boundCallbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - return bc.ei.BlockchainNetworkAction(action, event, signingKey) +func (bc *boundCallbacks) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + return bc.ei.BlockchainNetworkAction(action, location, event, signingKey) } func (bc *boundCallbacks) DXEvent(event dataexchange.DXEvent) { diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 0ddf0a1edd..468b4d08c8 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -50,6 +50,7 @@ func TestBoundCallbacks(t *testing.T) { pool := &tokens.TokenPool{} transfer := &tokens.TokenTransfer{} approval := &tokens.TokenApproval{} + location := fftypes.JSONAnyPtr("{}") event := &blockchain.Event{} hash := fftypes.NewRandB32() opID := fftypes.NewUUID() @@ -58,8 +59,8 @@ func TestBoundCallbacks(t *testing.T) { err := bc.BatchPinComplete(batch, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) assert.EqualError(t, err, "pop") - mei.On("BlockchainNetworkAction", "terminate", event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}).Return(fmt.Errorf("pop")) - err = bc.BlockchainNetworkAction("terminate", event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) + mei.On("BlockchainNetworkAction", "terminate", location, event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}).Return(fmt.Errorf("pop")) + err = bc.BlockchainNetworkAction("terminate", location, event, &core.VerifierRef{Value: "0x12345", Type: core.VerifierTypeEthAddress}) assert.EqualError(t, err, "pop") nsOpID := "ns1:" + opID.String() diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 35d47e703a..dc556a3b69 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -351,7 +351,7 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { or.plugins.Database.Plugin.SetHandler(or.namespace, or) if or.plugins.Blockchain.Plugin != nil { - or.plugins.Blockchain.Plugin.SetHandler(&or.bc) + or.plugins.Blockchain.Plugin.SetHandler(or.namespace, &or.bc) } if or.plugins.SharedStorage.Plugin != nil { diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 3851a6a2b3..2663698277 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -192,7 +192,7 @@ func TestInitOK(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() @@ -218,7 +218,7 @@ func TestInitTokenListenerFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() @@ -232,7 +232,7 @@ func TestInitDataexchangeNodesFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mdi.On("SetHandler", "ns", mock.Anything).Return() - or.mbi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mps.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) ctx := context.Background() diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 80686d0e66..ee389a7260 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -46,56 +46,62 @@ type callbacks struct { handlers map[string]tokens.Callbacks } -func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.handlers { - cb.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) +func (cb *callbacks) TokenOpUpdate(ctx context.Context, plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) + if handler, ok := cb.handlers[namespace]; ok { + handler.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } else { + log.L(ctx).Errorf("No handler found for token operation '%s'", nsOpID) } } -func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { +func (cb *callbacks) TokenPoolCreated(ctx context.Context, namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokenPoolCreated(plugin, pool); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokenPoolCreated(plugin, pool) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokenPoolCreated(plugin, pool) } + log.L(ctx).Errorf("No handler found for token pool event on namespace '%s'", namespace) } return nil } -func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { +func (cb *callbacks) TokensTransferred(ctx context.Context, namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokensTransferred(plugin, transfer); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokensTransferred(plugin, transfer) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokensTransferred(plugin, transfer) } + log.L(ctx).Errorf("No handler found for token transfer event on namespace '%s'", namespace) } return nil } -func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { +func (cb *callbacks) TokensApproved(ctx context.Context, namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener + // Older token subscriptions don't populate namespace, so deliver the event to every handler for _, cb := range cb.handlers { if err := cb.TokensApproved(plugin, approval); err != nil { return err } } } else { - if listener, ok := cb.handlers[namespace]; ok { - return listener.TokensApproved(plugin, approval) + if handler, ok := cb.handlers[namespace]; ok { + return handler.TokensApproved(plugin, approval) } + log.L(ctx).Errorf("No handler found for token approval event on namespace '%s'", namespace) } return nil } @@ -264,7 +270,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) replyType = core.OpStatusFailed } l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message) - ft.callbacks.TokenOpUpdate(ft, requestID, replyType, transactionHash, message, data) + ft.callbacks.TokenOpUpdate(ctx, ft, requestID, replyType, transactionHash, message, data) } func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { @@ -337,7 +343,7 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokenPoolCreated(namespace, ft, pool) + return ft.callbacks.TokenPoolCreated(ctx, namespace, ft, pool) } func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransferType, data fftypes.JSONObject) (err error) { @@ -426,7 +432,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokensTransferred(namespace, ft, transfer) + return ft.callbacks.TokensTransferred(ctx, namespace, ft, transfer) } func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONObject) (err error) { @@ -501,7 +507,7 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb }, } - return ft.callbacks.TokensApproved(namespace, ft, approval) + return ft.callbacks.TokensApproved(ctx, namespace, ft, approval) } func (ft *FFTokens) handleMessage(ctx context.Context, msgBytes []byte) (err error) { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 583f6f8fba..31b6b12905 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -1311,3 +1311,13 @@ func TestEventLoopClosedContext(t *testing.T) { wsm.On("Receive").Return((<-chan []byte)(r)) h.eventLoop() // we're simply looking for it exiting } + +func TestCallbacksWrongNamespace(t *testing.T) { + h, _, _, _, done := newTestFFTokens(t) + defer done() + nsOpID := "ns1:" + fftypes.NewUUID().String() + h.callbacks.TokenOpUpdate(context.Background(), h, nsOpID, core.OpStatusSucceeded, "tx123", "", nil) + h.callbacks.TokenPoolCreated(context.Background(), "ns1", h, nil) + h.callbacks.TokensTransferred(context.Background(), "ns1", h, nil) + h.callbacks.TokensApproved(context.Background(), "ns1", h, nil) +} diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index 3230242117..75d48a403d 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -44,13 +44,13 @@ func (_m *Callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) er return r0 } -// BlockchainNetworkAction provides a mock function with given fields: action, event, signingKey -func (_m *Callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - ret := _m.Called(action, event, signingKey) +// BlockchainNetworkAction provides a mock function with given fields: action, location, event, signingKey +func (_m *Callbacks) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + ret := _m.Called(action, location, event, signingKey) var r0 error - if rf, ok := ret.Get(0).(func(string, *blockchain.Event, *core.VerifierRef) error); ok { - r0 = rf(action, event, signingKey) + if rf, ok := ret.Get(0).(func(string, *fftypes.JSONAny, *blockchain.Event, *core.VerifierRef) error); ok { + r0 = rf(action, location, event, signingKey) } else { r0 = ret.Error(0) } diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index e90c538821..ca4c4ae705 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -326,9 +326,9 @@ func (_m *Plugin) RemoveFireflySubscription(ctx context.Context, subID string) e return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler blockchain.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler blockchain.Callbacks) { + _m.Called(namespace, handler) } // Start provides a mock function with given fields: diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index da4e2f52bb..05706a61e7 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -69,13 +69,13 @@ func (_m *EventManager) BlockchainEvent(event *blockchain.EventWithSubscription) return r0 } -// BlockchainNetworkAction provides a mock function with given fields: action, event, signingKey -func (_m *EventManager) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - ret := _m.Called(action, event, signingKey) +// BlockchainNetworkAction provides a mock function with given fields: action, location, event, signingKey +func (_m *EventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { + ret := _m.Called(action, location, event, signingKey) var r0 error - if rf, ok := ret.Get(0).(func(string, *blockchain.Event, *core.VerifierRef) error); ok { - r0 = rf(action, event, signingKey) + if rf, ok := ret.Get(0).(func(string, *fftypes.JSONAny, *blockchain.Event, *core.VerifierRef) error); ok { + r0 = rf(action, location, event, signingKey) } else { r0 = ret.Error(0) } diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 23ba3ac3ee..91ecd050a6 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -158,13 +158,13 @@ func (_m *Manager) SubmitNetworkAction(ctx context.Context, signingKey string, a return r0 } -// TerminateContract provides a mock function with given fields: ctx, contracts, termination -func (_m *Manager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, termination *blockchain.Event) error { - ret := _m.Called(ctx, contracts, termination) +// TerminateContract provides a mock function with given fields: ctx, contracts, location, termination +func (_m *Manager) TerminateContract(ctx context.Context, contracts *core.FireFlyContracts, location *fftypes.JSONAny, termination *blockchain.Event) error { + ret := _m.Called(ctx, contracts, location, termination) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.FireFlyContracts, *blockchain.Event) error); ok { - r0 = rf(ctx, contracts, termination) + if rf, ok := ret.Get(0).(func(context.Context, *core.FireFlyContracts, *fftypes.JSONAny, *blockchain.Event) error); ok { + r0 = rf(ctx, contracts, location, termination) } else { r0 = ret.Error(0) } diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 90580c45f2..46e8858214 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -36,7 +36,8 @@ type Plugin interface { Init(ctx context.Context, config config.Section, metrics metrics.Manager) error // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Blockchain interface must not deliver any events until start is called Start() error @@ -89,7 +90,7 @@ type Plugin interface { GetAndConvertDeprecatedContractConfig(ctx context.Context) (location *fftypes.JSONAny, fromBlock string, err error) // AddFireflySubscription creates a FireFly BatchPin subscription for the provided location - AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (string, error) + AddFireflySubscription(ctx context.Context, namespace string, location *fftypes.JSONAny, firstEvent string) (subID string, err error) // RemoveFireFlySubscription removes the provided FireFly subscription RemoveFireflySubscription(ctx context.Context, subID string) error @@ -119,7 +120,7 @@ type Callbacks interface { // BlockchainNetworkAction notifies on the arrival of a network operator action // // Error should only be returned in shutdown scenarios - BlockchainNetworkAction(action string, event *Event, signingKey *core.VerifierRef) error + BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *Event, signingKey *core.VerifierRef) error // BlockchainEvent notifies on the arrival of any event from a user-created subscription. BlockchainEvent(event *EventWithSubscription) error