Skip to content

Commit

Permalink
Merge branch 'eng-3000-allow-users-to-opt-out-of-data-snapshots-2' in…
Browse files Browse the repository at this point in the history
…to eng-3000-allow-users-to-opt-out-of-data-snapshots-3
  • Loading branch information
likawind committed Jun 1, 2023
2 parents f335cfa + 383d03a commit b1a6102
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 31 deletions.
5 changes: 5 additions & 0 deletions src/golang/lib/models/shared/execution_timestamps.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ExecutionTimestamps struct {
PendingAt *time.Time `json:"pending_at"`
RunningAt *time.Time `json:"running_at"`
FinishedAt *time.Time `json:"finished_at"`
DeletedAt *time.Time `json:"deleted_at"`
}

// ExecutionTimestampsJsonFieldByStatus returns the json_field
Expand Down Expand Up @@ -38,5 +39,9 @@ func ExecutionTimestampsJsonFieldByStatus(
return "registered_at", nil
}

if status == DeletedExecutionStatus {
return "deleted_at", nil
}

return "", errors.Newf("Execution status %s is not valid in timestamps", status)
}
100 changes: 74 additions & 26 deletions src/golang/lib/workflow/artifact/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package artifact
import (
"context"
"encoding/json"
"time"

"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/models"
Expand Down Expand Up @@ -37,7 +38,7 @@ type Artifact interface {

// PersistResult updates the artifact result in the database.
// Errors if InitializeResult() hasn't been called yet.
PersistResult(ctx context.Context, execState *shared.ExecutionState) error
PersistResult(ctx context.Context) error

// Finish is an end-of-lifecycle hook meant to do any final cleanup work.
Finish(ctx context.Context)
Expand All @@ -63,6 +64,13 @@ type Artifact interface {
// For now, it's primarily used for table artifact to limit
// the number of rows sent to client.
SampleContent(ctx context.Context) ([]byte, bool, error)

// SetExecState updates the execution state of the artifact.
// For now, it doesn't try to 'merge' the incoming state with the current state,
// e.g. if the current exec state has a 'pending' status with a 'PendingAt' timestamp,
// and the incoming state has a 'succeeded' status with a 'SucceededAt' timestamp,
// the incoming state will completely replace the current state.
SetExecState(execState shared.ExecutionState)
}

type ArtifactImpl struct {
Expand All @@ -81,6 +89,7 @@ type ArtifactImpl struct {
shouldPersistContent bool

execPaths *utils.ExecPaths
execState *shared.ExecutionState

repo repos.Artifact
resultRepo repos.ArtifactResult
Expand Down Expand Up @@ -110,21 +119,29 @@ func NewArtifact(
return nil, errors.Newf("An Artifact signature must be provided for a cache-aware artifact.")
}

now := time.Now()
return &ArtifactImpl{
id: dbArtifact.ID,
signature: signature,
name: dbArtifact.Name,
description: dbArtifact.Description,
artifactType: dbArtifact.Type,
execPaths: execPaths,
repo: artifactRepo,
resultRepo: artifactResultRepo,
resultID: uuid.Nil,
resultMetadata: nil,
previewCacheManager: previewCacheManager,
resultsPersisted: false,
storageConfig: storageConfig,
db: db,
id: dbArtifact.ID,
signature: signature,
name: dbArtifact.Name,
description: dbArtifact.Description,
artifactType: dbArtifact.Type,
execPaths: execPaths,
repo: artifactRepo,
resultRepo: artifactResultRepo,
resultID: uuid.Nil,
resultMetadata: nil,
previewCacheManager: previewCacheManager,
resultsPersisted: false,
storageConfig: storageConfig,
shouldPersistContent: dbArtifact.ShouldPersist,
execState: &shared.ExecutionState{
Status: shared.RegisteredExecutionStatus,
Timestamps: &shared.ExecutionTimestamps{
RegisteredAt: &now,
},
},
db: db,
}, nil
}

Expand Down Expand Up @@ -157,6 +174,7 @@ func NewArtifactFromDBObjects(
execPaths: &utils.ExecPaths{
ArtifactContentPath: contentPath,
},
execState: &dbArtifactResult.ExecState.ExecutionState,
repo: artifactRepo,
resultRepo: artifactResultRepo,
resultID: artifactResultId,
Expand Down Expand Up @@ -218,14 +236,11 @@ func (a *ArtifactImpl) InitializeResult(ctx context.Context, dagResultID uuid.UU
return nil
}

func (a *ArtifactImpl) updateArtifactResultAfterComputation(
ctx context.Context,
execState *shared.ExecutionState,
) {
func (a *ArtifactImpl) updateArtifactResultAfterComputation(ctx context.Context) {
changes := map[string]interface{}{
models.ArtifactResultMetadata: nil,
models.ArtifactResultStatus: execState.Status,
models.ArtifactResultExecState: execState,
models.ArtifactResultStatus: a.execState.Status,
models.ArtifactResultExecState: a.execState,
}

metadataExists := utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath)
Expand Down Expand Up @@ -294,19 +309,20 @@ func (a *ArtifactImpl) updateArtifactTypeAfterComputation(
}
}

func (a *ArtifactImpl) PersistResult(ctx context.Context, execState *shared.ExecutionState) error {
func (a *ArtifactImpl) PersistResult(ctx context.Context) error {
if a.previewCacheManager != nil {
return errors.Newf("Artifact %s is cache-aware, so it cannot be persisted.", a.Name())
}

if a.resultsPersisted {
return errors.Newf("Artifact %s was already persisted!", a.name)
}
if !execState.Terminated() {
return errors.Newf("Artifact %s has unexpected execution state: %s", a.Name(), execState.Status)

if a.execState == nil {
return errors.Newf("Artifact %s doesn't have an execution state.", a.name)
}

a.updateArtifactResultAfterComputation(ctx, execState)
a.updateArtifactResultAfterComputation(ctx)
a.updateArtifactTypeAfterComputation(ctx)

a.resultsPersisted = true
Expand All @@ -315,8 +331,36 @@ func (a *ArtifactImpl) PersistResult(ctx context.Context, execState *shared.Exec

func (a *ArtifactImpl) DeleteContent(ctx context.Context) error {
storageObj := storage.NewStorage(a.storageConfig)

if a.Computed(ctx) {
return storageObj.Delete(ctx, a.execPaths.ArtifactContentPath)
now := time.Now()
if a.execState == nil {
a.SetExecState(shared.ExecutionState{
Status: shared.DeletedExecutionStatus,
Timestamps: &shared.ExecutionTimestamps{
DeletedAt: &now,
},
})
} else {
a.execState.Status = shared.DeletedExecutionStatus
a.execState.Timestamps.DeletedAt = &now
}

err := storageObj.Delete(ctx, a.execPaths.ArtifactContentPath)
if err != nil {
return err
}

_, err = a.resultRepo.Update(
ctx,
a.resultID,
map[string]interface{}{
models.ArtifactResultStatus: a.execState.Status,
models.ArtifactResultExecState: a.execState,
},
a.db,
)
return err
}

return nil
Expand Down Expand Up @@ -368,6 +412,10 @@ func (a *ArtifactImpl) GetContent(ctx context.Context) ([]byte, error) {
return content, nil
}

// SetExecState replaces the artifact's execution state with execState.
// The argument is received by value, so the artifact ends up pointing at its
// own copy of the struct rather than at the caller's variable. No merging with
// the previously held state is attempted: the old state is simply dropped.
func (a *ArtifactImpl) SetExecState(execState shared.ExecutionState) {
	replacement := execState
	a.execState = &replacement
}

func (a *ArtifactImpl) SampleContent(ctx context.Context) ([]byte, bool, error) {
metadata, err := a.GetMetadata(ctx)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions src/golang/lib/workflow/operator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,8 @@ func (bo *baseOperator) PersistResult(ctx context.Context) error {
artifactExecState.Status = shared.CanceledExecutionStatus
}

if artifactExecState.Status == shared.SucceededExecutionStatus && !outputArtifact.ShouldPersistContent() {
artifactExecState.Status = shared.DeletedExecutionStatus
}

err := outputArtifact.PersistResult(ctx, &artifactExecState)
outputArtifact.SetExecState(artifactExecState)
err := outputArtifact.PersistResult(ctx)
if err != nil {
log.Errorf("Error occurred when persisting artifact %s.", outputArtifact.Name())
}
Expand Down

0 comments on commit b1a6102

Please sign in to comment.