diff --git a/go.mod b/go.mod index 23b82bd..26966a7 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.20 require ( github.com/asians-cloud/crowdsec v1.5.6 github.com/google/go-querystring v1.1.0 + github.com/hashicorp/go-retryablehttp v0.7.4 github.com/prometheus/client_golang v1.15.1 github.com/sirupsen/logrus v1.9.2 + golang.org/x/sync v0.2.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -29,7 +31,6 @@ require ( github.com/goccy/go-yaml v1.11.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/hashicorp/go-version v1.2.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/magefile/mage v1.15.0 // indirect diff --git a/go.sum b/go.sum index cde5cdb..b8f0e8a 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-hclog v1.0.0 h1:bkKf0BeBXcSYa7f5Fyi9gMuQ8gNsxeiNpZjR6VxNZeo= github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= @@ -222,6 +223,8 @@ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/reader.go b/reader.go index 8744d89..c0ca0c9 100644 --- a/reader.go +++ b/reader.go @@ -10,7 +10,9 @@ type SSEReader struct { } func newSSEReader(reader io.Reader) *SSEReader { - return &SSEReader{reader: reader} + return &SSEReader{ + reader: reader, + } } func alpha(r byte) byte { diff --git a/stream_bouncer.go b/stream_bouncer.go index 32b9a01..4cd7c65 100644 --- a/stream_bouncer.go +++ b/stream_bouncer.go @@ -5,14 +5,15 @@ import ( "encoding/json" "fmt" "io" + "net/http" "os" + "reflect" "strings" "time" - "net/http" - "reflect" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" "github.com/asians-cloud/crowdsec/pkg/apiclient" @@ -49,10 +50,10 @@ type StreamBouncer struct { TickerIntervalDuration time.Duration Stream chan *models.DecisionsStreamResponse APIClient *apiclient.ApiClient - STREAMClient *Client + STREAMClient *Client UserAgent string Opts apiclient.DecisionsStreamOpts - maxBufferSize int + maxBufferSize int } // Config() fills the struct with configuration values from a file. It is not @@ -140,12 +141,12 @@ func (b *StreamBouncer) Init() error { return fmt.Errorf("api client init: %w", err) } - b.STREAMClient = &Client{ - URL: b.APIUrl, - APIKey: b.APIKey, - UserAgent: b.UserAgent, - maxBufferSize: 1 << 16, - } + b.STREAMClient = &Client{ + URL: b.APIUrl, + APIKey: b.APIKey, + UserAgent: b.UserAgent, + maxBufferSize: 1 << 16, + } return nil } @@ -201,63 +202,109 @@ func (b *StreamBouncer) Run(ctx context.Context) { } func (b *StreamBouncer) RunStream(ctx context.Context) { + // getDecoder creates or re-creates the connection to SSE as necessary getDecoder := func(ctx context.Context) (*EventStreamReader, *http.Response, error) { resp, err := b.STREAMClient.StreamDecisionConnect(ctx, b.Opts) TotalLAPICalls.Inc() if err != nil { TotalLAPIError.Inc() - return nil, nil, err + return nil, nil, err } - reader := NewEventStreamReader(resp.Body, b.STREAMClient.maxBufferSize) + reader := NewEventStreamReader(resp.Body, b.STREAMClient.maxBufferSize) return reader, resp, err } - reader, resp, err := getDecoder(ctx) + g, _ := errgroup.WithContext(ctx) + // this is the init case, so we have to call it once + reader, resp, err := getDecoder(ctx) if err != nil { log.Error(err) return } else if resp.StatusCode != 200 { - log.Errorf("Response status is %d", resp.StatusCode) - } + log.Errorf("Response status is %d", resp.StatusCode) + } + // Produce events + g.Go(func() error { + defer close(b.Stream) + defer resp.Body.Close() - for { - select { - case <-ctx.Done(): - close(b.Stream) - resp.Body.Close() - return - default: - data := &models.DecisionsStreamResponse{ - New: []*models.Decision{}, - Deleted: []*models.Decision{}, - } - - event, err := reader.ReadEvent() - - // Decode each JSON object - if err == io.EOF || reflect.DeepEqual(event, []byte("[]")) { - continue - } else if err != nil { - log.Error(err) - time.Sleep(500 * time.Millisecond) - reader, resp, err = getDecoder(ctx) - continue - } + for { + if evt, err := reader.ReadEvent(); err != nil { + if err == io.EOF { + continue + } + + log.Errorf("Error while reading event, retrying later.. %v", err) + time.Sleep(500 * time.Millisecond) + reader, resp, err = getDecoder(ctx) + continue + } else { + if reflect.DeepEqual(evt, []byte("[]")) { + continue + } - err = json.Unmarshal(event, &data) + data := &models.DecisionsStreamResponse{ + New: []*models.Decision{}, + Deleted: []*models.Decision{}, + } - if err != nil { - log.Error(err) - time.Sleep(500 * time.Millisecond) - reader, resp, err = getDecoder(ctx) - continue - } + err := json.Unmarshal(evt, &data) + if err != nil { + log.Errorf("Error while parsing event, retrying later.. %v") + time.Sleep(500 * time.Millisecond) + reader, resp, err = getDecoder(ctx) + continue + } - log.Info("Recieved data: ", data) - - b.Stream <- data + select { + case <-ctx.Done(): + return ctx.Err() + case b.Stream <- data: + } + } } - } + }) + + g.Wait() + + // for { + // select { + // case <-ctx.Done(): + // close(b.Stream) + // resp.Body.Close() + // return + // default: + // data := &models.DecisionsStreamResponse{ + // New: []*models.Decision{}, + // Deleted: []*models.Decision{}, + // } + + // event, err := reader.ReadEvent() + + // // Decode each JSON object + // if err == io.EOF || reflect.DeepEqual(event, []byte("[]")) { + // continue + // } else if err != nil { + // log.Error(err) + // time.Sleep(500 * time.Millisecond) + // reader, resp, err = getDecoder(ctx) + // continue + // } + + // err = json.Unmarshal(event, &data) + + // if err != nil { + // log.Error(err) + // time.Sleep(500 * time.Millisecond) + // reader, resp, err = getDecoder(ctx) + // continue + // } + + // log.Info("Recieved data: ", data) + + // b.Stream <- data + // } + // } }