Skip to content

Commit

Permalink
Extraction of log reading logic to defer its close (#417)
Browse files Browse the repository at this point in the history
* extracted log reader logic into own function so we can defer close it

* removed values from values.yaml
  • Loading branch information
1003n40 committed Aug 17, 2022
1 parent fbedfbf commit 11cd308
Showing 1 changed file with 73 additions and 41 deletions.
114 changes: 73 additions & 41 deletions pkg/vulnerabilityreport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"

. "github.com/aquasecurity/trivy-operator/pkg/operator/predicate"
"go.uber.org/multierr"

"github.com/aquasecurity/trivy-operator/pkg/apis/aquasecurity/v1alpha1"
"github.com/aquasecurity/trivy-operator/pkg/exposedsecretreport"
Expand Down Expand Up @@ -428,68 +429,99 @@ func (r *WorkloadController) processCompleteScanJob(ctx context.Context, job *ba
var vulnerabilityReports []v1alpha1.VulnerabilityReport
var secretReports []v1alpha1.ExposedSecretReport

var merr error
for containerName, containerImage := range containerImages {
logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, containerName)
vulnReports, secReports, err := r.processScanJobResults(ctx, job, containerName, containerImage, owner)
if err != nil {
if k8sapierror.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, job)
}
return fmt.Errorf("getting logs for pod %q: %w", job.Namespace+"/"+job.Name, err)
}
vulnReportData, secretReportData, err := r.Plugin.ParseReportData(r.PluginContext, containerImage, logsStream)
if err != nil {
return err
merr = multierr.Append(merr, err)
}
vulnerabilityReports = append(vulnerabilityReports, vulnReports...)
secretReports = append(secretReports, secReports...)
}
if merr != nil {
return merr
}

_ = logsStream.Close()
err = r.VulnerabilityReadWriter.Write(ctx, vulnerabilityReports)
if err != nil {
return err
}

err = r.ExposedSecretReadWriter.Write(ctx, secretReports)
if err != nil {
return err
}

log.V(1).Info("Deleting complete scan job", "owner", owner)
return r.deleteJob(ctx, job)
}

func (r *WorkloadController) processScanJobResults(ctx context.Context, job *batchv1.Job, containerName, containerImage string, owner client.Object) ([]v1alpha1.VulnerabilityReport, []v1alpha1.ExposedSecretReport, error) {
log := r.Logger.WithValues("job-results-processor", fmt.Sprintf("%s/%s", job.Namespace, job.Name))

var vulnerabilityReports []v1alpha1.VulnerabilityReport
var secretReports []v1alpha1.ExposedSecretReport

reportBuilder := NewReportBuilder(r.Client.Scheme()).
Controller(owner).
Container(containerName).
Data(vulnReportData).
PodSpecHash(podSpecHash)
podSpecHash, ok := job.Labels[trivyoperator.LabelResourceSpecHash]
if !ok {
return nil, nil, fmt.Errorf("expected label %s not set", trivyoperator.LabelResourceSpecHash)
}

if r.Config.VulnerabilityScannerReportTTL != nil {
reportBuilder.ReportTTL(r.Config.VulnerabilityScannerReportTTL)
logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, containerName)
if err != nil {
if k8sapierror.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil, nil, nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return nil, nil, r.deleteJob(ctx, job)
}
return nil, nil, fmt.Errorf("getting logs for pod %q: %w", job.Namespace+"/"+job.Name, err)
}

report, err := reportBuilder.Get()
defer func() {
err := logsStream.Close()
if err != nil {
return err
log.V(1).Error(err, "could not close log stream")
}
}()

secretReportBuilder := exposedsecretreport.NewReportBuilder(r.Client.Scheme()).
Controller(owner).
Container(containerName).
Data(secretReportData).
PodSpecHash(podSpecHash)
vulnReportData, secretReportData, err := r.Plugin.ParseReportData(r.PluginContext, containerImage, logsStream)
if err != nil {
return nil, nil, err
}

secretReport, err := secretReportBuilder.Get()
if err != nil {
return err
}
reportBuilder := NewReportBuilder(r.Client.Scheme()).
Controller(owner).
Container(containerName).
Data(vulnReportData).
PodSpecHash(podSpecHash)

vulnerabilityReports = append(vulnerabilityReports, report)
secretReports = append(secretReports, secretReport)
if r.Config.VulnerabilityScannerReportTTL != nil {
reportBuilder.ReportTTL(r.Config.VulnerabilityScannerReportTTL)
}

err = r.VulnerabilityReadWriter.Write(ctx, vulnerabilityReports)
report, err := reportBuilder.Get()
if err != nil {
return err
return nil, nil, err
}

err = r.ExposedSecretReadWriter.Write(ctx, secretReports)
secretReportBuilder := exposedsecretreport.NewReportBuilder(r.Client.Scheme()).
Controller(owner).
Container(containerName).
Data(secretReportData).
PodSpecHash(podSpecHash)

secretReport, err := secretReportBuilder.Get()
if err != nil {
return err
return nil, nil, err
}

log.V(1).Info("Deleting complete scan job", "owner", owner)
return r.deleteJob(ctx, job)
vulnerabilityReports = append(vulnerabilityReports, report)
secretReports = append(secretReports, secretReport)

return vulnerabilityReports, secretReports, nil
}

func (r *WorkloadController) processFailedScanJob(ctx context.Context, scanJob *batchv1.Job) error {
Expand Down

0 comments on commit 11cd308

Please sign in to comment.