Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 14 additions & 109 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package ethereum

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"strings"

Expand Down Expand Up @@ -57,28 +55,10 @@ type Ethereum struct {
closed chan struct{}
}

type eventStream struct {
ID string `json:"id"`
Name string `json:"name"`
ErrorHandling string `json:"errorHandling"`
BatchSize uint `json:"batchSize"`
BatchTimeoutMS uint `json:"batchTimeoutMS"`
Type string `json:"type"`
WebSocket eventStreamWebsocket `json:"websocket"`
}

type eventStreamWebsocket struct {
Topic string `json:"topic"`
}

type subscription struct {
ID string `json:"id"`
Description string `json:"description"`
Name string `json:"name"`
Stream string `json:"stream"`
FromBlock string `json:"fromBlock"`
}

type asyncTXSubmission struct {
ID string `json:"id"`
}
Expand All @@ -96,8 +76,8 @@ type ethWSCommandPayload struct {
Topic string `json:"topic,omitempty"`
}

var requiredSubscriptions = map[string]string{
"BatchPin": "Batch pin",
var requiredSubscriptions = []string{
"BatchPin",
}

var addressVerify = regexp.MustCompile("^[0-9a-f]{40}$")
Expand Down Expand Up @@ -144,7 +124,18 @@ func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blo
return err
}

if err = e.ensureEventStreams(ethconnectConf); err != nil {
streams := streamManager{
ctx: e.ctx,
client: e.client,
instancePath: e.instancePath,
}
batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize)
batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
if e.initInfo.stream, err = streams.ensureEventStream(e.topic, batchSize, batchTimeout); err != nil {
return err
}
log.L(e.ctx).Infof("Event stream: %s", e.initInfo.stream.ID)
if e.initInfo.subs, err = streams.ensureSubscriptions(e.initInfo.stream.ID, requiredSubscriptions); err != nil {
return err
}

Expand All @@ -162,41 +153,6 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities {
return e.capabilities
}

func (e *Ethereum) ensureEventStreams(ethconnectConf config.Prefix) error {

var existingStreams []*eventStream
res, err := e.client.R().SetContext(e.ctx).SetResult(&existingStreams).Get("/eventstreams")
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr)
}

for _, stream := range existingStreams {
if stream.WebSocket.Topic == e.topic {
e.initInfo.stream = stream
}
}

if e.initInfo.stream == nil {
newStream := eventStream{
Name: e.topic,
ErrorHandling: "block",
BatchSize: ethconnectConf.GetUint(EthconnectConfigBatchSize),
BatchTimeoutMS: uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()),
Type: "websocket",
}
newStream.WebSocket.Topic = e.topic
res, err = e.client.R().SetBody(&newStream).SetResult(&newStream).Post("/eventstreams")
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
e.initInfo.stream = &newStream
}

log.L(e.ctx).Infof("Event stream: %s", e.initInfo.stream.ID)

return e.ensureSubscriptions()
}

func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error {
// Send a subscribe to our topic after each connect/reconnect
b, _ := json.Marshal(&ethWSCommandPayload{
Expand All @@ -213,57 +169,6 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error
return err
}

func (e *Ethereum) ensureSubscriptions() error {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16]

for eventType, subDesc := range requiredSubscriptions {

var existingSubs []*subscription
res, err := e.client.R().SetResult(&existingSubs).Get("/subscriptions")
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr)
}

var sub *subscription
subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash)
for _, s := range existingSubs {
if s.Name == subName ||
/* Check for the plain name we used to use originally, before adding uniqueness qualifier.
If one of these very early environments needed a new subscription, the existing one would need to
be deleted manually. */
s.Name == eventType {
sub = s
}
}

if sub == nil {
newSub := subscription{
Name: subName,
Description: subDesc,
Stream: e.initInfo.stream.ID,
FromBlock: "0",
}
res, err = e.client.R().
SetContext(e.ctx).
SetBody(&newSub).
SetResult(&newSub).
Post(fmt.Sprintf("%s/%s", e.instancePath, eventType))
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(e.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
sub = &newSub
}

log.L(e.ctx).Infof("%s subscription: %s", eventType, sub.ID)
e.initInfo.subs = append(e.initInfo.subs, sub)

}
return nil
}

func ethHexFormatB32(b *fftypes.Bytes32) string {
if b == nil {
return "0x0000000000000000000000000000000000000000000000000000000000000000"
Expand Down
160 changes: 160 additions & 0 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright © 2021 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/restclient"
)

type streamManager struct {
ctx context.Context
client *resty.Client
instancePath string
}

type eventStream struct {
ID string `json:"id"`
Name string `json:"name"`
ErrorHandling string `json:"errorHandling"`
BatchSize uint `json:"batchSize"`
BatchTimeoutMS uint `json:"batchTimeoutMS"`
Type string `json:"type"`
WebSocket eventStreamWebsocket `json:"websocket"`
}

type subscription struct {
ID string `json:"id"`
Name string `json:"name"`
Stream string `json:"stream"`
FromBlock string `json:"fromBlock"`
}

func (s *streamManager) getEventStreams() (streams []*eventStream, err error) {
res, err := s.client.R().
SetContext(s.ctx).
SetResult(&streams).
Get("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
return streams, nil
}

func (s *streamManager) createEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) {
stream := eventStream{
Name: topic,
ErrorHandling: "block",
BatchSize: batchSize,
BatchTimeoutMS: batchTimeout,
Type: "websocket",
WebSocket: eventStreamWebsocket{Topic: topic},
}
res, err := s.client.R().
SetContext(s.ctx).
SetBody(&stream).
SetResult(&stream).
Post("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
return &stream, nil
}

func (s *streamManager) ensureEventStream(topic string, batchSize, batchTimeout uint) (*eventStream, error) {
existingStreams, err := s.getEventStreams()
if err != nil {
return nil, err
}
for _, stream := range existingStreams {
if stream.WebSocket.Topic == topic {
return stream, nil
}
}
return s.createEventStream(topic, batchSize, batchTimeout)
}

func (s *streamManager) getSubscriptions() (subs []*subscription, err error) {
res, err := s.client.R().
SetContext(s.ctx).
SetResult(&subs).
Get("/subscriptions")
if err != nil || !res.IsSuccess() {
return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
return subs, nil
}

func (s *streamManager) createSubscription(name, stream, event string) (*subscription, error) {
sub := subscription{
Name: name,
Stream: stream,
FromBlock: "0",
}
res, err := s.client.R().
SetContext(s.ctx).
SetBody(&sub).
SetResult(&sub).
Post(fmt.Sprintf("%s/%s", s.instancePath, event))
if err != nil || !res.IsSuccess() {
return nil, restclient.WrapRestErr(s.ctx, res, err, i18n.MsgEthconnectRESTErr)
}
return &sub, nil
}

func (s *streamManager) ensureSubscriptions(stream string, subscriptions []string) (subs []*subscription, err error) {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(s.instancePath)))[0:16]

existingSubs, err := s.getSubscriptions()
if err != nil {
return nil, err
}

for _, eventType := range subscriptions {
var sub *subscription
subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash)
for _, s := range existingSubs {
if s.Name == subName ||
/* Check for the plain name we used to use originally, before adding uniqueness qualifier.
If one of these very early environments needed a new subscription, the existing one would need to
be deleted manually. */
s.Name == eventType {
sub = s
}
}

if sub == nil {
if sub, err = s.createSubscription(subName, stream, eventType); err != nil {
return nil, err
}
}

log.L(s.ctx).Infof("%s subscription: %s", eventType, sub.ID)
subs = append(subs, sub)
}
return subs, nil
}
Loading