diff --git a/Gopkg.lock b/Gopkg.lock index b645586e24..9c65cf3783 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -421,6 +421,63 @@ revision = "3427c32cb71afc948325f299f040e53c1dd78979" version = "v1.2.0" +[[projects]] + branch = "master" + digest = "1:870f6bb378b4f7dfab0be83fbd1ad17c2a380e05c62b0a934b9d258ced24c002" + name = "github.com/lyft/flyteadmin" + packages = [ + "cmd/entrypoints", + "pkg/async/notifications", + "pkg/async/notifications/implementations", + "pkg/async/notifications/interfaces", + "pkg/async/notifications/mocks", + "pkg/async/schedule", + "pkg/async/schedule/aws", + "pkg/async/schedule/aws/interfaces", + "pkg/async/schedule/aws/mocks", + "pkg/async/schedule/interfaces", + "pkg/async/schedule/mocks", + "pkg/async/schedule/noop", + "pkg/clusterresource", + "pkg/common", + "pkg/common/mocks", + "pkg/config", + "pkg/data", + "pkg/data/implementations", + "pkg/data/interfaces", + "pkg/data/mocks", + "pkg/errors", + "pkg/executioncluster", + "pkg/executioncluster/mocks", + "pkg/flytek8s", + "pkg/manager/impl", + "pkg/manager/impl/executions", + "pkg/manager/impl/shared", + "pkg/manager/impl/testutils", + "pkg/manager/impl/util", + "pkg/manager/impl/validation", + "pkg/manager/interfaces", + "pkg/manager/mocks", + "pkg/repositories", + "pkg/repositories/config", + "pkg/repositories/errors", + "pkg/repositories/gormimpl", + "pkg/repositories/interfaces", + "pkg/repositories/mocks", + "pkg/repositories/models", + "pkg/repositories/transformers", + "pkg/rpc/adminservice", + "pkg/rpc/adminservice/util", + "pkg/runtime", + "pkg/runtime/interfaces", + "pkg/runtime/mocks", + "pkg/workflowengine/impl", + "pkg/workflowengine/interfaces", + "pkg/workflowengine/mocks", + ] + pruneopts = "UT" + revision = "cb881dd8b97d0183cbd89dc33231489f0b8f6b50" + [[projects]] digest = "1:4c02e347457c97ee8cfafb413554854fe236d715879ac0a43743017cd179de2e" name = "github.com/lyft/flyteidl" @@ -1127,6 +1184,54 @@ "github.com/jinzhu/gorm", "github.com/jinzhu/gorm/dialects/postgres", "github.com/lib/pq", + "github.com/lyft/flyteadmin/cmd/entrypoints", + "github.com/lyft/flyteadmin/pkg/async/notifications", + "github.com/lyft/flyteadmin/pkg/async/notifications/implementations", + "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces", + "github.com/lyft/flyteadmin/pkg/async/notifications/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws/interfaces", + "github.com/lyft/flyteadmin/pkg/async/schedule/aws/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule/interfaces", + "github.com/lyft/flyteadmin/pkg/async/schedule/mocks", + "github.com/lyft/flyteadmin/pkg/async/schedule/noop", + "github.com/lyft/flyteadmin/pkg/clusterresource", + "github.com/lyft/flyteadmin/pkg/common", + "github.com/lyft/flyteadmin/pkg/common/mocks", + "github.com/lyft/flyteadmin/pkg/config", + "github.com/lyft/flyteadmin/pkg/data", + "github.com/lyft/flyteadmin/pkg/data/implementations", + "github.com/lyft/flyteadmin/pkg/data/interfaces", + "github.com/lyft/flyteadmin/pkg/data/mocks", + "github.com/lyft/flyteadmin/pkg/errors", + "github.com/lyft/flyteadmin/pkg/executioncluster", + "github.com/lyft/flyteadmin/pkg/executioncluster/mocks", + "github.com/lyft/flyteadmin/pkg/flytek8s", + "github.com/lyft/flyteadmin/pkg/manager/impl", + "github.com/lyft/flyteadmin/pkg/manager/impl/executions", + "github.com/lyft/flyteadmin/pkg/manager/impl/shared", + "github.com/lyft/flyteadmin/pkg/manager/impl/testutils", + "github.com/lyft/flyteadmin/pkg/manager/impl/util", + "github.com/lyft/flyteadmin/pkg/manager/impl/validation", + "github.com/lyft/flyteadmin/pkg/manager/interfaces", + "github.com/lyft/flyteadmin/pkg/manager/mocks", + "github.com/lyft/flyteadmin/pkg/repositories", + "github.com/lyft/flyteadmin/pkg/repositories/config", + "github.com/lyft/flyteadmin/pkg/repositories/errors", + "github.com/lyft/flyteadmin/pkg/repositories/gormimpl", + "github.com/lyft/flyteadmin/pkg/repositories/interfaces", + "github.com/lyft/flyteadmin/pkg/repositories/mocks", + "github.com/lyft/flyteadmin/pkg/repositories/models", + "github.com/lyft/flyteadmin/pkg/repositories/transformers", + "github.com/lyft/flyteadmin/pkg/rpc/adminservice", + "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util", + "github.com/lyft/flyteadmin/pkg/runtime", + "github.com/lyft/flyteadmin/pkg/runtime/interfaces", + "github.com/lyft/flyteadmin/pkg/runtime/mocks", + "github.com/lyft/flyteadmin/pkg/workflowengine/impl", + "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces", + "github.com/lyft/flyteadmin/pkg/workflowengine/mocks", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core", "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event", diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 51a044d8e2..c5a5a2ca70 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -18,7 +18,7 @@ type CreateTaskExecutionModelInput struct { Request *admin.TaskExecutionEventRequest } -func addTaskSubmittedState(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution, +func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution, closure *admin.TaskExecutionClosure) error { occurredAt, err := ptypes.Timestamp(request.Event.OccurredAt) if err != nil { @@ -99,8 +99,8 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task // Different tasks may report different phases as their first event. // If the first event we receive for this execution is a valid // non-terminal phase, mark the execution start time. - if eventPhase == core.TaskExecution_QUEUED || eventPhase == core.TaskExecution_RUNNING { - err := addTaskSubmittedState(input.Request, taskExecution, closure) + if eventPhase == core.TaskExecution_RUNNING { + err := addTaskStartedState(input.Request, taskExecution, closure) if err != nil { return nil, err } @@ -136,11 +136,19 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal task execution closure with error: %+v", err) } + existingTaskPhase := taskExecutionModel.Phase taskExecutionModel.Phase = request.Event.Phase.String() taskExecutionModel.PhaseVersion = request.Event.PhaseVersion taskExecutionClosure.Phase = request.Event.Phase taskExecutionClosure.UpdatedAt = request.Event.OccurredAt taskExecutionClosure.Logs = request.Event.Logs + if (existingTaskPhase == core.TaskExecution_QUEUED.String() || existingTaskPhase == core.TaskExecution_UNDEFINED.String()) && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() { + err = addTaskStartedState(request, taskExecutionModel, &taskExecutionClosure) + if err != nil { + return err + } + } + if common.IsTaskExecutionTerminal(request.Event.Phase) { err := addTaskTerminalState(request, taskExecutionModel, &taskExecutionClosure) if err != nil { diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index fb521078f4..3795322a2c 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -47,7 +47,7 @@ var customInfo = ptypesStruct.Struct{ }, } -func TestAddTaskSubmittedState(t *testing.T) { +func TestAddTaskStartedState(t *testing.T) { var startedAt = time.Now().UTC() var startedAtProto, _ = ptypes.TimestampProto(startedAt) request := admin.TaskExecutionEventRequest{ @@ -58,7 +58,7 @@ func TestAddTaskSubmittedState(t *testing.T) { } taskExecutionModel := models.TaskExecution{} closure := &admin.TaskExecutionClosure{} - err := addTaskSubmittedState(&request, &taskExecutionModel, closure) + err := addTaskStartedState(&request, &taskExecutionModel, closure) assert.Nil(t, err) timestamp, err := ptypes.Timestamp(closure.StartedAt) @@ -140,7 +140,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { expectedClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_QUEUED, - StartedAt: taskEventOccurredAtProto, + StartedAt: nil, CreatedAt: taskEventOccurredAtProto, UpdatedAt: taskEventOccurredAtProto, } @@ -169,7 +169,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { Phase: "QUEUED", InputURI: "input uri", Closure: expectedClosureBytes, - StartedAt: &taskEventOccurredAt, + StartedAt: nil, TaskExecutionCreatedAt: &taskEventOccurredAt, TaskExecutionUpdatedAt: &taskEventOccurredAt, }, taskExecutionModel)