Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (i *Integration) Execute(ctx context.Context, req *sdk.ExecutionRequest) er
i.Logger.Infow("msg", "Uploading attestation", "repo", registrationConfig.Repository, "workflowID", req.WorkflowID)

// Perform the upload of the json marshalled attestation
jsonContent, err := json.Marshal(req.Input.DSSEnvelope)
jsonContent, err := json.Marshal(req.Input.Attestation.Envelope)
if err != nil {
return fmt.Errorf("marshaling the envelope: %w", err)
}
Expand All @@ -206,7 +206,7 @@ func validateExecuteRequest(req *sdk.ExecutionRequest) error {
return errors.New("execution input not received")
}

if req.Input.DSSEnvelope == nil {
if req.Input.Attestation == nil {
return errors.New("execution input invalid, the envelope is empty")
}

Expand Down
15 changes: 3 additions & 12 deletions app/controlplane/extensions/core/smtp/v1/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/chainloop-dev/chainloop/app/controlplane/extensions/sdk/v1"
"github.com/go-kratos/kratos/v2/log"
"github.com/in-toto/in-toto-golang/in_toto"
)

type Integration struct {
Expand Down Expand Up @@ -160,16 +159,8 @@ func (i *Integration) Execute(_ context.Context, req *sdk.ExecutionRequest) erro
return errors.New("invalid attachment configuration")
}

// get the attestation
decodedPayload, err := req.Input.DSSEnvelope.DecodeB64Payload()
if err != nil {
return err
}
statement := &in_toto.Statement{}
if err := json.Unmarshal(decodedPayload, statement); err != nil {
return fmt.Errorf("un-marshaling predicate: %w", err)
}
jsonBytes, err := json.MarshalIndent(statement, "", " ")
// marshal the statement
jsonBytes, err := json.MarshalIndent(req.Input.Attestation.Statement, "", " ")
if err != nil {
return fmt.Errorf("error marshaling JSON: %w", err)
}
Expand All @@ -196,7 +187,7 @@ This email has been delivered via integration %s version %s.
}

func validateExecuteRequest(req *sdk.ExecutionRequest) error {
if req == nil || req.Input == nil || req.Input.DSSEnvelope == nil {
if req == nil || req.Input == nil || req.Input.Attestation == nil {
return errors.New("invalid input")
}

Expand Down
9 changes: 8 additions & 1 deletion app/controlplane/extensions/sdk/v1/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/chainloop-dev/chainloop/internal/attestation/renderer/chainloop"
"github.com/chainloop-dev/chainloop/internal/servicelogger"
"github.com/go-kratos/kratos/v2/log"
"github.com/in-toto/in-toto-golang/in_toto"
"github.com/invopop/jsonschema"
schema_validator "github.com/santhosh-tekuri/jsonschema/v5"

Expand Down Expand Up @@ -122,10 +123,16 @@ type ExecutionRequest struct {
// An execute method will receive either the envelope or a material as input
// The material will contain its content as well as the metadata
type ExecuteInput struct {
DSSEnvelope *dsse.Envelope
Attestation *ExecuteAttestation
Material *ExecuteMaterial
}

type ExecuteAttestation struct {
Envelope *dsse.Envelope
Statement *in_toto.Statement
Predicate chainloop.NormalizablePredicate
}

type ExecuteMaterial struct {
*chainloop.NormalizedMaterial
// Content of the material already downloaded
Expand Down
42 changes: 30 additions & 12 deletions app/controlplane/internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,36 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org
return fmt.Errorf("calculating dispatch queue: %w", err)
}

// get the in_toto statement from the envelope if present
statement, err := chainloop.ExtractStatement(envelope)
if err != nil {
return fmt.Errorf("extracting statement: %w", err)
}

// Iterate over the materials in the attestation and dispatch them to the integrations that are subscribed to them
predicate, err := chainloop.ExtractPredicate(envelope)
if err != nil {
return fmt.Errorf("extracting predicate: %w", err)
}

var attestationInput = &sdk.ExecuteAttestation{
Envelope: envelope,
Statement: statement,
Predicate: predicate,
}

// Send the envelope to the integrations that are subscribed to it
for _, integration := range queue.attestations {
req := generateRequest(integration)
req.Input = &sdk.ExecuteInput{
DSSEnvelope: envelope,
Attestation: attestationInput,
}

go func(backend sdk.FanOut) {
_ = dispatch(ctx, backend, req, d.log)
}(integration.backend)
}

// Iterate over the materials in the attestation and dispatch them to the integrations that are subscribed to them
predicate, err := chainloop.ExtractPredicate(envelope)
if err != nil {
return err
}

for _, material := range predicate.GetMaterials() {
// Find the backends that are subscribed to this material type, this includes
// 1) Any integration backend that is listening to all material types
Expand All @@ -190,6 +202,7 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org
backends = append(backends, b...)
}

// There are no backends that are subscribed to this material type
if len(backends) == 0 {
continue
}
Expand All @@ -210,14 +223,19 @@ func (d *FanOutDispatcher) Run(ctx context.Context, envelope *dsse.Envelope, org
content = buf.Bytes()
}

// Material information to be sent to the integration
var materialInput = &sdk.ExecuteMaterial{
NormalizedMaterial: material,
Content: content,
}

// Execute the integration backends
for _, b := range backends {
req := generateRequest(b)
req.Input = &sdk.ExecuteInput{
Material: &sdk.ExecuteMaterial{
NormalizedMaterial: material,
Content: content,
},
// They receive both the attestation information and the specific material information
Material: materialInput,
Attestation: attestationInput,
}

go func() {
Expand All @@ -237,7 +255,7 @@ func dispatch(ctx context.Context, backend sdk.FanOut, opts *sdk.ExecutionReques

var inputType string
switch {
case opts.Input.DSSEnvelope != nil:
case opts.Input.Attestation != nil:
inputType = "DSSEnvelope"
case opts.Input.Material != nil:
inputType = fmt.Sprintf("Material:%s", opts.Input.Material.Type)
Expand Down