From f96da2b874cffcda7d05fc20484501f6e15da8ef Mon Sep 17 00:00:00 2001 From: jebonfig Date: Tue, 8 Mar 2022 15:58:53 -0500 Subject: [PATCH] Scope subscription check to the appropriate stream ID Signed-off-by: jebonfig --- internal/blockchain/ethereum/ethereum_test.go | 2 +- internal/blockchain/ethereum/eventstream.go | 4 ++-- internal/blockchain/fabric/eventstream.go | 2 +- internal/blockchain/fabric/fabric_test.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 8d81a0a389..5fdd2d9c80 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -238,7 +238,7 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ - {ID: "sub12345", Name: "BatchPin_30783132333435e3" /* this is the subname for our combo of instance path and BatchPin */}, + {ID: "sub12345", Stream: "es12345", Name: "BatchPin_30783132333435e3" /* this is the subname for our combo of instance path and BatchPin */}, })) httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345", httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}})) diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index a19b79d787..54cc6e7844 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -176,11 +176,11 @@ func (s *streamManager) ensureSubscription(ctx context.Context, instancePath, st subName := fmt.Sprintf("%s_%s", abi.Name, instanceUniqueHash) for _, s := range existingSubs { - if s.Name == subName || + if s.Stream == stream && (s.Name == subName || /* Check for the plain name we used to use originally, before adding uniqueness qualifier. If one of these very early environments needed a new subscription, the existing one would need to be deleted manually. */ - s.Name == abi.Name { + s.Name == abi.Name) { sub = s } } diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 43965940db..2ad84c6fd3 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -153,7 +153,7 @@ func (s *streamManager) ensureSubscription(ctx context.Context, location *Locati subName := event for _, s := range existingSubs { - if s.Name == subName { + if s.Stream == stream && s.Name == subName { sub = s } } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 4c42d2ba8a..392bbb1712 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -239,7 +239,7 @@ func TestInitAllExistingStreams(t *testing.T) { httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}})) httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{ - {ID: "sub12345", Name: "BatchPin"}, + {ID: "sub12345", Stream: "es12345", Name: "BatchPin"}, })) resetConf()