Skip to content

Commit

Permalink
Certifier OSV: fixed emit func when polling (#1396)
Browse files Browse the repository at this point in the history
* Certifier OSV: fixed emit func when polling

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: flush the totalDocs

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: refactored ingestion goroutine

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: linter enhancements

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: enhanced for loop

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: fix atomic.LoadInt32

Signed-off-by: mrizzi <mrizzi@redhat.com>

* Certifier OSV: restored flag approach

Signed-off-by: mrizzi <mrizzi@redhat.com>

---------

Signed-off-by: mrizzi <mrizzi@redhat.com>
  • Loading branch information
mrizzi committed Oct 13, 2023
1 parent c225a8e commit 7c3b1b9
Showing 1 changed file with 73 additions and 13 deletions.
86 changes: 73 additions & 13 deletions cmd/guacone/cmd/osv.go
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -86,12 +87,76 @@ var osvCmd = &cobra.Command{
packageQuery := root_package.NewPackageQuery(gqlclient, 0)

totalNum := 0
var totalDocs []*processor.Document
gotErr := false
docChan := make(chan *processor.Document)
ingestionStop := make(chan bool, 1)
tickInterval := 30 * time.Second
ticker := time.NewTicker(tickInterval)

var gotErr int32
var wg sync.WaitGroup
ingestion := func() {
defer wg.Done()
var totalDocs []*processor.Document
const threshold = 1000
stop := false
for !stop {
select {
case <-ticker.C:
if len(totalDocs) > 0 {
err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
stop = true
atomic.StoreInt32(&gotErr, 1)
logger.Errorf("unable to ingest documents: %v", err)
}
totalDocs = []*processor.Document{}
}
ticker.Reset(tickInterval)
case d := <-docChan:
totalNum += 1
totalDocs = append(totalDocs, d)
if len(totalDocs) >= threshold {
err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
stop = true
atomic.StoreInt32(&gotErr, 1)
logger.Errorf("unable to ingest documents: %v", err)
}
totalDocs = []*processor.Document{}
ticker.Reset(tickInterval)
}
case <-ingestionStop:
stop = true
case <-ctx.Done():
return
}
}
for len(docChan) > 0 {
totalNum += 1
totalDocs = append(totalDocs, <-docChan)
if len(totalDocs) >= threshold {
err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
atomic.StoreInt32(&gotErr, 1)
logger.Errorf("unable to ingest documents: %v", err)
}
totalDocs = []*processor.Document{}
}
}
if len(totalDocs) > 0 {
err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
atomic.StoreInt32(&gotErr, 1)
logger.Errorf("unable to ingest documents: %v", err)
}
}
}
wg.Add(1)
go ingestion()

// Set emit function to go through the entire pipeline
emit := func(d *processor.Document) error {
totalNum += 1
totalDocs = append(totalDocs, d)
docChan <- d
return nil
}

Expand All @@ -102,13 +167,12 @@ var osvCmd = &cobra.Command{
return true
}
logger.Errorf("certifier ended with error: %v", err)
gotErr = true
atomic.StoreInt32(&gotErr, 1)
// process documents already captures
return true
}

ctx, cf := context.WithCancel(ctx)
var wg sync.WaitGroup
done := make(chan bool, 1)
wg.Add(1)
go func() {
Expand All @@ -123,19 +187,15 @@ var osvCmd = &cobra.Command{
select {
case s := <-sigs:
logger.Infof("Signal received: %s, shutting down gracefully\n", s.String())
cf()
case <-done:
logger.Infof("All certifiers completed")
}
ingestionStop <- true
wg.Wait()

err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
gotErr = true
logger.Errorf("unable to ingest documents: %v", err)
}
cf()

if gotErr {
if atomic.LoadInt32(&gotErr) == 1 {
logger.Errorf("completed ingestion with errors")
} else {
logger.Infof("completed ingesting %v documents", totalNum)
Expand Down

0 comments on commit 7c3b1b9

Please sign in to comment.