From 7df7c0f9c394e18ebd21c7992fcb7f8caa30ccdb Mon Sep 17 00:00:00 2001 From: jebonfig Date: Tue, 21 Dec 2021 15:27:11 -0500 Subject: [PATCH 1/3] Fix size function to get length of config list if its a map - which is possible if config is overriden via admin API and merged in with viper nested setter syntax Signed-off-by: jebonfig --- internal/config/config.go | 2 +- internal/config/config_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/internal/config/config.go b/internal/config/config.go index 3be23a2451..089c7b83c6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -479,7 +479,7 @@ func (c *configPrefix) Array() PrefixArray { func (c *configPrefixArray) ArraySize() int { val := viper.Get(c.base) vt := reflect.TypeOf(val) - if vt != nil && vt.Kind() == reflect.Slice { + if vt != nil && (vt.Kind() == reflect.Slice || vt.Kind() == reflect.Map) { return reflect.ValueOf(val).Len() } return 0 diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9e2c61ae43..240af2c8d5 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -146,6 +146,31 @@ tokens: assert.Equal(t, []string{"arr1", "arr2"}, sally.GetStringSlice("key2")) } +func TestMapOfAdminOverridePlugins(t *testing.T) { + defer Reset() + + tokPlugins := NewPluginConfig("tokens").Array() + tokPlugins.AddKnownKey("firstkey") + tokPlugins.AddKnownKey("secondkey") + viper.SetConfigType("json") + err := viper.ReadConfig(strings.NewReader(`{ + "tokens": { + "0": { + "firstkey": "firstitemfirstkeyvalue", + "secondkey": "firstitemsecondkeyvalue" + }, + "1": { + "firstkey": "seconditemfirstkeyvalue", + "secondkey": "seconditemsecondkeyvalue" + } + } + }`)) + assert.NoError(t, err) + assert.Equal(t, 2, tokPlugins.ArraySize()) + assert.Equal(t, "firstitemfirstkeyvalue", tokPlugins.ArrayEntry(0).Get("firstkey")) + assert.Equal(t, "seconditemsecondkeyvalue", tokPlugins.ArrayEntry(1).Get("secondkey")) +} + func TestGetKnownKeys(t *testing.T) { knownKeys := GetKnownKeys() assert.NotEmpty(t, knownKeys) From 750a958eecbe01d0ea188f347de349786bb66daf Mon Sep 17 00:00:00 2001 From: jebonfig Date: Thu, 23 Dec 2021 09:54:14 -0500 Subject: [PATCH 2/3] Fix bug with missed events after a FireFly API reset - the websocket connection needs to be closed when event stream loop terminates to prevent the dangling connection consuming events Signed-off-by: jebonfig --- internal/blockchain/ethereum/ethereum.go | 1 + internal/blockchain/ethereum/ethereum_test.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index ced0be73b1..3713f6bdea 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -303,6 +303,7 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{ func (e *Ethereum) eventLoop() { defer close(e.closed) + defer e.wsconn.Close() l := log.L(e.ctx).WithField("role", "event-loop") ctx := log.WithLogger(e.ctx, l) ack, _ := json.Marshal(map[string]string{"type": "ack", "topic": e.topic}) diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index ea868e4f79..0915171c17 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -804,6 +804,7 @@ func TestEventLoopContextCancelled(t *testing.T) { r := make(<-chan []byte) wsm := e.wsconn.(*wsmocks.WSClient) wsm.On("Receive").Return(r) + wsm.On("Close").Return() e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting } @@ -815,6 +816,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm := e.wsconn.(*wsmocks.WSClient) close(r) wsm.On("Receive").Return((<-chan []byte)(r)) + wsm.On("Close").Return() e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting } @@ -827,6 +829,7 @@ func TestEventLoopSendClosed(t *testing.T) { close(r) wsm.On("Receive").Return((<-chan []byte)(r)) wsm.On("Send", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + wsm.On("Close").Return() e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting } @@ -887,6 +890,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { e.closed = make(chan struct{}) wsm.On("Receive").Return((<-chan []byte)(r)) + wsm.On("Close").Return() operationID := fftypes.NewUUID() data := []byte(`{ "_id": "6fb94fff-81d3-4094-567d-e031b1871694", From e878840eaa01c2586a81355340347ab35a9e4392 Mon Sep 17 00:00:00 2001 From: jebonfig Date: Wed, 29 Dec 2021 10:59:11 -0500 Subject: [PATCH 3/3] Close websocket connection for Fabric as well Signed-off-by: jebonfig --- internal/blockchain/ethereum/ethereum.go | 2 +- internal/blockchain/fabric/fabric.go | 1 + internal/blockchain/fabric/fabric_test.go | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 3713f6bdea..b249a937a4 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -302,8 +302,8 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{ } func (e *Ethereum) eventLoop() { - defer close(e.closed) defer e.wsconn.Close() + defer close(e.closed) l := log.L(e.ctx).WithField("role", "event-loop") ctx := log.WithLogger(e.ctx, l) ack, _ := json.Marshal(map[string]string{"type": "ack", "topic": e.topic}) diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 0b4bc4f037..9450357eb8 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -342,6 +342,7 @@ func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) } func (f *Fabric) eventLoop() { + defer f.wsconn.Close() defer close(f.closed) l := log.L(f.ctx).WithField("role", "event-loop") ctx := log.WithLogger(f.ctx, l) diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index efe749ed7a..2fcbcd9f6e 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -798,6 +798,7 @@ func TestEventLoopContextCancelled(t *testing.T) { r := make(<-chan []byte) wsm := e.wsconn.(*wsmocks.WSClient) wsm.On("Receive").Return(r) + wsm.On("Close").Return() e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting } @@ -809,6 +810,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm := e.wsconn.(*wsmocks.WSClient) close(r) wsm.On("Receive").Return((<-chan []byte)(r)) + wsm.On("Close").Return() e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting } @@ -820,6 +822,7 @@ func TestEventLoopSendClosed(t *testing.T) { wsm := e.wsconn.(*wsmocks.WSClient) close(r) wsm.On("Receive").Return((<-chan []byte)(r)) + wsm.On("Close").Return() wsm.On("Send", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) e.closed = make(chan struct{}) e.eventLoop() // we're simply looking for it exiting @@ -831,6 +834,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { r := make(chan []byte) wsm := e.wsconn.(*wsmocks.WSClient) wsm.On("Receive").Return((<-chan []byte)(r)) + wsm.On("Close").Return() e.closed = make(chan struct{}) operationID := fftypes.NewUUID() data := []byte(`{