Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -273,6 +273,9 @@ type PrefixArray interface {
type RootKey string

func Reset() {
keysMutex.Lock() // must only call viper directly here (as we already hold the lock)
defer keysMutex.Unlock()

viper.Reset()

// Set defaults
Expand Down Expand Up @@ -351,11 +354,14 @@ func Reset() {
viper.SetDefault(string(IdentityManagerCacheLimit), 100 /* items */)
viper.SetDefault(string(IdentityManagerCacheTTL), "1h")

i18n.SetLang(GetString(Lang))
i18n.SetLang(viper.GetString(string(Lang)))
}

// ReadConfig initializes the config
func ReadConfig(cfgFile string) error {
keysMutex.Lock() // must only call viper directly here (as we already hold the lock)
defer keysMutex.Unlock()

// Set precedence order for reading config location
viper.SetEnvPrefix("firefly")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Expand All @@ -377,6 +383,9 @@ func ReadConfig(cfgFile string) error {
}

func MergeConfig(configRecords []*fftypes.ConfigRecord) error {
keysMutex.Lock()
defer keysMutex.Unlock()

for _, c := range configRecords {
s := viper.New()
s.SetConfigType("json")
Expand Down Expand Up @@ -422,9 +431,10 @@ func rootKey(k string) RootKey {

// GetKnownKeys gets the known keys
func GetKnownKeys() []string {
keys := make([]string, 0, len(knownKeys))
keysMutex.Lock()
defer keysMutex.Unlock()

keys := make([]string, 0, len(knownKeys))
for k := range knownKeys {
keys = append(keys, k)
}
Expand Down Expand Up @@ -454,8 +464,7 @@ func NewPluginConfig(prefix string) Prefix {
}

func (c *configPrefix) prefixKey(k string) string {
keysMutex.Lock()
defer keysMutex.Unlock()
// Caller responsible for holding lock when calling
key := c.prefix + k
if !knownKeys[key] {
panic(fmt.Sprintf("Undefined configuration key '%s'", key))
Expand Down Expand Up @@ -506,9 +515,10 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix {
}

func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) {
// Put a simulated key in the known keys array, to pop into the help info.
keysMutex.Lock()
defer keysMutex.Unlock()

// Put a simulated key in the known keys array, to pop into the help info.
knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true
c.defaults[k] = defValue
}
Expand All @@ -531,6 +541,9 @@ func (c *configPrefix) SetDefault(k string, defValue interface{}) {
}

func GetConfig() fftypes.JSONObject {
keysMutex.Lock()
defer keysMutex.Unlock()

conf := fftypes.JSONObject{}
_ = viper.Unmarshal(&conf)
return conf
Expand All @@ -541,6 +554,9 @@ func GetString(key RootKey) string {
return root.GetString(string(key))
}
func (c *configPrefix) GetString(key string) string {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetString(c.prefixKey(key))
}

Expand All @@ -549,6 +565,9 @@ func GetStringSlice(key RootKey) []string {
return root.GetStringSlice(string(key))
}
func (c *configPrefix) GetStringSlice(key string) []string {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetStringSlice(c.prefixKey(key))
}

Expand All @@ -557,6 +576,9 @@ func GetBool(key RootKey) bool {
return root.GetBool(string(key))
}
func (c *configPrefix) GetBool(key string) bool {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetBool(c.prefixKey(key))
}

Expand All @@ -565,6 +587,9 @@ func GetDuration(key RootKey) time.Duration {
return root.GetDuration(string(key))
}
func (c *configPrefix) GetDuration(key string) time.Duration {
keysMutex.Lock()
defer keysMutex.Unlock()

return fftypes.ParseToDuration(viper.GetString(c.prefixKey(key)))
}

Expand All @@ -573,14 +598,20 @@ func GetByteSize(key RootKey) int64 {
return root.GetByteSize(string(key))
}
func (c *configPrefix) GetByteSize(key string) int64 {
return fftypes.ParseToByteSize(c.GetString(key))
keysMutex.Lock()
defer keysMutex.Unlock()

return fftypes.ParseToByteSize(viper.GetString(c.prefixKey(key)))
}

// GetUint gets a configuration uint
func GetUint(key RootKey) uint {
return root.GetUint(string(key))
}
func (c *configPrefix) GetUint(key string) uint {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetUint(c.prefixKey(key))
}

Expand All @@ -589,6 +620,9 @@ func GetInt(key RootKey) int {
return root.GetInt(string(key))
}
func (c *configPrefix) GetInt(key string) int {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetInt(c.prefixKey(key))
}

Expand All @@ -597,6 +631,9 @@ func GetInt64(key RootKey) int64 {
return root.GetInt64(string(key))
}
func (c *configPrefix) GetInt64(key string) int64 {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetInt64(c.prefixKey(key))
}

Expand All @@ -605,6 +642,9 @@ func GetFloat64(key RootKey) float64 {
return root.GetFloat64(string(key))
}
func (c *configPrefix) GetFloat64(key string) float64 {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.GetFloat64(c.prefixKey(key))
}

Expand All @@ -613,6 +653,9 @@ func GetObject(key RootKey) fftypes.JSONObject {
return root.GetObject(string(key))
}
func (c *configPrefix) GetObject(key string) fftypes.JSONObject {
keysMutex.Lock()
defer keysMutex.Unlock()

return fftypes.JSONObject(viper.GetStringMap(c.prefixKey(key)))
}

Expand All @@ -621,6 +664,9 @@ func GetObjectArray(key RootKey) fftypes.JSONObjectArray {
return root.GetObjectArray(string(key))
}
func (c *configPrefix) GetObjectArray(key string) fftypes.JSONObjectArray {
keysMutex.Lock()
defer keysMutex.Unlock()

v, _ := fftypes.ToJSONObjectArray(viper.Get(c.prefixKey(key)))
return v
}
Expand All @@ -630,6 +676,9 @@ func Get(key RootKey) interface{} {
return root.Get(string(key))
}
func (c *configPrefix) Get(key string) interface{} {
keysMutex.Lock()
defer keysMutex.Unlock()

return viper.Get(c.prefixKey(key))
}

Expand All @@ -638,11 +687,17 @@ func Set(key RootKey, value interface{}) {
root.Set(string(key), value)
}
func (c *configPrefix) Set(key string, value interface{}) {
keysMutex.Lock()
defer keysMutex.Unlock()

viper.Set(c.prefixKey(key), value)
}

// Resolve gives the fully qualified path of a key
func (c *configPrefix) Resolve(key string) string {
keysMutex.Lock()
defer keysMutex.Unlock()

return c.prefixKey(key)
}

Expand Down
7 changes: 6 additions & 1 deletion internal/config/wsconfig/wsconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -25,6 +25,7 @@ import (
const (
defaultIntialConnectAttempts = 5
defaultBufferSize = "16Kb"
defaultHeartbeatInterval = "30s" // up to a minute to detect a dead connection
)

const (
Expand All @@ -38,6 +39,8 @@ const (
WSConfigKeyInitialConnectAttempts = "ws.initialConnectAttempts"
// WSConfigKeyPath if set will define the path to connect to - allows sharing of the same URL between HTTP and WebSocket connection info
WSConfigKeyPath = "ws.path"
// WSConfigHeartbeatInterval is the frequency of ping/pong requests, and also used for the timeout to receive a response to the heartbeat
WSConfigHeartbeatInterval = "ws.heartbeatInterval"
)

// InitPrefix ensures the prefix is initialized for HTTP too, as WS and HTTP
Expand All @@ -48,6 +51,7 @@ func InitPrefix(prefix config.KeySet) {
prefix.AddKnownKey(WSConfigKeyReadBufferSize, defaultBufferSize)
prefix.AddKnownKey(WSConfigKeyInitialConnectAttempts, defaultIntialConnectAttempts)
prefix.AddKnownKey(WSConfigKeyPath)
prefix.AddKnownKey(WSConfigHeartbeatInterval, defaultHeartbeatInterval)
}

func GenerateConfigFromPrefix(prefix config.Prefix) *wsclient.WSConfig {
Expand All @@ -62,5 +66,6 @@ func GenerateConfigFromPrefix(prefix config.Prefix) *wsclient.WSConfig {
HTTPHeaders: prefix.GetObject(restclient.HTTPConfigHeaders),
AuthUsername: prefix.GetString(restclient.HTTPConfigAuthUsername),
AuthPassword: prefix.GetString(restclient.HTTPConfigAuthPassword),
HeartbeatInterval: prefix.GetDuration(WSConfigHeartbeatInterval),
}
}
17 changes: 9 additions & 8 deletions internal/events/websockets/websocket_connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -44,6 +44,7 @@ type websocketConnection struct {
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*fftypes.EventDeliveryResponse
Expand All @@ -64,6 +65,7 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn)
connID: connID,
sendMessages: make(chan interface{}),
senderDone: make(chan struct{}),
receiverDone: make(chan struct{}),
}
go wc.sendLoop()
go wc.receiveLoop()
Expand Down Expand Up @@ -104,11 +106,7 @@ func (wc *websocketConnection) sendLoop() {
defer wc.close()
for {
select {
case msg, ok := <-wc.sendMessages:
if !ok {
l.Debugf("Sender closing")
return
}
case msg := <-wc.sendMessages:
l.Tracef("Sending: %+v", msg)
writer, err := wc.wsConn.NextWriter(websocket.TextMessage)
if err == nil {
Expand All @@ -119,6 +117,9 @@ func (wc *websocketConnection) sendLoop() {
l.Errorf("Write failed on socket: %s", err)
return
}
case <-wc.receiverDone:
l.Debugf("Sender closing - receiver completed")
return
case <-wc.ctx.Done():
l.Debugf("Sender closing - context cancelled")
return
Expand All @@ -128,7 +129,7 @@ func (wc *websocketConnection) sendLoop() {

func (wc *websocketConnection) receiveLoop() {
l := log.L(wc.ctx)
defer close(wc.sendMessages)
defer close(wc.receiverDone)
for {
var msgData []byte
var msgHeader fftypes.WSClientActionBase
Expand Down Expand Up @@ -361,5 +362,5 @@ func (wc *websocketConnection) close() {

func (wc *websocketConnection) waitClose() {
<-wc.senderDone
<-wc.sendMessages
<-wc.receiverDone
}
Loading