Skip to content

Commit

Permalink
remove errgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
acishan committed Sep 21, 2023
1 parent 62fa9c7 commit 5bce962
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions stream_bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/asians-cloud/crowdsec/pkg/apiclient"
Expand Down Expand Up @@ -214,7 +213,7 @@ func (b *StreamBouncer) RunStream(ctx context.Context) {
return reader, resp, err
}

g, _ := errgroup.WithContext(ctx)
// g, _ := errgroup.WithContext(ctx)

// this is the init case, so we have to call it once
reader, resp, err := getDecoder(ctx)
Expand All @@ -226,44 +225,45 @@ func (b *StreamBouncer) RunStream(ctx context.Context) {
}

// Produce events
g.Go(func() error {
defer close(b.Stream)
defer resp.Body.Close()

for {
if evt, err := reader.ReadEvent(); err != nil {
if err == io.EOF {
continue
}
// g.Go(func() error {
defer close(b.Stream)
defer resp.Body.Close()

log.Errorf("Error while reading event, retrying later.. %v", err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
for {
if evt, err := reader.ReadEvent(); err != nil {
if err == io.EOF {
continue
} else {
if reflect.DeepEqual(evt, []byte("[]")) {
continue
}
}

data := &models.DecisionsStreamResponse{
New: []*models.Decision{},
Deleted: []*models.Decision{},
}
log.Errorf("Error while reading event, retrying later.. %v", err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
} else {
if reflect.DeepEqual(evt, []byte("[]")) {
continue
}

err := json.Unmarshal(evt, &data)
if err != nil {
log.Errorf("Error while parsing event, retrying later.. %v", err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
}
data := &models.DecisionsStreamResponse{
New: []*models.Decision{},
Deleted: []*models.Decision{},
}

select {
case <-ctx.Done():
return ctx.Err()
case b.Stream <- data:
}
err := json.Unmarshal(evt, &data)
if err != nil {
log.Errorf("Error while parsing event, retrying later.. %v", err)
time.Sleep(500 * time.Millisecond)
reader, resp, err = getDecoder(ctx)
continue
}

select {
case <-ctx.Done():
log.Printf("Received context shutdown, returning..")
return
case b.Stream <- data:
}
}
})
}
// })
}

0 comments on commit 5bce962

Please sign in to comment.