diff --git a/docs/reference/config.md b/docs/reference/config.md index 693d296b00..8cfa915a9e 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -280,6 +280,13 @@ nav_order: 2 |size|The maximum number of messages that can be packed into a batch|`int`|`` |timeout|The timeout to wait for a batch to fill, before sending|[`time.Duration`](https://pkg.go.dev/time#Duration)|`` +## cache.blockchain + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|size|Size of blockchain cache|`string`|`` +|ttl|Time to live for blockchain cache items|`string`|`` + ## cors |Key|Description|Type|Default Value| diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index f212c52a77..637e846f2d 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -106,13 +106,20 @@ func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, act return nil } -func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { - for _, cb := range cb.handlers { - // Send the event to all handlers and let them match it to a contract listener - // TODO: can we push more listener/namespace knowledge down to this layer? - if err := cb.BlockchainEvent(event); err != nil { - return err +func (cb *callbacks) BlockchainEvent(ctx context.Context, namespace string, event *blockchain.EventWithSubscription) error { + if namespace == "" { + // Older token subscriptions don't populate namespace, so deliver the event to every handler + for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + if err := cb.BlockchainEvent(event); err != nil { + return err + } } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainEvent(event) + } + log.L(ctx).Errorf("No handler found for blockchain event on namespace '%s'", namespace) } return nil } @@ -207,7 +214,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return err } - e.streams = &streamManager{client: e.client} + e.streams = newStreamManager(e.client) batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize) batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()) stream, err := e.streams.ensureEventStream(e.ctx, e.topic, batchSize, batchTimeout) @@ -434,9 +441,15 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JS } func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { + subName, err := e.streams.getSubscriptionName(ctx, msgJSON.GetString("subId")) + if err != nil { + return err + } + + namespace := e.streams.getNamespaceFromSubName(subName) event := e.parseBlockchainEvent(ctx, msgJSON) if event != nil { - err = e.callbacks.BlockchainEvent(&blockchain.EventWithSubscription{ + err = e.callbacks.BlockchainEvent(ctx, namespace, &blockchain.EventWithSubscription{ Event: *event, Subscription: msgJSON.GetString("subId"), }) @@ -773,7 +786,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid) } - subName := fmt.Sprintf("ff-sub-%s", listener.ID) + subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID) firstEvent := string(core.SubOptsFirstEventNewest) if listener.Options != nil { firstEvent = listener.Options.FirstEvent diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index ff491170c2..778f0d6230 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1753,7 +1753,7 @@ func TestDeleteSubscriptionFail(t *testing.T) { assert.Regexp(t, "FF10111", err) } -func TestHandleMessageContractEvent(t *testing.T) { +func TestHandleMessageContractEventOldSubscription(t *testing.T) { data := fftypes.JSONAnyPtr(` [ { @@ -1773,14 +1773,23 @@ func TestHandleMessageContractEvent(t *testing.T) { ]`) em := &blockchainmocks.Callbacks{} - e := &Ethereum{ - callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, - } + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub2", Stream: "es12345", Name: "ff-sub-1132312312312", + })) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} e.subs = map[string]subscriptionInfo{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ namespace: "ns1", version: 1, } + e.streams = newStreamManager(e.client) em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID) @@ -1819,7 +1828,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em.AssertExpectations(t) } -func TestHandleMessageContractEventError(t *testing.T) { +func TestHandleMessageContractEventErrorOldSubscription(t *testing.T) { data := fftypes.JSONAnyPtr(` [ { @@ -1839,14 +1848,257 @@ func TestHandleMessageContractEventError(t *testing.T) { ]`) em := &blockchainmocks.Callbacks{} - e := &Ethereum{ - callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub2", Stream: "es12345", Name: "ff-sub-1132312312312", + })) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, } + e.streams = newStreamManager(e.client) + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.EqualError(t, err, "pop") + + em.AssertExpectations(t) +} + +func TestHandleMessageContractEventWithNamespace(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1" + }, + "subId": "sub2", + "signature": "Changed(address,uint256)", + "logIndex": "50", + "timestamp": "1640811383" + }, + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72638", + "data": { + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1" + }, + "subId": "sub2", + "signature": "Changed(address,uint256)", + "logIndex": "50", + "timestamp": "1640811384" + } +]`) + + em := &blockchainmocks.Callbacks{} + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312", + })) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + e.streams = newStreamManager(e.client) + + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID) + return true + })).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription) + assert.Equal(t, "sub2", ev.Subscription) + assert.Equal(t, "Changed", ev.Event.Name) + + outputs := fftypes.JSONObject{ + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1", + } + assert.Equal(t, outputs, ev.Event.Output) + + info := fftypes.JSONObject{ + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "logIndex": "50", + "signature": "Changed(address,uint256)", + "subId": "sub2", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "transactionIndex": "0x0", + "timestamp": "1640811383", + } + assert.Equal(t, info, ev.Event.Info) + + em.AssertExpectations(t) +} + +func TestHandleMessageContractEventNoNamespaceHandlers(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1" + }, + "subId": "sub2", + "signature": "Changed(address,uint256)", + "logIndex": "50", + "timestamp": "1640811383" + } +]`) + + em := &blockchainmocks.Callbacks{} + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312", + })) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns2": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + e.streams = newStreamManager(e.client) + + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID) + assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID) + return true + })).Return(nil) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + assert.Equal(t, 0, len(em.Calls)) +} + +func TestHandleMessageContractEventSubNameError(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1" + }, + "subId": "sub2", + "signature": "Changed(address,uint256)", + "logIndex": "50", + "timestamp": "1640811383" + } +]`) + em := &blockchainmocks.Callbacks{} + + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(500, ethError{Error: "pop"})) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + version: 1, + } + e.streams = newStreamManager(e.client) + + var events []interface{} + err := json.Unmarshal(data.Bytes(), &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10111", err) + + em.AssertExpectations(t) +} + +func TestHandleMessageContractEventError(t *testing.T) { + data := fftypes.JSONAnyPtr(` +[ + { + "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", + "blockNumber": "38011", + "transactionIndex": "0x0", + "transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", + "data": { + "from": "0x91D2B4381A4CD5C7C0F27565A7D4B829844C8635", + "value": "1" + }, + "subId": "sub2", + "signature": "Changed(address,uint256)", + "logIndex": "50", + "timestamp": "1640811383" + } +]`) + + em := &blockchainmocks.Callbacks{} + + e, cancel := newTestEthereum() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sub2", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub2", Stream: "es12345", Name: "ff-sub-ns1-1132312312312", + })) + + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} e.subs = map[string]subscriptionInfo{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ namespace: "ns1", version: 1, } + e.streams = newStreamManager(e.client) em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index a88ca159e7..da10034b56 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -21,17 +21,24 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "strings" + "time" "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-signer/pkg/abi" + "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/pkg/core" + "github.com/karlseguin/ccache" ) type streamManager struct { - client *resty.Client + client *resty.Client + subscriptionNameCache *ccache.Cache + subscriptionNameCacheTTL time.Duration } type eventStream struct { @@ -54,6 +61,18 @@ type subscription struct { Event *abi.Entry `json:"event"` } +func newStreamManager(client *resty.Client) *streamManager { + manager := &streamManager{ + client: client, + subscriptionNameCacheTTL: config.GetDuration(coreconfig.CacheBlockchainTTL), + } + manager.subscriptionNameCache = ccache.New( + ccache.Configure(). + MaxSize(config.GetByteSize(coreconfig.CacheBlockchainSize)), + ) + return manager +} + func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventStream, err error) { res, err := s.client.R(). SetContext(ctx). @@ -135,6 +154,32 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript return subs, nil } +func (s *streamManager) getSubscription(ctx context.Context, subID string) (sub *subscription, err error) { + res, err := s.client.R(). + SetContext(ctx). + SetResult(&sub). + Get(fmt.Sprintf("/subscriptions/%s", subID)) + if err != nil || !res.IsSuccess() { + return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgEthconnectRESTErr) + } + return sub, nil +} + +func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) { + cached := s.subscriptionNameCache.Get(subID) + if cached != nil { + cached.Extend(s.subscriptionNameCacheTTL) + return cached.Value().(string), nil + } + + sub, err := s.getSubscription(ctx, subID) + if err != nil { + return "", err + } + s.subscriptionNameCache.Set(subID, sub.Name, s.subscriptionNameCacheTTL) + return sub.Name, nil +} + func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, fromBlock string, abi *abi.Entry) (*subscription, error) { // Map FireFly "firstEvent" values to Ethereum "fromBlock" values switch fromBlock { @@ -215,3 +260,13 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) return sub, subNS, nil } + +func (s *streamManager) getNamespaceFromSubName(subName string) string { + var parts = strings.Split(subName, "-") + // Subscription names post version 1.1 are in the format `ff-sub--` + if len(parts) != 4 { + // Assume older subscription and return empty string + return "" + } + return parts[2] +} diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 2109f38632..8a2a6b2c92 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -19,17 +19,24 @@ package fabric import ( "context" "fmt" + "strings" + "time" "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/log" + "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/pkg/core" + "github.com/karlseguin/ccache" ) type streamManager struct { - client *resty.Client - signer string + client *resty.Client + signer string + subscriptionNameCache *ccache.Cache + subscriptionNameCacheTTL time.Duration } type eventStream struct { @@ -58,6 +65,19 @@ type eventFilter struct { EventFilter string `json:"eventFilter"` } +func newStreamManager(client *resty.Client, signer string) *streamManager { + manager := &streamManager{ + client: client, + signer: signer, + subscriptionNameCacheTTL: config.GetDuration(coreconfig.CacheBlockchainTTL), + } + manager.subscriptionNameCache = ccache.New( + ccache.Configure(). + MaxSize(config.GetByteSize(coreconfig.CacheBlockchainSize)), + ) + return manager +} + func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventStream, err error) { res, err := s.client.R(). SetContext(ctx). @@ -114,6 +134,32 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript return subs, nil } +func (s *streamManager) getSubscription(ctx context.Context, subID string) (sub *subscription, err error) { + res, err := s.client.R(). + SetContext(ctx). + SetResult(&sub). + Get(fmt.Sprintf("/subscriptions/%s", subID)) + if err != nil || !res.IsSuccess() { + return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr) + } + return sub, nil +} + +func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) { + cached := s.subscriptionNameCache.Get(subID) + if cached != nil { + cached.Extend(s.subscriptionNameCacheTTL) + return cached.Value().(string), nil + } + + sub, err := s.getSubscription(ctx, subID) + if err != nil { + return "", err + } + s.subscriptionNameCache.Set(subID, sub.Name, s.subscriptionNameCacheTTL) + return sub.Name, nil +} + func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, fromBlock string) (*subscription, error) { // Map FireFly "firstEvent" values to Fabric "fromBlock" values if fromBlock == string(core.SubOptsFirstEventOldest) { @@ -182,3 +228,13 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace log.L(ctx).Infof("%s subscription: %s", event, sub.ID) return sub, subNS, nil } + +func (s *streamManager) getNamespaceFromSubName(subName string) string { + var parts = strings.Split(subName, "-") + // Subscription names post version 1.1 are in the format `ff-sub--` + if len(parts) != 4 { + // Assume older subscription and return empty string + return "" + } + return parts[2] +} diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index a8d2625ff7..59b7eb272d 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -106,13 +106,20 @@ func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, act return nil } -func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { - for _, cb := range cb.handlers { - // Send the event to all handlers and let them match it to a contract listener - // TODO: can we push more listener/namespace knowledge down to this layer? - if err := cb.BlockchainEvent(event); err != nil { - return err +func (cb *callbacks) BlockchainEvent(ctx context.Context, namespace string, event *blockchain.EventWithSubscription) error { + if namespace == "" { + // Older token subscriptions don't populate namespace, so deliver the event to every handler + for _, cb := range cb.handlers { + // Send the event to all handlers and let them match it to a contract listener + if err := cb.BlockchainEvent(event); err != nil { + return err + } + } + } else { + if handler, ok := cb.handlers[namespace]; ok { + return handler.BlockchainEvent(event) } + log.L(ctx).Errorf("No handler found for blockchain event on namespace '%s'", namespace) } return nil } @@ -241,7 +248,7 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, metrics metric return err } - f.streams = &streamManager{client: f.client, signer: f.signer} + f.streams = newStreamManager(f.client, f.signer) batchSize := f.fabconnectConf.GetUint(FabconnectConfigBatchSize) batchTimeout := uint(f.fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds()) stream, err := f.streams.ensureEventStream(f.ctx, f.topic, batchSize, batchTimeout) @@ -418,11 +425,16 @@ func (f *Fabric) buildEventLocationString(chaincode string) string { } func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { + subName, err := f.streams.getSubscriptionName(ctx, msgJSON.GetString("subId")) + if err != nil { + return err + } + namespace := f.streams.getNamespaceFromSubName(subName) event := f.parseBlockchainEvent(ctx, msgJSON) if event == nil { return nil // move on } - return f.callbacks.BlockchainEvent(&blockchain.EventWithSubscription{ + return f.callbacks.BlockchainEvent(ctx, namespace, &blockchain.EventWithSubscription{ Event: *event, Subscription: msgJSON.GetString("subId"), }) @@ -852,7 +864,9 @@ func (f *Fabric) AddContractListener(ctx context.Context, listener *core.Contrac if err != nil { return err } - result, err := f.streams.createSubscription(ctx, location, f.streamID, "", listener.Event.Name, listener.Options.FirstEvent) + + subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID) + result, err := f.streams.createSubscription(ctx, location, f.streamID, subName, listener.Event.Name, listener.Options.FirstEvent) if err != nil { return err } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index c099f50e75..5e5e55bfdb 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -1624,7 +1624,7 @@ func TestDeleteSubscriptionFail(t *testing.T) { assert.Regexp(t, "FF10284.*pop", err) } -func TestHandleMessageContractEvent(t *testing.T) { +func TestHandleMessageContractEventOldSubscription(t *testing.T) { data := []byte(` [ { @@ -1640,9 +1640,18 @@ func TestHandleMessageContractEvent(t *testing.T) { ]`) em := &blockchainmocks.Callbacks{} - e := &Fabric{ - callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, - } + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "old-sub-name", + })) + + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} e.subs = map[string]subscriptionInfo{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ namespace: "ns1", @@ -1689,7 +1698,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em.AssertExpectations(t) } -func TestHandleMessageContractEventNoPayload(t *testing.T) { +func TestHandleMessageContractEventNamespacedHandlers(t *testing.T) { data := []byte(` [ { @@ -1699,14 +1708,156 @@ func TestHandleMessageContractEventNoPayload(t *testing.T) { "transactionIndex": 20, "eventIndex": 30, "eventName": "AssetCreated", + "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" + }, + { + "chaincodeId": "basic", + "blockNumber": 10, + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746f", + "transactionIndex": 20, + "eventIndex": 30, + "eventName": "AssetCreated", + "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" } ]`) em := &blockchainmocks.Callbacks{} - e := &Fabric{ - callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312", + })) + + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "000000000010/000020/000030", e.Event.ProtocolID) + return true + })).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + + ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription) + assert.Equal(t, "sb-cb37cc07-e873-4f58-44ab-55add6bba320", ev.Subscription) + assert.Equal(t, "AssetCreated", ev.Event.Name) + + outputs := fftypes.JSONObject{ + "AppraisedValue": float64(10), + "Color": "red", + "ID": "1234", + "Owner": "me", + "Size": float64(3), + } + assert.Equal(t, outputs, ev.Event.Output) + + info := fftypes.JSONObject{ + "blockNumber": float64(10), + "chaincodeId": "basic", + "eventName": "AssetCreated", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320", + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "transactionIndex": float64(20), + "eventIndex": float64(30), + } + assert.Equal(t, info, ev.Event.Info) + + em.AssertExpectations(t) +} + +func TestHandleMessageContractEventNoNamespacedHandlers(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "basic", + "blockNumber": 10, + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "transactionIndex": 20, + "eventIndex": 30, + "eventName": "AssetCreated", + "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" } +]`) + + em := &blockchainmocks.Callbacks{} + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312", + })) + + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns2": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool { + assert.Equal(t, "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", e.BlockchainTXID) + assert.Equal(t, "000000000010/000020/000030", e.Event.ProtocolID) + return true + })).Return(nil) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.NoError(t, err) + assert.Equal(t, 0, len(em.Calls)) +} + +func TestHandleMessageContractEventNoPayload(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "basic", + "blockNumber": 10, + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "transactionIndex": 20, + "eventIndex": 30, + "eventName": "AssetCreated", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" + } +]`) + + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312", + })) + + em := &blockchainmocks.Callbacks{} + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ namespace: "ns1", @@ -1754,7 +1905,7 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { em.AssertExpectations(t) } -func TestHandleMessageContractEventError(t *testing.T) { +func TestHandleMessageContractOldSubError(t *testing.T) { data := []byte(` [ { @@ -1767,10 +1918,63 @@ func TestHandleMessageContractEventError(t *testing.T) { } ]`) + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "oldsubname", + })) + em := &blockchainmocks.Callbacks{} - e := &Fabric{ - callbacks: callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}}, + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, } + + em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop")) + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.EqualError(t, err, "pop") + + em.AssertExpectations(t) +} + +func TestHandleMessageContractEventError(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "basic", + "blockNumber": 10, + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "eventName": "AssetCreated", + "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" + } +]`) + + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312", + })) + + em := &blockchainmocks.Callbacks{} + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} e.subs = map[string]subscriptionInfo{} e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ namespace: "ns1", @@ -1789,6 +1993,46 @@ func TestHandleMessageContractEventError(t *testing.T) { em.AssertExpectations(t) } +func TestHandleMessageContractGetSubError(t *testing.T) { + data := []byte(` +[ + { + "chaincodeId": "basic", + "blockNumber": 10, + "transactionId": "4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d", + "eventName": "AssetCreated", + "payload": "eyJBcHByYWlzZWRWYWx1ZSI6MTAsIkNvbG9yIjoicmVkIiwiSUQiOiIxMjM0IiwiT3duZXIiOiJtZSIsIlNpemUiOjN9", + "subId": "sb-cb37cc07-e873-4f58-44ab-55add6bba320" + } +]`) + + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/sb-cb37cc07-e873-4f58-44ab-55add6bba320", + httpmock.NewJsonResponderOrPanic(500, fabError{Error: "pop"})) + + em := &blockchainmocks.Callbacks{} + e.streams = newStreamManager(e.client, e.signer) + e.callbacks = callbacks{handlers: map[string]blockchain.Callbacks{"ns1": em}} + e.subs = map[string]subscriptionInfo{} + e.subs["sb-b5b97a4e-a317-4053-6400-1474650efcb5"] = subscriptionInfo{ + namespace: "ns1", + channel: "firefly", + version: 1, + } + + var events []interface{} + err := json.Unmarshal(data, &events) + assert.NoError(t, err) + err = e.handleMessageBatch(context.Background(), events) + assert.Regexp(t, "FF10284", err) + + em.AssertExpectations(t) +} + func TestInvokeContractOK(t *testing.T) { e, cancel := newTestFabric() defer cancel() diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go index 7b30281103..2b32d30e2a 100644 --- a/internal/coreconfig/coreconfig.go +++ b/internal/coreconfig/coreconfig.go @@ -112,6 +112,10 @@ var ( BroadcastBatchPayloadLimit = ffc("broadcast.batch.payloadLimit") // BroadcastBatchTimeout is the timeout to wait for a batch to fill, before sending BroadcastBatchTimeout = ffc("broadcast.batch.timeout") + // CacheBlockchainTTL size of cache for blockchain plugin caches + CacheBlockchainTTL = ffc("cache.blockchain.ttl") + // CacheBlockchainTTL time to live of cache for blockchain plugin caches + CacheBlockchainSize = ffc("cache.blockchain.size") // DownloadWorkerCount is the number of download workers created to pull data from shared storage to the local DX DownloadWorkerCount = ffc("download.worker.count") // DownloadWorkerQueueLength is the length of the work queue in the channel to the workers - defaults to 2x the worker count @@ -316,6 +320,8 @@ func setDefaults() { viper.SetDefault(string(BroadcastBatchSize), 200) viper.SetDefault(string(BroadcastBatchPayloadLimit), "800Kb") viper.SetDefault(string(BroadcastBatchTimeout), "1s") + viper.SetDefault(string(CacheBlockchainSize), "50Mb") + viper.SetDefault(string(CacheBlockchainTTL), "5m") viper.SetDefault(string(HistogramsMaxChartRows), 100) viper.SetDefault(string(DebugPort), -1) viper.SetDefault(string(DownloadWorkerCount), 10) diff --git a/internal/coremsgs/en_config_descriptions.go b/internal/coremsgs/en_config_descriptions.go index 49b7c04349..8dc95b788c 100644 --- a/internal/coremsgs/en_config_descriptions.go +++ b/internal/coremsgs/en_config_descriptions.go @@ -92,6 +92,9 @@ var ( ConfigBlockchainFabricFabconnectURL = ffc("config.blockchain.fabric.fabconnect.url", "The URL of the Fabconnect instance", "URL "+i18n.StringType) ConfigBlockchainFabricFabconnectProxyURL = ffc("config.blockchain.fabric.fabconnect.proxy.url", "Optional HTTP proxy server to use when connecting to Fabconnect", "URL "+i18n.StringType) + ConfigCacheBlockchainTTL = ffc("config.cache.blockchain.ttl", "Time to live for blockchain cache items", i18n.StringType) + ConfigCacheBlockchainSize = ffc("config.cache.blockchain.size", "Size of blockchain cache", i18n.StringType) + ConfigPluginDatabase = ffc("config.plugins.database", "The list of configured Database plugins", i18n.StringType) ConfigPluginDatabaseName = ffc("config.plugins.database[].name", "The name of the Database plugin", i18n.StringType) ConfigPluginDatabaseType = ffc("config.plugins.database[].type", "The type of the configured Database plugin", i18n.StringType)