From 8511bf01d00ee7a353c4046f046081049b28d9bc Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 14 Oct 2021 20:29:16 -0400 Subject: [PATCH 1/6] Use a single mutex, as there is a single keys map Signed-off-by: Peter Broadhurst --- internal/config/config.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 1f977d8d18..44d2334ec5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -393,8 +393,10 @@ func MergeConfig(configRecords []*fftypes.ConfigRecord) error { return nil } +var rootKeys = map[string]bool{} +var keysMutex sync.Mutex var root = &configPrefix{ - keys: map[string]bool{}, // All keys go here, including those defined in sub prefixies + keys: rootKeys, // All keys go here, including those defined in sub prefixies } // ark adds a root key, used to define the keys that are used within the core @@ -406,8 +408,8 @@ func rootKey(k string) RootKey { // GetKnownKeys gets the known keys func GetKnownKeys() []string { var keys []string - root.keysMutex.Lock() - defer root.keysMutex.Unlock() + keysMutex.Lock() + defer keysMutex.Unlock() for k := range root.keys { keys = append(keys, k) } @@ -417,9 +419,8 @@ func GetKnownKeys() []string { // configPrefix is the main config structure passed to plugins, and used for root to wrap viper type configPrefix struct { - prefix string - keys map[string]bool - keysMutex sync.Mutex + prefix string + keys map[string]bool } // configPrefixArray is a point in the config that supports an array @@ -440,8 +441,8 @@ func NewPluginConfig(prefix string) Prefix { } func (c *configPrefix) prefixKey(k string) string { - c.keysMutex.Lock() - defer c.keysMutex.Unlock() + keysMutex.Lock() + defer keysMutex.Unlock() key := c.prefix + k if !c.keys[key] { panic(fmt.Sprintf("Undefined configuration key '%s'", key)) @@ -495,8 +496,8 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix { func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) { // Put a simulated key in the known keys array, to pop into the help info. - root.keysMutex.Lock() - defer root.keysMutex.Unlock() + keysMutex.Lock() + defer keysMutex.Unlock() root.keys[fmt.Sprintf("%s[].%s", c.base, k)] = true c.defaults[k] = defValue } @@ -508,8 +509,8 @@ func (c *configPrefix) AddKnownKey(k string, defValue ...interface{}) { } else if len(defValue) > 0 { c.SetDefault(k, defValue) } - c.keysMutex.Lock() - defer c.keysMutex.Unlock() + keysMutex.Lock() + defer keysMutex.Unlock() c.keys[key] = true } From e1c215d56f4460dada6d6b13395bf69c2b89456f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 14 Oct 2021 20:34:19 -0400 Subject: [PATCH 2/6] Make it clearer there is just one list of keys Signed-off-by: Peter Broadhurst --- internal/config/config.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 44d2334ec5..df4e0aff47 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -393,11 +393,9 @@ func MergeConfig(configRecords []*fftypes.ConfigRecord) error { return nil } -var rootKeys = map[string]bool{} +var knownKeys = map[string]bool{} // All keys go here, including those defined in sub prefixies var keysMutex sync.Mutex -var root = &configPrefix{ - keys: rootKeys, // All keys go here, including those defined in sub prefixies -} +var root = &configPrefix{} // ark adds a root key, used to define the keys that are used within the core func rootKey(k string) RootKey { @@ -410,7 +408,7 @@ func GetKnownKeys() []string { var keys []string keysMutex.Lock() defer keysMutex.Unlock() - for k := range root.keys { + for k := range knownKeys { keys = append(keys, k) } sort.Strings(keys) @@ -420,7 +418,6 @@ func GetKnownKeys() []string { // configPrefix is the main config structure passed to plugins, and used for root to wrap viper type configPrefix struct { prefix string - keys map[string]bool } // configPrefixArray is a point in the config that supports an array @@ -436,7 +433,6 @@ func NewPluginConfig(prefix string) Prefix { } return &configPrefix{ prefix: prefix, - keys: root.keys, } } @@ -444,7 +440,7 @@ func (c *configPrefix) prefixKey(k string) string { keysMutex.Lock() defer keysMutex.Unlock() key := c.prefix + k - if !c.keys[key] { + if !knownKeys[key] { panic(fmt.Sprintf("Undefined configuration key '%s'", key)) } return key @@ -453,7 +449,6 @@ func (c *configPrefix) prefixKey(k string) string { func (c *configPrefix) SubPrefix(suffix string) Prefix { return &configPrefix{ prefix: c.prefix + suffix + ".", - keys: root.keys, } } @@ -477,7 +472,6 @@ func (c *configPrefixArray) ArraySize() int { func (c *configPrefixArray) ArrayEntry(i int) Prefix { cp := &configPrefix{ prefix: c.base + fmt.Sprintf(".%d.", i), - keys: root.keys, } for knownKey, defValue := range c.defaults { cp.AddKnownKey(knownKey, defValue...) @@ -498,7 +492,7 @@ func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) { // Put a simulated key in the known keys array, to pop into the help info. keysMutex.Lock() defer keysMutex.Unlock() - root.keys[fmt.Sprintf("%s[].%s", c.base, k)] = true + knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true c.defaults[k] = defValue } @@ -511,7 +505,7 @@ func (c *configPrefix) AddKnownKey(k string, defValue ...interface{}) { } keysMutex.Lock() defer keysMutex.Unlock() - c.keys[key] = true + knownKeys[key] = true } func (c *configPrefix) SetDefault(k string, defValue interface{}) { From 3efdbdd2bbaf7a6e196bcff2f0334a9e251b6af1 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 14 Oct 2021 23:35:40 -0400 Subject: [PATCH 3/6] Re-subscribe if the instance path changes Signed-off-by: Peter Broadhurst --- internal/blockchain/ethereum/ethereum.go | 26 ++++++++++++++----- internal/blockchain/ethereum/ethereum_test.go | 2 +- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 78c775a091..663e5d706e 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -18,6 +18,7 @@ package ethereum import ( "context" + "crypto/sha256" "encoding/hex" "encoding/json" "fmt" @@ -70,12 +71,18 @@ type eventStreamWebsocket struct { Topic string `json:"topic"` } +type subscriptionFilter struct { + Address string `json:"address"` + Topics []string `json:"topics"` +} + type subscription struct { - ID string `json:"id"` - Description string `json:"description"` - Name string `json:"name"` - Stream string `json:"stream"` - FromBlock string `json:"fromBlock"` + ID string `json:"id"` + Description string `json:"description"` + Name string `json:"name"` + Stream string `json:"stream"` + FromBlock string `json:"fromBlock"` + Filter subscriptionFilter `json:"filter"` } type asyncTXSubmission struct { @@ -215,7 +222,12 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error } func (e *Ethereum) ensureSubscriptions() error { + // Include a hash of the instance path in the subscription, so if we ever point at a different + // contract configuration, we re-subscribe from block 0. + instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16] + for eventType, subDesc := range requiredSubscriptions { + subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) var existingSubs []*subscription res, err := e.client.R().SetResult(&existingSubs).Get("/subscriptions") @@ -225,14 +237,14 @@ func (e *Ethereum) ensureSubscriptions() error { var sub *subscription for _, s := range existingSubs { - if s.Name == eventType { + if s.Name == subName { sub = s } } if sub == nil { newSub := subscription{ - Name: eventType, + Name: subName, Description: subDesc, Stream: e.initInfo.stream.ID, FromBlock: "0", diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 310e96a228..01781f9559 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -206,7 +206,7 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ - {ID: "sub12345", Name: "BatchPin"}, + {ID: "sub12345", Name: "BatchPin_2f696e7374616e63" /* this is the subname for our combo of instance path and BatchPin */}, })) resetConf() From 185ad4a50574c5b444a63fbbf9e94ce422766433 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 14 Oct 2021 23:41:30 -0400 Subject: [PATCH 4/6] Make newly created subs unique to the instance path Signed-off-by: Peter Broadhurst --- internal/apiserver/http_server.go | 33 +++++++++++++----------- internal/blockchain/ethereum/ethereum.go | 9 +++++-- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/internal/apiserver/http_server.go b/internal/apiserver/http_server.go index 461c191653..88699136be 100644 --- a/internal/apiserver/http_server.go +++ b/internal/apiserver/http_server.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net" "net/http" + "time" "github.com/gorilla/mux" "github.com/hyperledger/firefly/internal/config" @@ -63,14 +64,15 @@ type IServer interface { } type httpServer struct { - name string - s IServer - l net.Listener - conf config.Prefix - onClose chan error - tlsEnabled bool - tlsCertFile string - tlsKeyFile string + name string + s IServer + l net.Listener + conf config.Prefix + onClose chan error + tlsEnabled bool + tlsCertFile string + tlsKeyFile string + shutdownTimeout time.Duration } func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) { @@ -88,12 +90,13 @@ func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) { func newHTTPServer(ctx context.Context, name string, r *mux.Router, onClose chan error, conf config.Prefix) (hs *httpServer, err error) { hs = &httpServer{ - name: name, - onClose: onClose, - conf: conf, - tlsEnabled: conf.GetBool(HTTPConfTLSEnabled), - tlsCertFile: conf.GetString(HTTPConfTLSCertFile), - tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile), + name: name, + onClose: onClose, + conf: conf, + tlsEnabled: conf.GetBool(HTTPConfTLSEnabled), + tlsCertFile: conf.GetString(HTTPConfTLSCertFile), + tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile), + shutdownTimeout: config.GetDuration(config.APIShutdownTimeout), } hs.l, err = hs.createListener(ctx) if err == nil { @@ -172,7 +175,7 @@ func (hs *httpServer) serveHTTP(ctx context.Context) { select { case <-ctx.Done(): log.L(ctx).Infof("API server context cancelled - shutting down") - shutdownContext, cancel := context.WithTimeout(context.Background(), config.GetDuration(config.APIShutdownTimeout)) + shutdownContext, cancel := context.WithTimeout(context.Background(), hs.shutdownTimeout) defer cancel() if err := hs.s.Shutdown(shutdownContext); err != nil { hs.onClose <- err diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 663e5d706e..71844cedfe 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -224,10 +224,10 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error func (e *Ethereum) ensureSubscriptions() error { // Include a hash of the instance path in the subscription, so if we ever point at a different // contract configuration, we re-subscribe from block 0. + // We don't need full strength hashing, so just use the first 16 chars for readability. instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16] for eventType, subDesc := range requiredSubscriptions { - subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) var existingSubs []*subscription res, err := e.client.R().SetResult(&existingSubs).Get("/subscriptions") @@ -236,8 +236,13 @@ func (e *Ethereum) ensureSubscriptions() error { } var sub *subscription + subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) for _, s := range existingSubs { - if s.Name == subName { + if s.Name == subName || + /* Check for the plain name we used to use originally, before adding uniqueness qualifier. + If one of these very early environments needed a new subscription, the existing one would need to + be deleted manually. */ + s.Name == eventType { sub = s } } From 418c678c10a4f1183fbeb464aa27eface02a4142 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 14 Oct 2021 23:55:29 -0400 Subject: [PATCH 5/6] Revert adding of extra params to de-serialize Signed-off-by: Peter Broadhurst --- internal/blockchain/ethereum/ethereum.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 71844cedfe..5e99b0025c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -71,18 +71,12 @@ type eventStreamWebsocket struct { Topic string `json:"topic"` } -type subscriptionFilter struct { - Address string `json:"address"` - Topics []string `json:"topics"` -} - type subscription struct { - ID string `json:"id"` - Description string `json:"description"` - Name string `json:"name"` - Stream string `json:"stream"` - FromBlock string `json:"fromBlock"` - Filter subscriptionFilter `json:"filter"` + ID string `json:"id"` + Description string `json:"description"` + Name string `json:"name"` + Stream string `json:"stream"` + FromBlock string `json:"fromBlock"` } type asyncTXSubmission struct { From 352d60ae8f5e935441f04fe25867e8b5effd0f21 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 15 Oct 2021 14:03:16 -0400 Subject: [PATCH 6/6] Allocate to length Signed-off-by: Peter Broadhurst --- internal/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/config.go b/internal/config/config.go index df4e0aff47..9870791785 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -405,7 +405,7 @@ func rootKey(k string) RootKey { // GetKnownKeys gets the known keys func GetKnownKeys() []string { - var keys []string + keys := make([]string, 0, len(knownKeys)) keysMutex.Lock() defer keysMutex.Unlock() for k := range knownKeys {