From 1f9439fcb62a723224dad55aa941b86a90db8d37 Mon Sep 17 00:00:00 2001 From: Miguel Martinez Trivino Date: Thu, 15 Jun 2023 14:18:17 +0200 Subject: [PATCH] feat(extensions sdk): add statement and predicate context Signed-off-by: Miguel Martinez Trivino --- .../core/ociregistry/v1/ociregistry.go | 4 +- .../extensions/core/smtp/v1/extension.go | 15 ++----- app/controlplane/extensions/sdk/v1/fanout.go | 9 +++- .../internal/dispatcher/dispatcher.go | 42 +++++++++++++------ 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/app/controlplane/extensions/core/ociregistry/v1/ociregistry.go b/app/controlplane/extensions/core/ociregistry/v1/ociregistry.go index e9a9e4ed2..5486594ef 100644 --- a/app/controlplane/extensions/core/ociregistry/v1/ociregistry.go +++ b/app/controlplane/extensions/core/ociregistry/v1/ociregistry.go @@ -179,7 +179,7 @@ func (i *Integration) Execute(ctx context.Context, req *sdk.ExecutionRequest) er i.Logger.Infow("msg", "Uploading attestation", "repo", registrationConfig.Repository, "workflowID", req.WorkflowID) // Perform the upload of the json marshalled attestation - jsonContent, err := json.Marshal(req.Input.DSSEnvelope) + jsonContent, err := json.Marshal(req.Input.Attestation.Envelope) if err != nil { return fmt.Errorf("marshaling the envelope: %w", err) } @@ -206,7 +206,7 @@ func validateExecuteRequest(req *sdk.ExecutionRequest) error { return errors.New("execution input not received") } - if req.Input.DSSEnvelope == nil { + if req.Input.Attestation == nil { return errors.New("execution input invalid, the envelope is empty") } diff --git a/app/controlplane/extensions/core/smtp/v1/extension.go b/app/controlplane/extensions/core/smtp/v1/extension.go index 6334e31e6..36a8af4b3 100644 --- a/app/controlplane/extensions/core/smtp/v1/extension.go +++ b/app/controlplane/extensions/core/smtp/v1/extension.go @@ -24,7 +24,6 @@ import ( "github.com/chainloop-dev/chainloop/app/controlplane/extensions/sdk/v1" "github.com/go-kratos/kratos/v2/log" - "github.com/in-toto/in-toto-golang/in_toto" ) type Integration struct { @@ -160,16 +159,8 @@ func (i *Integration) Execute(_ context.Context, req *sdk.ExecutionRequest) erro return errors.New("invalid attachment configuration") } - // get the attestation - decodedPayload, err := req.Input.DSSEnvelope.DecodeB64Payload() - if err != nil { - return err - } - statement := &in_toto.Statement{} - if err := json.Unmarshal(decodedPayload, statement); err != nil { - return fmt.Errorf("un-marshaling predicate: %w", err) - } - jsonBytes, err := json.MarshalIndent(statement, "", " ") + // marshal the statement + jsonBytes, err := json.MarshalIndent(req.Input.Attestation.Statement, "", " ") if err != nil { return fmt.Errorf("error marshaling JSON: %w", err) } @@ -196,7 +187,7 @@ This email has been delivered via integration %s version %s. } func validateExecuteRequest(req *sdk.ExecutionRequest) error { - if req == nil || req.Input == nil || req.Input.DSSEnvelope == nil { + if req == nil || req.Input == nil || req.Input.Attestation == nil { return errors.New("invalid input") } diff --git a/app/controlplane/extensions/sdk/v1/fanout.go b/app/controlplane/extensions/sdk/v1/fanout.go index 4ec37d74a..e959a77f3 100644 --- a/app/controlplane/extensions/sdk/v1/fanout.go +++ b/app/controlplane/extensions/sdk/v1/fanout.go @@ -27,6 +27,7 @@ import ( "github.com/chainloop-dev/chainloop/internal/attestation/renderer/chainloop" "github.com/chainloop-dev/chainloop/internal/servicelogger" "github.com/go-kratos/kratos/v2/log" + "github.com/in-toto/in-toto-golang/in_toto" "github.com/invopop/jsonschema" schema_validator "github.com/santhosh-tekuri/jsonschema/v5" @@ -122,10 +123,16 @@ type ExecutionRequest struct { // An execute method will receive either the envelope or a material as input // The material will contain its content as well as the metadata type ExecuteInput struct { - DSSEnvelope *dsse.Envelope + Attestation *ExecuteAttestation Material *ExecuteMaterial } +type ExecuteAttestation struct { + Envelope *dsse.Envelope + Statement *in_toto.Statement + Predicate chainloop.NormalizablePredicate +} + type ExecuteMaterial struct { *chainloop.NormalizedMaterial // Content of the material already downloaded diff --git a/app/controlplane/internal/dispatcher/dispatcher.go b/app/controlplane/internal/dispatcher/dispatcher.go index 7c8164c3c..afadea8e2 100644 --- a/app/controlplane/internal/dispatcher/dispatcher.go +++ b/app/controlplane/internal/dispatcher/dispatcher.go @@ -159,11 +159,29 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org return fmt.Errorf("calculating dispatch queue: %w", err) } + // get the in_toto statement from the envelope if present + statement, err := chainloop.ExtractStatement(envelope) + if err != nil { + return fmt.Errorf("extracting statement: %w", err) + } + + // Iterate over the materials in the attestation and dispatch them to the integrations that are subscribed to them + predicate, err := chainloop.ExtractPredicate(envelope) + if err != nil { + return fmt.Errorf("extracting predicate: %w", err) + } + + var attestationInput = &sdk.ExecuteAttestation{ + Envelope: envelope, + Statement: statement, + Predicate: predicate, + } + // Send the envelope to the integrations that are subscribed to it for _, integration := range queue.attestations { req := generateRequest(integration) req.Input = &sdk.ExecuteInput{ - DSSEnvelope: envelope, + Attestation: attestationInput, } go func(backend sdk.FanOut) { @@ -171,12 +189,6 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org }(integration.backend) } - // Iterate over the materials in the attestation and dispatch them to the integrations that are subscribed to them - predicate, err := chainloop.ExtractPredicate(envelope) - if err != nil { - return err - } - for _, material := range predicate.GetMaterials() { // Find the backends that are subscribed to this material type, this includes // 1) Any integration backend that is listening to all material types @@ -190,6 +202,7 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org backends = append(backends, b...) } + // There are no backends that are subscribed to this material type if len(backends) == 0 { continue } @@ -210,14 +223,19 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org content = buf.Bytes() } + // Material information to be sent to the integration + var materialInput = &sdk.ExecuteMaterial{ + NormalizedMaterial: material, + Content: content, + } + // Execute the integration backends for _, b := range backends { req := generateRequest(b) req.Input = &sdk.ExecuteInput{ - Material: &sdk.ExecuteMaterial{ - NormalizedMaterial: material, - Content: content, - }, + // They receive both the attestation information and the specific material information + Material: materialInput, + Attestation: attestationInput, } go func() { @@ -237,7 +255,7 @@ func dispatch(ctx context.Context, backend sdk.FanOut, opts *sdk.ExecutionReques var inputType string switch { - case opts.Input.DSSEnvelope != nil: + case opts.Input.Attestation != nil: inputType = "DSSEnvelope" case opts.Input.Material != nil: inputType = fmt.Sprintf("Material:%s", opts.Input.Material.Type)