Skip to content

Commit

Permalink
Update error handling on ingestion (#1832)
Browse files Browse the repository at this point in the history
Bulk emitter will now return errors. It continues to try to ingest all verbs
even if one fails, but it stops ingestion on a single noun failure. The guacone
files command will log all errors and continue to ingest all files found even
if one of them errors; its summary should list the errors. The guacingest
service will log errors and continue; it will ack messages even if ingestion
returned an error.

Signed-off-by: Jeff Mendoza <jlm@jlm.name>
  • Loading branch information
jeffmendoza committed Apr 11, 2024
1 parent 0550c31 commit d0c51f5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
5 changes: 4 additions & 1 deletion cmd/guacingest/cmd/ingest.go
Expand Up @@ -90,7 +90,10 @@ func ingest(cmd *cobra.Command, args []string) {
defer csubClient.Close()

emit := func(d *processor.Document) error {
return ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)
if err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient); err != nil {
logger.Errorf("unable to ingest document %q : %v", d.SourceInformation.Source, err)
}
return nil
}

// Assuming that publisher and consumer are different processes.
Expand Down
16 changes: 4 additions & 12 deletions cmd/guacone/cmd/files.go
Expand Up @@ -122,10 +122,9 @@ var filesCmd = &cobra.Command{

emit := func(d *processor.Document) error {
totalNum += 1
err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)

if err != nil {
if err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient); err != nil {
gotErr = true
filesWithErrors = append(filesWithErrors, d.SourceInformation.Source)
return fmt.Errorf("unable to ingest document: %w", err)
}
totalSuccess += 1
Expand All @@ -147,7 +146,8 @@ var filesCmd = &cobra.Command{
}

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v", totalSuccess, totalNum, printErrors(filesWithErrors))
logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v",
totalSuccess, totalNum, strings.Join(filesWithErrors, " "))
} else {
logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum)
}
Expand Down Expand Up @@ -183,14 +183,6 @@ func validateFilesFlags(keyPath string, keyID string, graphqlEndpoint string, cs
return opts, nil
}

// printErrors renders the given file names as a single string in which every
// name is followed by exactly one space. A non-empty input therefore yields a
// result that ends with a trailing space; a nil or empty slice yields "".
func printErrors(filesWithErrors []string) string {
	// Append into one growing byte buffer instead of re-concatenating an
	// immutable string on each iteration, which would copy the whole
	// accumulated prefix every time.
	var buf []byte
	for _, name := range filesWithErrors {
		buf = fmt.Appendf(buf, "%v ", name)
	}
	return string(buf)
}

