Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions internal/apiserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/hyperledger/firefly/internal/config"
Expand Down Expand Up @@ -63,14 +64,15 @@ type IServer interface {
}

type httpServer struct {
name string
s IServer
l net.Listener
conf config.Prefix
onClose chan error
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
name string
s IServer
l net.Listener
conf config.Prefix
onClose chan error
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
shutdownTimeout time.Duration
}

func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) {
Expand All @@ -88,12 +90,13 @@ func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) {

func newHTTPServer(ctx context.Context, name string, r *mux.Router, onClose chan error, conf config.Prefix) (hs *httpServer, err error) {
hs = &httpServer{
name: name,
onClose: onClose,
conf: conf,
tlsEnabled: conf.GetBool(HTTPConfTLSEnabled),
tlsCertFile: conf.GetString(HTTPConfTLSCertFile),
tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile),
name: name,
onClose: onClose,
conf: conf,
tlsEnabled: conf.GetBool(HTTPConfTLSEnabled),
tlsCertFile: conf.GetString(HTTPConfTLSCertFile),
tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile),
shutdownTimeout: config.GetDuration(config.APIShutdownTimeout),
}
hs.l, err = hs.createListener(ctx)
if err == nil {
Expand Down Expand Up @@ -172,7 +175,7 @@ func (hs *httpServer) serveHTTP(ctx context.Context) {
select {
case <-ctx.Done():
log.L(ctx).Infof("API server context cancelled - shutting down")
shutdownContext, cancel := context.WithTimeout(context.Background(), config.GetDuration(config.APIShutdownTimeout))
shutdownContext, cancel := context.WithTimeout(context.Background(), hs.shutdownTimeout)
defer cancel()
if err := hs.s.Shutdown(shutdownContext); err != nil {
hs.onClose <- err
Expand Down
15 changes: 13 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ethereum

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -215,6 +216,11 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error
}

func (e *Ethereum) ensureSubscriptions() error {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16]

for eventType, subDesc := range requiredSubscriptions {

var existingSubs []*subscription
Expand All @@ -224,15 +230,20 @@ func (e *Ethereum) ensureSubscriptions() error {
}

var sub *subscription
subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash)
for _, s := range existingSubs {
if s.Name == eventType {
if s.Name == subName ||
/* Check for the plain name we used to use originally, before adding uniqueness qualifier.
If one of these very early environments needed a new subscription, the existing one would need to
be deleted manually. */
s.Name == eventType {
sub = s
}
}

if sub == nil {
newSub := subscription{
Name: eventType,
Name: subName,
Description: subDesc,
Stream: e.initInfo.stream.ID,
FromBlock: "0",
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestInitAllExistingStreams(t *testing.T) {
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Name: "BatchPin"},
{ID: "sub12345", Name: "BatchPin_2f696e7374616e63" /* this is the subname for our combo of instance path and BatchPin */},
}))

resetConf()
Expand Down
39 changes: 17 additions & 22 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,9 @@ func MergeConfig(configRecords []*fftypes.ConfigRecord) error {
return nil
}

var root = &configPrefix{
keys: map[string]bool{}, // All keys go here, including those defined in sub prefixies
}
var knownKeys = map[string]bool{} // All keys go here, including those defined in sub prefixies
var keysMutex sync.Mutex
var root = &configPrefix{}

// ark adds a root key, used to define the keys that are used within the core
func rootKey(k string) RootKey {
Expand All @@ -405,10 +405,10 @@ func rootKey(k string) RootKey {

// GetKnownKeys gets the known keys
func GetKnownKeys() []string {
var keys []string
root.keysMutex.Lock()
defer root.keysMutex.Unlock()
for k := range root.keys {
keys := make([]string, 0, len(knownKeys))
keysMutex.Lock()
defer keysMutex.Unlock()
for k := range knownKeys {
keys = append(keys, k)
}
sort.Strings(keys)
Expand All @@ -417,9 +417,7 @@ func GetKnownKeys() []string {

// configPrefix is the main config structure passed to plugins, and used for root to wrap viper
type configPrefix struct {
prefix string
keys map[string]bool
keysMutex sync.Mutex
prefix string
}

// configPrefixArray is a point in the config that supports an array
Expand All @@ -435,15 +433,14 @@ func NewPluginConfig(prefix string) Prefix {
}
return &configPrefix{
prefix: prefix,
keys: root.keys,
}
}

func (c *configPrefix) prefixKey(k string) string {
c.keysMutex.Lock()
defer c.keysMutex.Unlock()
keysMutex.Lock()
defer keysMutex.Unlock()
key := c.prefix + k
if !c.keys[key] {
if !knownKeys[key] {
panic(fmt.Sprintf("Undefined configuration key '%s'", key))
}
return key
Expand All @@ -452,7 +449,6 @@ func (c *configPrefix) prefixKey(k string) string {
func (c *configPrefix) SubPrefix(suffix string) Prefix {
return &configPrefix{
prefix: c.prefix + suffix + ".",
keys: root.keys,
}
}

Expand All @@ -476,7 +472,6 @@ func (c *configPrefixArray) ArraySize() int {
func (c *configPrefixArray) ArrayEntry(i int) Prefix {
cp := &configPrefix{
prefix: c.base + fmt.Sprintf(".%d.", i),
keys: root.keys,
}
for knownKey, defValue := range c.defaults {
cp.AddKnownKey(knownKey, defValue...)
Expand All @@ -495,9 +490,9 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix {

func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) {
// Put a simulated key in the known keys array, to pop into the help info.
root.keysMutex.Lock()
defer root.keysMutex.Unlock()
root.keys[fmt.Sprintf("%s[].%s", c.base, k)] = true
keysMutex.Lock()
defer keysMutex.Unlock()
knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true
c.defaults[k] = defValue
}

Expand All @@ -508,9 +503,9 @@ func (c *configPrefix) AddKnownKey(k string, defValue ...interface{}) {
} else if len(defValue) > 0 {
c.SetDefault(k, defValue)
}
c.keysMutex.Lock()
defer c.keysMutex.Unlock()
c.keys[key] = true
keysMutex.Lock()
defer keysMutex.Unlock()
knownKeys[key] = true
}

func (c *configPrefix) SetDefault(k string, defValue interface{}) {
Expand Down