From cb4e5f0b37b6907623f3516bdb7b0b7b4e7b1f8f Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Fri, 4 Nov 2022 12:04:15 -0700 Subject: [PATCH 1/3] fix: if key/value store already exists use that Signed-off-by: Julie Vogelman --- eventbus/jetstream/sensor/sensor_jetstream.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/eventbus/jetstream/sensor/sensor_jetstream.go b/eventbus/jetstream/sensor/sensor_jetstream.go index 73b372c691..5e8473246a 100644 --- a/eventbus/jetstream/sensor/sensor_jetstream.go +++ b/eventbus/jetstream/sensor/sensor_jetstream.go @@ -55,12 +55,22 @@ func (stream *SensorJetstream) Initialize() error { } // create Key/Value store for this Sensor (seems to be okay to call this if it already exists) stream.keyValueStore, err = stream.MgmtConnection.JSContext.CreateKeyValue(&nats.KeyValueConfig{Bucket: stream.sensorName}) - if err != nil { + if err == nats.ErrStreamNameAlreadyInUse { + // get the existing one + stream.keyValueStore, err = stream.MgmtConnection.JSContext.KeyValue(stream.sensorName) + if err != nil { + errStr := fmt.Sprintf("failed to get existing Key/Value Store for sensor %s, err: %v", stream.sensorName, err) + stream.Logger.Error(errStr) + return err + } + stream.Logger.Infof("found existing K/V store for sensor %s, using that", stream.sensorName) + } else if err != nil { errStr := fmt.Sprintf("failed to Create Key/Value Store for sensor %s, err: %v", stream.sensorName, err) stream.Logger.Error(errStr) return err + } else { + stream.Logger.Infof("successfully created K/V store for sensor %s", stream.sensorName) } - stream.Logger.Infof("successfully created K/V store for sensor %s (if it doesn't already exist)", stream.sensorName) // Here we can take the sensor specification and clean up the K/V store so as to remove any old // Triggers for this Sensor that no longer exist and any old Dependencies (and also Drain any corresponding Connections) From 63e5a934c9a4db2d7c904736402239576e684234 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Fri, 4 Nov 2022 13:28:02 -0700 Subject: [PATCH 2/3] fix: lint Signed-off-by: Julie Vogelman --- eventbus/jetstream/sensor/sensor_jetstream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eventbus/jetstream/sensor/sensor_jetstream.go b/eventbus/jetstream/sensor/sensor_jetstream.go index 5e8473246a..228e9511f9 100644 --- a/eventbus/jetstream/sensor/sensor_jetstream.go +++ b/eventbus/jetstream/sensor/sensor_jetstream.go @@ -55,7 +55,8 @@ func (stream *SensorJetstream) Initialize() error { } // create Key/Value store for this Sensor (seems to be okay to call this if it already exists) stream.keyValueStore, err = stream.MgmtConnection.JSContext.CreateKeyValue(&nats.KeyValueConfig{Bucket: stream.sensorName}) - if err == nats.ErrStreamNameAlreadyInUse { + switch err { + case nats.ErrStreamNameAlreadyInUse: // get the existing one stream.keyValueStore, err = stream.MgmtConnection.JSContext.KeyValue(stream.sensorName) if err != nil { @@ -64,13 +65,12 @@ func (stream *SensorJetstream) Initialize() error { return err } stream.Logger.Infof("found existing K/V store for sensor %s, using that", stream.sensorName) - } else if err != nil { + case nil: errStr := fmt.Sprintf("failed to Create Key/Value Store for sensor %s, err: %v", stream.sensorName, err) stream.Logger.Error(errStr) return err - } else { - stream.Logger.Infof("successfully created K/V store for sensor %s", stream.sensorName) } + stream.Logger.Infof("successfully created/located K/V store for sensor %s", stream.sensorName) // Here we can take the sensor specification and clean up the K/V store so as to remove any old // Triggers for this Sensor that no longer exist and any old Dependencies (and also Drain any corresponding Connections) From adbda5a3a089a9ec9feac1badaedb651ca3b35ff Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Tue, 8 Nov 2022 08:54:56 -0800 Subject: [PATCH 3/3] fix: reverse order of operations Signed-off-by: Julie Vogelman --- eventbus/jetstream/sensor/sensor_jetstream.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/eventbus/jetstream/sensor/sensor_jetstream.go b/eventbus/jetstream/sensor/sensor_jetstream.go index 228e9511f9..8a962e350d 100644 --- a/eventbus/jetstream/sensor/sensor_jetstream.go +++ b/eventbus/jetstream/sensor/sensor_jetstream.go @@ -53,22 +53,19 @@ func (stream *SensorJetstream) Initialize() error { if err != nil { return err } - // create Key/Value store for this Sensor (seems to be okay to call this if it already exists) - stream.keyValueStore, err = stream.MgmtConnection.JSContext.CreateKeyValue(&nats.KeyValueConfig{Bucket: stream.sensorName}) - switch err { - case nats.ErrStreamNameAlreadyInUse: - // get the existing one - stream.keyValueStore, err = stream.MgmtConnection.JSContext.KeyValue(stream.sensorName) + + // see if there's an existing one + stream.keyValueStore, _ = stream.MgmtConnection.JSContext.KeyValue(stream.sensorName) + if stream.keyValueStore == nil { + // create Key/Value store for this Sensor (seems to be okay to call this if it already exists) + stream.keyValueStore, err = stream.MgmtConnection.JSContext.CreateKeyValue(&nats.KeyValueConfig{Bucket: stream.sensorName}) if err != nil { - errStr := fmt.Sprintf("failed to get existing Key/Value Store for sensor %s, err: %v", stream.sensorName, err) + errStr := fmt.Sprintf("failed to Create Key/Value Store for sensor %s, err: %v", stream.sensorName, err) stream.Logger.Error(errStr) return err } + } else { stream.Logger.Infof("found existing K/V store for sensor %s, using that", stream.sensorName) - case nil: - errStr := fmt.Sprintf("failed to Create Key/Value Store for sensor %s, err: %v", stream.sensorName, err) - stream.Logger.Error(errStr) - return err } stream.Logger.Infof("successfully created/located K/V store for sensor %s", stream.sensorName)