func init() {
set, err := cli.BuildFlags([]string{"verifier-key-path", "verifier-key-id"})
if err != nil {
Expand Down
25 changes: 20 additions & 5 deletions pkg/assembler/clients/helpers/bulk.go
Expand Up @@ -30,6 +30,7 @@ import (
func GetBulkAssembler(ctx context.Context, gqlclient graphql.Client) func([]assembler.AssemblerInput) error {
logger := logging.FromContext(ctx)
return func(preds []assembler.IngestPredicates) error {
var rvErr error
for _, p := range preds {

// Ingest Packages
Expand Down Expand Up @@ -114,12 +115,14 @@ func GetBulkAssembler(ctx context.Context, gqlclient graphql.Client) func([]asse
logger.Infof("assembling CertifyScorecard: %v", len(p.CertifyScorecard))
if err := ingestCertifyScorecards(ctx, gqlclient, p.CertifyScorecard, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestCertifyScorecards failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling IsDependency: %v", len(p.IsDependency))
isDependenciesIDs := make([]string, 0)
if ingestedIsDependenciesIDs, err := ingestIsDependencies(ctx, gqlclient, p.IsDependency, collectedIDorPkgInputs); err != nil {
logger.Errorf("ingestIsDependencies failed with error: %v", err)
rvErr = err
} else {
isDependenciesIDs = append(isDependenciesIDs, ingestedIsDependenciesIDs...)
}
Expand All @@ -128,56 +131,63 @@ func GetBulkAssembler(ctx context.Context, gqlclient graphql.Client) func([]asse
isOccurrencesIDs := make([]string, 0)
if ingestedIsOccurrencesIDs, err := ingestIsOccurrences(ctx, gqlclient, p.IsOccurrence, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestIsOccurrences failed with error: %v", err)
rvErr = err
} else {
isOccurrencesIDs = append(isOccurrencesIDs, ingestedIsOccurrencesIDs...)
}

logger.Infof("assembling HasSLSA: %v", len(p.HasSlsa))
if err := ingestHasSLSAs(ctx, gqlclient, p.HasSlsa, collectedIDorArtInputs, collectedIDorMatInputs, collectedIDorBuilderInputs); err != nil {
logger.Errorf("ingestHasSLSAs failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling CertifyVuln: %v", len(p.CertifyVuln))
if err := ingestCertifyVulns(ctx, gqlclient, p.CertifyVuln, collectedIDorPkgInputs, collectedIDorVulnInputs); err != nil {
logger.Errorf("ingestCertifyVulns failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling VulnMetadata: %v", len(p.VulnMetadata))
if err := ingestVulnMetadatas(ctx, gqlclient, p.VulnMetadata, collectedIDorVulnInputs); err != nil {
logger.Errorf("ingestVulnMetadatas failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling VulnEqual: %v", len(p.VulnEqual))
if err := ingestVulnEquals(ctx, gqlclient, p.VulnEqual, collectedIDorVulnInputs); err != nil {
logger.Errorf("ingestVulnEquals failed with error: %v", err)

rvErr = err
}

logger.Infof("assembling HasSourceAt: %v", len(p.HasSourceAt))
if err := ingestHasSourceAts(ctx, gqlclient, p.HasSourceAt, collectedIDorPkgInputs, collectedIDorSrcInputs); err != nil {
return fmt.Errorf("ingestHasSourceAts failed with error: %w", err)
logger.Errorf("ingestHasSourceAts failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling CertifyBad: %v", len(p.CertifyBad))
if err := ingestCertifyBads(ctx, gqlclient, p.CertifyBad, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestCertifyBads failed with error: %v", err)

rvErr = err
}

logger.Infof("assembling CertifyGood: %v", len(p.CertifyGood))
if err := ingestCertifyGoods(ctx, gqlclient, p.CertifyGood, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestCertifyGoods failed with error: %v", err)

rvErr = err
}

logger.Infof("assembling PointOfContact: %v", len(p.PointOfContact))
if err := ingestPointOfContacts(ctx, gqlclient, p.PointOfContact, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestPointOfContacts failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling HasMetadata: %v", len(p.HasMetadata))
if err := ingestBulkHasMetadata(ctx, gqlclient, p.HasMetadata, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorSrcInputs); err != nil {
logger.Errorf("ingestBulkHasMetadata failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling HasSBOM: %v", len(p.HasSBOM))
Expand All @@ -188,29 +198,34 @@ func GetBulkAssembler(ctx context.Context, gqlclient graphql.Client) func([]asse
Occurrences: isOccurrencesIDs,
}, collectedIDorPkgInputs, collectedIDorArtInputs); err != nil {
logger.Errorf("ingestHasSBOMs failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling VEX : %v", len(p.Vex))
if err := ingestVEXs(ctx, gqlclient, p.Vex, collectedIDorPkgInputs, collectedIDorArtInputs, collectedIDorVulnInputs); err != nil {
logger.Errorf("ingestVEXs failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling HashEqual : %v", len(p.HashEqual))
if err := ingestHashEquals(ctx, gqlclient, p.HashEqual, collectedIDorArtInputs); err != nil {
logger.Errorf("ingestHashEquals failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling PkgEqual : %v", len(p.PkgEqual))
if err := ingestPkgEquals(ctx, gqlclient, p.PkgEqual, collectedIDorPkgInputs); err != nil {
logger.Errorf("ingestPkgEquals failed with error: %v", err)
rvErr = err
}

logger.Infof("assembling CertifyLegal : %v", len(p.CertifyLegal))
if err := ingestCertifyLegals(ctx, gqlclient, p.CertifyLegal, collectedIDorPkgInputs, collectedIDorSrcInputs, collectedIDorLicenseInputs); err != nil {
logger.Errorf("ingestCertifyLegals failed with error: %v", err)
rvErr = err
}
}
return nil
return rvErr
}
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/ingestor/ingestor.go
Expand Up @@ -54,15 +54,14 @@ func Ingest(ctx context.Context, d *processor.Document, graphqlEndpoint string,
return fmt.Errorf("unable to ingest doc tree: %v", err)
}

err = collectSubEmitFunc(idstrings)
if err != nil {
if err := collectSubEmitFunc(idstrings); err != nil {
logger.Infof("unable to create entries in collectsub server, but continuing: %v", err)
}

err = assemblerFunc(predicates)
if err != nil {
return fmt.Errorf("unable to assemble graphs: %v", err)
if err := assemblerFunc(predicates); err != nil {
return fmt.Errorf("error assembling graphs for %q : %w", d.SourceInformation.Source, err)
}

t := time.Now()
elapsed := t.Sub(start)
logger.Infof("[%v] completed doc %+v", elapsed, d.SourceInformation)
Expand Down

0 comments on commit d0c51f5

Please sign in to comment.