diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 84cc7ff876..ced0be73b1 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -18,10 +18,8 @@ package ethereum import ( "context" - "crypto/sha256" "encoding/hex" "encoding/json" - "fmt" "regexp" "strings" @@ -57,28 +55,10 @@ type Ethereum struct { closed chan struct{} } -type eventStream struct { - ID string `json:"id"` - Name string `json:"name"` - ErrorHandling string `json:"errorHandling"` - BatchSize uint `json:"batchSize"` - BatchTimeoutMS uint `json:"batchTimeoutMS"` - Type string `json:"type"` - WebSocket eventStreamWebsocket `json:"websocket"` -} - type eventStreamWebsocket struct { Topic string `json:"topic"` } -type subscription struct { - ID string `json:"id"` - Description string `json:"description"` - Name string `json:"name"` - Stream string `json:"stream"` - FromBlock string `json:"fromBlock"` -} - type asyncTXSubmission struct { ID string `json:"id"` } @@ -96,8 +76,8 @@ type ethWSCommandPayload struct { Topic string `json:"topic,omitempty"` } -var requiredSubscriptions = map[string]string{ - "BatchPin": "Batch pin", +var requiredSubscriptions = []string{ + "BatchPin", } var addressVerify = regexp.MustCompile("^[0-9a-f]{40}$") @@ -144,7 +124,18 @@ func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blo return err } - if err = e.ensureEventStreams(ethconnectConf); err != nil { + streams := streamManager{ + ctx: e.ctx, + client: e.client, + instancePath: e.instancePath, + } + batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize) + batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()) + if e.initInfo.stream, err = streams.ensureEventStream(e.topic, batchSize, batchTimeout); err != nil { + return err + } + log.L(e.ctx).Infof("Event stream: %s", e.initInfo.stream.ID) + if e.initInfo.subs, err = streams.ensureSubscriptions(e.initInfo.stream.ID, requiredSubscriptions); err != nil { return err } @@ -162,41 +153,6 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities { return e.capabilities } -func (e *Ethereum) ensureEventStreams(ethconnectConf config.Prefix) error { - - var existingStreams []*eventStream - res, err := e.client.R().SetContext(e.ctx).SetResult(&existingStreams).Get("/eventstreams") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr) - } - - for _, stream := range existingStreams { - if stream.WebSocket.Topic == e.topic { - e.initInfo.stream = stream - } - } - - if e.initInfo.stream == nil { - newStream := eventStream{ - Name: e.topic, - ErrorHandling: "block", - BatchSize: ethconnectConf.GetUint(EthconnectConfigBatchSize), - BatchTimeoutMS: uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()), - Type: "websocket", - } - newStream.WebSocket.Topic = e.topic - res, err = e.client.R().SetBody(&newStream).SetResult(&newStream).Post("/eventstreams") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr) - } - e.initInfo.stream = &newStream - } - - log.L(e.ctx).Infof("Event stream: %s", e.initInfo.stream.ID) - - return e.ensureSubscriptions() -} - func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error { // Send a subscribe to our topic after each connect/reconnect b, _ := json.Marshal(ðWSCommandPayload{ @@ -213,57 +169,6 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error return err } -func (e *Ethereum) ensureSubscriptions() error { - // Include a hash of the instance path in the subscription, so if we ever point at a different - // contract configuration, we re-subscribe from block 0. - // We don't need full strength hashing, so just use the first 16 chars for readability. - instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16] - - for eventType, subDesc := range requiredSubscriptions { - - var existingSubs []*subscription - res, err := e.client.R().SetResult(&existingSubs).Get("/subscriptions") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr) - } - - var sub *subscription - subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) - for _, s := range existingSubs { - if s.Name == subName || - /* Check for the plain name we used to use originally, before adding uniqueness qualifier. - If one of these very early environments needed a new subscription, the existing one would need to - be deleted manually. */ - s.Name == eventType { - sub = s - } - } - - if sub == nil { - newSub := subscription{ - Name: subName, - Description: subDesc, - Stream: e.initInfo.stream.ID, - FromBlock: "0", - } - res, err = e.client.R(). - SetContext(e.ctx). - SetBody(&newSub). - SetResult(&newSub). - Post(fmt.Sprintf("%s/%s", e.instancePath, eventType)) - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr) - } - sub = &newSub - } - - log.L(e.ctx).Infof("%s subscription: %s", eventType, sub.ID) - e.initInfo.subs = append(e.initInfo.subs, sub) - - } - return nil -} - func ethHexFormatB32(b *fftypes.Bytes32) string { if b == nil { return "0x0000000000000000000000000000000000000000000000000000000000000000" diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go new file mode 100644 index 0000000000..c824db0980 --- /dev/null +++ b/internal/blockchain/ethereum/eventstream.go @@ -0,0 +1,160 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethereum + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/restclient" +) + +type streamManager struct { + ctx context.Context + client *resty.Client + instancePath string +} + +type eventStream struct { + ID string `json:"id"` + Name string `json:"name"` + ErrorHandling string `json:"errorHandling"` + BatchSize uint `json:"batchSize"` + BatchTimeoutMS uint `json:"batchTimeoutMS"` + Type string `json:"type"` + WebSocket eventStreamWebsocket `json:"websocket"` +} + +type subscription struct { + ID string `json:"id"` + Name string `json:"name"` + Stream string `json:"stream"` + FromBlock string `json:"fromBlock"` +} + +func (s *streamManager) getEventStreams() (streams []*eventStream, err error) { + res, err := s.client.R(). + SetContext(s.ctx). + SetResult(&streams). + Get("/eventstreams") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr) + } + return streams, nil +} + +func (s *streamManager) createEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) { + stream := eventStream{ + Name: topic, + ErrorHandling: "block", + BatchSize: batchSize, + BatchTimeoutMS: batchTimeout, + Type: "websocket", + WebSocket: eventStreamWebsocket{Topic: topic}, + } + res, err := s.client.R(). + SetContext(s.ctx). + SetBody(&stream). + SetResult(&stream). + Post("/eventstreams") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr) + } + return &stream, nil +} + +func (s *streamManager) ensureEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) { + existingStreams, err := s.getEventStreams() + if err != nil { + return nil, err + } + for _, stream := range existingStreams { + if stream.WebSocket.Topic == topic { + return stream, nil + } + } + return s.createEventStream(topic, batchSize, batchTimeout) +} + +func (s *streamManager) getSubscriptions() (subs []*subscription, err error) { + res, err := s.client.R(). + SetContext(s.ctx). + SetResult(&subs). + Get("/subscriptions") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr) + } + return subs, nil +} + +func (s *streamManager) createSubscription(name, stream, event string) (*subscription, error) { + sub := subscription{ + Name: name, + Stream: stream, + FromBlock: "0", + } + res, err := s.client.R(). + SetContext(s.ctx). + SetBody(&sub). + SetResult(&sub). + Post(fmt.Sprintf("%s/%s", s.instancePath, event)) + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr) + } + return &sub, nil +} + +func (s *streamManager) ensureSubscriptions(stream string, subscriptions []string) (subs []*subscription, err error) { + // Include a hash of the instance path in the subscription, so if we ever point at a different + // contract configuration, we re-subscribe from block 0. + // We don't need full strength hashing, so just use the first 16 chars for readability. + instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(s.instancePath)))[0:16] + + existingSubs, err := s.getSubscriptions() + if err != nil { + return nil, err + } + + for _, eventType := range subscriptions { + var sub *subscription + subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) + for _, s := range existingSubs { + if s.Name == subName || + /* Check for the plain name we used to use originally, before adding uniqueness qualifier. + If one of these very early environments needed a new subscription, the existing one would need to + be deleted manually. */ + s.Name == eventType { + sub = s + } + } + + if sub == nil { + if sub, err = s.createSubscription(subName, stream, eventType); err != nil { + return nil, err + } + } + + log.L(s.ctx).Infof("%s subscription: %s", eventType, sub.ID) + subs = append(subs, sub) + } + return subs, nil +} diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go new file mode 100644 index 0000000000..f17fe08cf4 --- /dev/null +++ b/internal/blockchain/fabric/eventstream.go @@ -0,0 +1,158 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fabric + +import ( + "context" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/restclient" +) + +type streamManager struct { + ctx context.Context + client *resty.Client + defaultChannel string + chaincode string + signer string +} + +type eventStream struct { + ID string `json:"id"` + Name string `json:"name"` + ErrorHandling string `json:"errorHandling"` + BatchSize uint `json:"batchSize"` + BatchTimeoutMS uint `json:"batchTimeoutMS"` + Type string `json:"type"` + WebSocket eventStreamWebsocket `json:"websocket"` +} + +type subscription struct { + ID string `json:"id"` + Name string `json:"name"` + Channel string `json:"channel"` + Signer string `json:"signer"` + Stream string `json:"stream"` + FromBlock string `json:"fromBlock"` + Filter eventFilter `json:"filter"` +} + +func (s *streamManager) getEventStreams() (streams []*eventStream, err error) { + res, err := s.client.R(). + SetContext(s.ctx). + SetResult(&streams). + Get("/eventstreams") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgFabconnectRESTErr) + } + return streams, nil +} + +func (s *streamManager) createEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) { + stream := eventStream{ + Name: topic, + ErrorHandling: "block", + BatchSize: batchSize, + BatchTimeoutMS: batchTimeout, + Type: "websocket", + WebSocket: eventStreamWebsocket{Topic: topic}, + } + res, err := s.client.R(). + SetContext(s.ctx). + SetBody(&stream). + SetResult(&stream). + Post("/eventstreams") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgFabconnectRESTErr) + } + return &stream, nil +} + +func (s *streamManager) ensureEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) { + existingStreams, err := s.getEventStreams() + if err != nil { + return nil, err + } + for _, stream := range existingStreams { + if stream.WebSocket.Topic == topic { + return stream, nil + } + } + return s.createEventStream(topic, batchSize, batchTimeout) +} + +func (s *streamManager) getSubscriptions() (subs []*subscription, err error) { + res, err := s.client.R(). + SetContext(s.ctx). + SetResult(&subs). + Get("/subscriptions") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgFabconnectRESTErr) + } + return subs, nil +} + +func (s *streamManager) createSubscription(name, stream, event string) (*subscription, error) { + sub := subscription{ + Name: name, + Channel: s.defaultChannel, + Signer: s.signer, + Stream: stream, + Filter: eventFilter{ + ChaincodeID: s.chaincode, + EventFilter: event, + }, + } + res, err := s.client.R(). + SetContext(s.ctx). + SetBody(&sub). + SetResult(&sub). + Post("/subscriptions") + if err != nil || !res.IsSuccess() { + return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgFabconnectRESTErr) + } + return &sub, nil +} + +func (s *streamManager) ensureSubscriptions(stream string, subscriptions []string) (subs []*subscription, err error) { + existingSubs, err := s.getSubscriptions() + if err != nil { + return nil, err + } + + for _, eventType := range subscriptions { + var sub *subscription + for _, s := range existingSubs { + if s.Name == eventType { + sub = s + } + } + + if sub == nil { + if sub, err = s.createSubscription(eventType, stream, eventType); err != nil { + return nil, err + } + } + + log.L(s.ctx).Infof("%s subscription: %s", eventType, sub.ID) + subs = append(subs, sub) + + } + return subs, nil +} diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 58cf493a2e..0b4bc4f037 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -60,31 +60,10 @@ type Fabric struct { closed chan struct{} } -type eventStream struct { - ID string `json:"id"` - Name string `json:"name"` - ErrorHandling string `json:"errorHandling"` - BatchSize uint `json:"batchSize"` - BatchTimeoutMS uint `json:"batchTimeoutMS"` - Type string `json:"type"` - WebSocket eventStreamWebsocket `json:"websocket"` -} - type eventStreamWebsocket struct { Topic string `json:"topic"` } -type subscription struct { - ID string `json:"id"` - Description string `json:"description"` - Name string `json:"name"` - Channel string `json:"channel"` - Signer string `json:"signer"` - Stream string `json:"stream"` - FromBlock string `json:"fromBlock"` - Filter eventFilter `json:"filter"` -} - type eventFilter struct { ChaincodeID string `json:"chaincodeId"` EventFilter string `json:"eventFilter"` @@ -146,8 +125,8 @@ type fabIdentity struct { CACert string `json:"caCert"` } -var requiredSubscriptions = map[string]string{ - "BatchPin": "Batch pin", +var requiredSubscriptions = []string{ + "BatchPin", } var fullIdentityPattern = regexp.MustCompile(".+::x509::(.+)::.+") @@ -199,7 +178,20 @@ func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks block return err } - if err = f.ensureEventStreams(fabconnectConf); err != nil { + streams := streamManager{ + ctx: f.ctx, + client: f.client, + defaultChannel: f.defaultChannel, + chaincode: f.chaincode, + signer: f.signer, + } + batchSize := fabconnectConf.GetUint(FabconnectConfigBatchSize) + batchTimeout := uint(fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds()) + if f.initInfo.stream, err = streams.ensureEventStream(f.topic, batchSize, batchTimeout); err != nil { + return err + } + log.L(f.ctx).Infof("Event stream: %s", f.initInfo.stream.ID) + if f.initInfo.subs, err = streams.ensureSubscriptions(f.initInfo.stream.ID, requiredSubscriptions); err != nil { return err } @@ -216,42 +208,6 @@ func (f *Fabric) Start() error { func (f *Fabric) Capabilities() *blockchain.Capabilities { return f.capabilities } - -func (f *Fabric) ensureEventStreams(fabconnectConf config.Prefix) error { - - var existingStreams []*eventStream - res, err := f.client.R().SetContext(f.ctx).SetResult(&existingStreams).Get("/eventstreams") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(f.ctx, res, err, i18n.MsgFabconnectRESTErr) - } - - for _, stream := range existingStreams { - if stream.WebSocket.Topic == f.topic { - f.initInfo.stream = stream - } - } - - if f.initInfo.stream == nil { - newStream := eventStream{ - Name: f.topic, - ErrorHandling: "block", - BatchSize: fabconnectConf.GetUint(FabconnectConfigBatchSize), - BatchTimeoutMS: uint(fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds()), - Type: "websocket", - } - newStream.WebSocket.Topic = f.topic - res, err = f.client.R().SetBody(&newStream).SetResult(&newStream).Post("/eventstreams") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(f.ctx, res, err, i18n.MsgFabconnectRESTErr) - } - f.initInfo.stream = &newStream - } - - log.L(f.ctx).Infof("Event stream: %s", f.initInfo.stream.ID) - - return f.ensureSusbscriptions(f.initInfo.stream.ID) -} - func (f *Fabric) afterConnect(ctx context.Context, w wsclient.WSClient) error { // Send a subscribe to our topic after each connect/reconnect b, _ := json.Marshal(&fabWSCommandPayload{ @@ -268,51 +224,6 @@ func (f *Fabric) afterConnect(ctx context.Context, w wsclient.WSClient) error { return err } -func (f *Fabric) ensureSusbscriptions(streamID string) error { - for eventType, subDesc := range requiredSubscriptions { - - var existingSubs []*subscription - res, err := f.client.R().SetResult(&existingSubs).Get("/subscriptions") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(f.ctx, res, err, i18n.MsgFabconnectRESTErr) - } - - var sub *subscription - for _, s := range existingSubs { - if s.Name == eventType { - sub = s - } - } - - if sub == nil { - newSub := subscription{ - Name: eventType, - Description: subDesc, - Channel: f.defaultChannel, - Signer: f.signer, - Stream: streamID, - } - newSub.Filter.ChaincodeID = f.chaincode - newSub.Filter.EventFilter = "BatchPin" - - res, err = f.client.R(). - SetContext(f.ctx). - SetBody(&newSub). - SetResult(&newSub). - Post("/subscriptions") - if err != nil || !res.IsSuccess() { - return restclient.WrapRestErr(f.ctx, res, err, i18n.MsgFabconnectRESTErr) - } - sub = &newSub - } - - log.L(f.ctx).Infof("%s subscription: %s", eventType, sub.ID) - f.initInfo.subs = append(f.initInfo.subs, sub) - - } - return nil -} - func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { sTransactionHash := msgJSON.GetString("transactionId") payloadString := msgJSON.GetString("payload")