Skip to content

Commit

Permalink
fix: Use different clientID for each EventBus reconnection. Fixes #1055
Browse files Browse the repository at this point in the history
… (#1061)

* fix: Use differet clientID for each EventBus reconnection. Fixes #1055

Signed-off-by: Derek Wang <whynowy@gmail.com>

* comment

Signed-off-by: Derek Wang <whynowy@gmail.com>

* tiny

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Feb 9, 2021
1 parent 714b465 commit bf63a4d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ENTRYPOINT [ "/bin/eventsource" ]
####################################################################################################
# sensor
####################################################################################################
FROM alpine as sensor
FROM alpine:3.12.3 as sensor
RUN apk update && apk upgrade && \
apk add --no-cache git

Expand Down
18 changes: 17 additions & 1 deletion eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -262,7 +263,8 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
logger := logging.FromContext(ctx).Desugar()
logger.Info("Starting event source server...")
servers := GetEventingServers(e.eventSource)
driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, strings.ReplaceAll(e.hostname, ".", "_"))
clientID := generateClientID(e.hostname)
driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, clientID)
if err != nil {
logger.Error("failed to get eventbus driver", zap.Error(err))
return err
Expand Down Expand Up @@ -293,6 +295,13 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
case <-ticker.C:
if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
logger.Info("NATS connection lost, reconnecting...")
// Regenerate the client ID to avoid the issue that NAT server still thinks the client is alive.
clientID := generateClientID(e.hostname)
driver, err := eventbus.GetDriver(cctx, *e.eventBusConfig, e.eventBusSubject, clientID)
if err != nil {
logger.Error("failed to get eventbus driver during reconnection", zap.Error(err))
continue
}
e.eventBusConn, err = driver.Connect()
if err != nil {
logger.Error("failed to reconnect to eventbus", zap.Error(err))
Expand Down Expand Up @@ -382,3 +391,10 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
}
}
}

func generateClientID(hostname string) string {
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
clientID := fmt.Sprintf("client-%s-%v", strings.ReplaceAll(hostname, ".", "_"), r1.Intn(1000))
return clientID
}
11 changes: 9 additions & 2 deletions sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error {
group, clientID := sensorCtx.getGroupAndClientID(depExpression)
ebDriver, err := eventbus.GetDriver(cctx, *sensorCtx.EventBusConfig, sensorCtx.EventBusSubject, clientID)
if err != nil {
logger.Error("failed to get event bus driver", zap.Error(err))
logger.Error("failed to get eventbus driver", zap.Error(err))
return
}
triggerNames := []string{}
Expand Down Expand Up @@ -176,7 +176,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error {

err = ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
logger.Error("failed to subscribe to event bus", zap.Any("clientID", clientID), zap.Error(err))
logger.Error("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
return
}
}()
Expand All @@ -196,6 +196,13 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error {
case <-ticker.C:
if conn == nil || conn.IsClosed() {
logger.Info("NATS connection lost, reconnecting...")
// Regenerate the client ID to avoid the issue that NAT server still thinks the client is alive.
_, clientID := sensorCtx.getGroupAndClientID(depExpression)
ebDriver, err := eventbus.GetDriver(cctx, *sensorCtx.EventBusConfig, sensorCtx.EventBusSubject, clientID)
if err != nil {
logger.Error("failed to get eventbus driver during reconnection", zap.Error(err))
continue
}
conn, err = ebDriver.Connect()
if err != nil {
logger.Error("failed to reconnect to eventbus", zap.Any("clientID", clientID), zap.Error(err))
Expand Down

0 comments on commit bf63a4d

Please sign in to comment.