diff --git a/internal/apiserver/http_server.go b/internal/apiserver/http_server.go index 461c191653..88699136be 100644 --- a/internal/apiserver/http_server.go +++ b/internal/apiserver/http_server.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net" "net/http" + "time" "github.com/gorilla/mux" "github.com/hyperledger/firefly/internal/config" @@ -63,14 +64,15 @@ type IServer interface { } type httpServer struct { - name string - s IServer - l net.Listener - conf config.Prefix - onClose chan error - tlsEnabled bool - tlsCertFile string - tlsKeyFile string + name string + s IServer + l net.Listener + conf config.Prefix + onClose chan error + tlsEnabled bool + tlsCertFile string + tlsKeyFile string + shutdownTimeout time.Duration } func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) { @@ -88,12 +90,13 @@ func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) { func newHTTPServer(ctx context.Context, name string, r *mux.Router, onClose chan error, conf config.Prefix) (hs *httpServer, err error) { hs = &httpServer{ - name: name, - onClose: onClose, - conf: conf, - tlsEnabled: conf.GetBool(HTTPConfTLSEnabled), - tlsCertFile: conf.GetString(HTTPConfTLSCertFile), - tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile), + name: name, + onClose: onClose, + conf: conf, + tlsEnabled: conf.GetBool(HTTPConfTLSEnabled), + tlsCertFile: conf.GetString(HTTPConfTLSCertFile), + tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile), + shutdownTimeout: config.GetDuration(config.APIShutdownTimeout), } hs.l, err = hs.createListener(ctx) if err == nil { @@ -172,7 +175,7 @@ func (hs *httpServer) serveHTTP(ctx context.Context) { select { case <-ctx.Done(): log.L(ctx).Infof("API server context cancelled - shutting down") - shutdownContext, cancel := context.WithTimeout(context.Background(), config.GetDuration(config.APIShutdownTimeout)) + shutdownContext, cancel := context.WithTimeout(context.Background(), hs.shutdownTimeout) defer cancel() if err := hs.s.Shutdown(shutdownContext); err != nil { hs.onClose <- err diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 78c775a091..5e99b0025c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -18,6 +18,7 @@ package ethereum import ( "context" + "crypto/sha256" "encoding/hex" "encoding/json" "fmt" @@ -215,6 +216,11 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error } func (e *Ethereum) ensureSubscriptions() error { + // Include a hash of the instance path in the subscription, so if we ever point at a different + // contract configuration, we re-subscribe from block 0. + // We don't need full strength hashing, so just use the first 16 chars for readability. + instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16] + for eventType, subDesc := range requiredSubscriptions { var existingSubs []*subscription @@ -224,15 +230,20 @@ func (e *Ethereum) ensureSubscriptions() error { } var sub *subscription + subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash) for _, s := range existingSubs { - if s.Name == eventType { + if s.Name == subName || + /* Check for the plain name we used to use originally, before adding uniqueness qualifier. + If one of these very early environments needed a new subscription, the existing one would need to + be deleted manually. */ + s.Name == eventType { sub = s } } if sub == nil { newSub := subscription{ - Name: eventType, + Name: subName, Description: subDesc, Stream: e.initInfo.stream.ID, FromBlock: "0", diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 310e96a228..01781f9559 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -206,7 +206,7 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ - {ID: "sub12345", Name: "BatchPin"}, + {ID: "sub12345", Name: "BatchPin_2f696e7374616e63" /* this is the subname for our combo of instance path and BatchPin */}, })) resetConf() diff --git a/internal/config/config.go b/internal/config/config.go index 1f977d8d18..9870791785 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -393,9 +393,9 @@ func MergeConfig(configRecords []*fftypes.ConfigRecord) error { return nil } -var root = &configPrefix{ - keys: map[string]bool{}, // All keys go here, including those defined in sub prefixies -} +var knownKeys = map[string]bool{} // All keys go here, including those defined in sub prefixies +var keysMutex sync.Mutex +var root = &configPrefix{} // ark adds a root key, used to define the keys that are used within the core func rootKey(k string) RootKey { @@ -405,10 +405,10 @@ func rootKey(k string) RootKey { // GetKnownKeys gets the known keys func GetKnownKeys() []string { - var keys []string - root.keysMutex.Lock() - defer root.keysMutex.Unlock() - for k := range root.keys { + keys := make([]string, 0, len(knownKeys)) + keysMutex.Lock() + defer keysMutex.Unlock() + for k := range knownKeys { keys = append(keys, k) } sort.Strings(keys) @@ -417,9 +417,7 @@ func GetKnownKeys() []string { // configPrefix is the main config structure passed to plugins, and used for root to wrap viper type configPrefix struct { - prefix string - keys map[string]bool - keysMutex sync.Mutex + prefix string } // configPrefixArray is a point in the config that supports an array @@ -435,15 +433,14 @@ func NewPluginConfig(prefix string) Prefix { } return &configPrefix{ prefix: prefix, - keys: root.keys, } } func (c *configPrefix) prefixKey(k string) string { - c.keysMutex.Lock() - defer c.keysMutex.Unlock() + keysMutex.Lock() + defer keysMutex.Unlock() key := c.prefix + k - if !c.keys[key] { + if !knownKeys[key] { panic(fmt.Sprintf("Undefined configuration key '%s'", key)) } return key @@ -452,7 +449,6 @@ func (c *configPrefix) prefixKey(k string) string { func (c *configPrefix) SubPrefix(suffix string) Prefix { return &configPrefix{ prefix: c.prefix + suffix + ".", - keys: root.keys, } } @@ -476,7 +472,6 @@ func (c *configPrefixArray) ArraySize() int { func (c *configPrefixArray) ArrayEntry(i int) Prefix { cp := &configPrefix{ prefix: c.base + fmt.Sprintf(".%d.", i), - keys: root.keys, } for knownKey, defValue := range c.defaults { cp.AddKnownKey(knownKey, defValue...) @@ -495,9 +490,9 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix { func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) { // Put a simulated key in the known keys array, to pop into the help info. - root.keysMutex.Lock() - defer root.keysMutex.Unlock() - root.keys[fmt.Sprintf("%s[].%s", c.base, k)] = true + keysMutex.Lock() + defer keysMutex.Unlock() + knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true c.defaults[k] = defValue } @@ -508,9 +503,9 @@ func (c *configPrefix) AddKnownKey(k string, defValue ...interface{}) { } else if len(defValue) > 0 { c.SetDefault(k, defValue) } - c.keysMutex.Lock() - defer c.keysMutex.Unlock() - c.keys[key] = true + keysMutex.Lock() + defer keysMutex.Unlock() + knownKeys[key] = true } func (c *configPrefix) SetDefault(k string, defValue interface{}) {