Skip to content

Commit

Permalink
fix: add a waitgroup to coordinate shutdown (#238)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- follow-up to #230
- closes #228
- closes #237

## Short description of the changes

Use a sync.WaitGroup to ensure that both an assembler and an event
handler have stopped generating data before clean-up and process exit.

Behind the curtain, the assembler is using the libhoney client managed
within the event handler to ship assembler statistics. Closing that
libhoney client has moved from the case statement in Start() to a
separate Close() function on the event handler that is called after the
wait group count indicates that all our internal routines have ended
their loops.

How much of this interface is kept in an OTLP-flavored event handler
(and stats producer?) remains TBD.

## How to verify that this has the expected result

No more intermittent panics on process exit?
  • Loading branch information
robbkidd committed Sep 26, 2023
1 parent 26b3e5b commit 617dea4
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 19 deletions.
5 changes: 4 additions & 1 deletion assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package assemblers
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -97,7 +98,9 @@ func NewTcpAssembler(config config.Config, httpEvents chan HttpEvent) tcpAssembl
}
}

func (h *tcpAssembler) Start(ctx context.Context) {
func (h *tcpAssembler) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

log.Info().Msg("Starting TCP assembler")
// Tick on the tightest loop. The flush timeout is the shorter of the two timeouts using this ticker.
// Tick even more frequently than the flush interval (4 is somewhat arbitrary)
Expand Down
4 changes: 3 additions & 1 deletion handlers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package handlers

import (
"context"
"sync"

"github.com/honeycombio/honeycomb-network-agent/assemblers"
)

// EventHandler is an interface for event handlers
type EventHandler interface {
Start(ctx context.Context)
Start(ctx context.Context, wg *sync.WaitGroup)
Close()
handleEvent(event assemblers.HttpEvent)
}
13 changes: 10 additions & 3 deletions handlers/libhoney_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"sync"
"time"

"github.com/honeycombio/honeycomb-network-agent/assemblers"
Expand Down Expand Up @@ -31,20 +32,26 @@ func NewLibhoneyEventHandler(config config.Config, k8sClient *utils.CachedK8sCli

// Start starts the event handler and begins handling events from the events channel
// When the context is cancelled, the event handler will stop handling events
func (handler *libhoneyEventHandler) Start(ctx context.Context) {
func (handler *libhoneyEventHandler) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

var event assemblers.HttpEvent

for {
select {
case <-ctx.Done():
// close libhoney to ensure all events are sent
libhoney.Close()
return
case event = <-handler.eventsChan:
handler.handleEvent(event)
}
}
}

// Close closes the libhoney client, flushing any pending events.
//
// It is meant to be called once, after the wait group passed to Start shows
// that the internal routines have ended their loops: the assembler ships its
// statistics through this same libhoney client, so the client has to stay
// open until both the event handler and the assembler have stopped.
func (handler *libhoneyEventHandler) Close() {
libhoney.Close()
}

// initLibhoney initializes libhoney and sets global fields
func initLibhoney(config config.Config, version string) func() {
libhoney.Init(libhoney.Config{
Expand Down
11 changes: 9 additions & 2 deletions handlers/libhoney_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"net/http"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -72,17 +73,23 @@ func Test_libhoneyEventHandler_handleEvent(t *testing.T) {
// create event channel used to pass in events to the handler
eventsChannel := make(chan assemblers.HttpEvent, 1)

wgTest := sync.WaitGroup{} // used to wait for the event handler to finish

// create the event handler with default config, fake k8s client & event channel then start it
handler := NewLibhoneyEventHandler(config.Config{}, fakeCachedK8sClient, eventsChannel, "test")
go handler.Start(cancelableCtx)
wgTest.Add(1)
go handler.Start(cancelableCtx, &wgTest)

// Setup libhoney for testing, use mock transmission to retrieve events "sent"
// must be done after the event handler is created
mockTransmission := setupTestLibhoney(t)

// TEST ACTION: pass in httpEvent to handler
eventsChannel <- httpEvent
time.Sleep(10 * time.Millisecond)

done()
wgTest.Wait()
handler.Close()

// VALIDATE
events := mockTransmission.Events()
Expand Down
34 changes: 22 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"os/signal"
"sync"
"syscall"

"github.com/honeycombio/honeycomb-network-agent/assemblers"
Expand Down Expand Up @@ -55,31 +56,40 @@ func main() {
// create events channel for assembler to send events to and event handler to receive events from
eventsChannel := make(chan assemblers.HttpEvent, config.ChannelBufferSize)

// track our internal services
wgServices := sync.WaitGroup{}

// create event handler that sends events to backend (eg Honeycomb)
// TODO: move version outside of main package so it can be used directly in the eventHandler
eventHandler := handlers.NewLibhoneyEventHandler(config, cachedK8sClient, eventsChannel, Version)
go eventHandler.Start(ctx)
wgServices.Add(1)
go eventHandler.Start(ctx, &wgServices)

// create assembler that does packet capture and analysis
assembler := assemblers.NewTcpAssembler(config, eventsChannel)
go assembler.Start(ctx)
wgServices.Add(1)
go assembler.Start(ctx, &wgServices)

// listen for shutdown and interrupt signals
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
shutdownChannel := make(chan bool, 1)
// channel to signal when agent process is ready to exit
shutdownNow := make(chan bool, 1)

// cleanup func that waits for the shutdown signal and stops the assembler, event handler and k8s client
// wait for shutdown signal then do the needful to prepare for process exit
go func() {
<-signalChannel
signals := make(chan os.Signal, 1)
// subscribe signals channel to shutdown signals: os.Interrupt / SIGINT (Ctrl+C) and SIGTERM (default kill, k8s pod shutdown)
signal.Notify(signals, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-signals // wait for shutdown signals

log.Info().Msg("Agent is stopping. Cleaning up...")
// signal the k8sClient, event handler and assembler to stop via ctx.Done()
done()
<-shutdownChannel

done() // notify services to stop
wgServices.Wait() // wait for all coordinated services to stop
eventHandler.Close() // flush events before exit
shutdownNow <- true // signal main goroutine to exit
}()

log.Info().Msg("Agent is ready!")
<-shutdownChannel
<-shutdownNow
log.Info().Msg("Agent has stopped")
}

Expand Down

0 comments on commit 617dea4

Please sign in to comment.