Skip to content

Commit

Permalink
Merge branch 'add-context'
Browse files Browse the repository at this point in the history
  • Loading branch information
suryadana committed Sep 21, 2023
2 parents 7c4a773 + 4dfc7b9 commit 136a026
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 51 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ go 1.20
require (
github.com/asians-cloud/crowdsec v1.5.6
github.com/google/go-querystring v1.1.0
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/prometheus/client_golang v1.15.1
github.com/sirupsen/logrus v1.9.2
golang.org/x/sync v0.2.0
gopkg.in/yaml.v2 v2.4.0
)

Expand All @@ -29,7 +31,6 @@ require (
github.com/goccy/go-yaml v1.11.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/hashicorp/go-version v1.2.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/magefile/mage v1.15.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.0.0 h1:bkKf0BeBXcSYa7f5Fyi9gMuQ8gNsxeiNpZjR6VxNZeo=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI=
Expand Down Expand Up @@ -222,6 +223,8 @@ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
4 changes: 3 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ type SSEReader struct {
}

func newSSEReader(reader io.Reader) *SSEReader {
return &SSEReader{reader: reader}
return &SSEReader{
reader: reader,
}
}

func alpha(r byte) byte {
Expand Down
145 changes: 96 additions & 49 deletions stream_bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"reflect"
"strings"
"time"
"net/http"
"reflect"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/asians-cloud/crowdsec/pkg/apiclient"
Expand Down Expand Up @@ -49,10 +50,10 @@ type StreamBouncer struct {
TickerIntervalDuration time.Duration
Stream chan *models.DecisionsStreamResponse
APIClient *apiclient.ApiClient
STREAMClient *Client
STREAMClient *Client
UserAgent string
Opts apiclient.DecisionsStreamOpts
maxBufferSize int
maxBufferSize int
}

// Config() fills the struct with configuration values from a file. It is not
Expand Down Expand Up @@ -140,12 +141,12 @@ func (b *StreamBouncer) Init() error {
return fmt.Errorf("api client init: %w", err)
}

b.STREAMClient = &Client{
URL: b.APIUrl,
APIKey: b.APIKey,
UserAgent: b.UserAgent,
maxBufferSize: 1 << 16,
}
b.STREAMClient = &Client{
URL: b.APIUrl,
APIKey: b.APIKey,
UserAgent: b.UserAgent,
maxBufferSize: 1 << 16,
}
return nil
}

Expand Down Expand Up @@ -201,63 +202,109 @@ func (b *StreamBouncer) Run(ctx context.Context) {
}

func (b *StreamBouncer) RunStream(ctx context.Context) {
// getDecoder creates or re-creates the connection to SSE as necessary
getDecoder := func(ctx context.Context) (*EventStreamReader, *http.Response, error) {
resp, err := b.STREAMClient.StreamDecisionConnect(ctx, b.Opts)
TotalLAPICalls.Inc()
if err != nil {
TotalLAPIError.Inc()
return nil, nil, err
return nil, nil, err
}
reader := NewEventStreamReader(resp.Body, b.STREAMClient.maxBufferSize)
reader := NewEventStreamReader(resp.Body, b.STREAMClient.maxBufferSize)
return reader, resp, err
}

reader, resp, err := getDecoder(ctx)
g, _ := errgroup.WithContext(ctx)

// this is the init case, so we have to call it once
reader, resp, err := getDecoder(ctx)
if err != nil {
log.Error(err)
return
} else if resp.StatusCode != 200 {
log.Errorf("Response status is %d", resp.StatusCode)
}
log.Errorf("Response status is %d", resp.StatusCode)
}

// Produce events
g.Go(func() error {
defer close(b.Stream)
defer resp.Body.Close()

for {
select {
case <-ctx.Done():
close(b.Stream)
resp.Body.Close()
return
default:
data := &models.DecisionsStreamResponse{
New: []*models.Decision{},
Deleted: []*models.Decision{},
}

event, err := reader.ReadEvent()

// Decode each JSON object
if err == io.EOF || reflect.DeepEqual(event, []byte("[]")) {
continue
} else if err != nil {
log.Error(err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
}
for {
if evt, err := reader.ReadEvent(); err != nil {
if err == io.EOF {
continue
}

log.Errorf("Error while reading event, retrying later.. %v", err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
} else {
if reflect.DeepEqual(evt, []byte("[]")) {
continue
}

err = json.Unmarshal(event, &data)
data := &models.DecisionsStreamResponse{
New: []*models.Decision{},
Deleted: []*models.Decision{},
}

if err != nil {
log.Error(err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
}
err := json.Unmarshal(evt, &data)
if err != nil {
log.Errorf("Error while parsing event, retrying later.. %v")
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
}

log.Info("Recieved data: ", data)

b.Stream <- data
select {
case <-ctx.Done():
return ctx.Err()
case b.Stream <- data:
}
}
}
}
})

g.Wait()

// for {
// select {
// case <-ctx.Done():
// close(b.Stream)
// resp.Body.Close()
// return
// default:
// data := &models.DecisionsStreamResponse{
// New: []*models.Decision{},
// Deleted: []*models.Decision{},
// }

// event, err := reader.ReadEvent()

// // Decode each JSON object
// if err == io.EOF || reflect.DeepEqual(event, []byte("[]")) {
// continue
// } else if err != nil {
// log.Error(err)
// time.Sleep(500 * time.Millisecond)
// reader, resp, err = getDecoder(ctx)
// continue
// }

// err = json.Unmarshal(event, &data)

// if err != nil {
// log.Error(err)
// time.Sleep(500 * time.Millisecond)
// reader, resp, err = getDecoder(ctx)
// continue
// }

// log.Info("Recieved data: ", data)

// b.Stream <- data
// }
// }
}

0 comments on commit 136a026

Please sign in to comment.