Skip to content

Commit

Permalink
get proper start time (#11)
Browse files Browse the repository at this point in the history
* get proper start time

* fix type

* fix test

* add undefined
  • Loading branch information
haoyuez committed Oct 9, 2019
1 parent cb881dd commit 922af83
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 7 deletions.
105 changes: 105 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type CreateTaskExecutionModelInput struct {
Request *admin.TaskExecutionEventRequest
}

func addTaskSubmittedState(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
closure *admin.TaskExecutionClosure) error {
occurredAt, err := ptypes.Timestamp(request.Event.OccurredAt)
if err != nil {
Expand Down Expand Up @@ -99,8 +99,8 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task
// Different tasks may report different phases as their first event.
// If the first event we receive for this execution is a valid
// non-terminal phase, mark the execution start time.
if eventPhase == core.TaskExecution_QUEUED || eventPhase == core.TaskExecution_RUNNING {
err := addTaskSubmittedState(input.Request, taskExecution, closure)
if eventPhase == core.TaskExecution_RUNNING {
err := addTaskStartedState(input.Request, taskExecution, closure)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,11 +136,19 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec
return errors.NewFlyteAdminErrorf(codes.Internal,
"failed to unmarshal task execution closure with error: %+v", err)
}
existingTaskPhase := taskExecutionModel.Phase
taskExecutionModel.Phase = request.Event.Phase.String()
taskExecutionModel.PhaseVersion = request.Event.PhaseVersion
taskExecutionClosure.Phase = request.Event.Phase
taskExecutionClosure.UpdatedAt = request.Event.OccurredAt
taskExecutionClosure.Logs = request.Event.Logs
if (existingTaskPhase == core.TaskExecution_QUEUED.String() || existingTaskPhase == core.TaskExecution_UNDEFINED.String()) && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() {
err = addTaskStartedState(request, taskExecutionModel, &taskExecutionClosure)
if err != nil {
return err
}
}

if common.IsTaskExecutionTerminal(request.Event.Phase) {
err := addTaskTerminalState(request, taskExecutionModel, &taskExecutionClosure)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var customInfo = ptypesStruct.Struct{
},
}

func TestAddTaskSubmittedState(t *testing.T) {
func TestAddTaskStartedState(t *testing.T) {
var startedAt = time.Now().UTC()
var startedAtProto, _ = ptypes.TimestampProto(startedAt)
request := admin.TaskExecutionEventRequest{
Expand All @@ -58,7 +58,7 @@ func TestAddTaskSubmittedState(t *testing.T) {
}
taskExecutionModel := models.TaskExecution{}
closure := &admin.TaskExecutionClosure{}
err := addTaskSubmittedState(&request, &taskExecutionModel, closure)
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)

timestamp, err := ptypes.Timestamp(closure.StartedAt)
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) {

expectedClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_QUEUED,
StartedAt: taskEventOccurredAtProto,
StartedAt: nil,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) {
Phase: "QUEUED",
InputURI: "input uri",
Closure: expectedClosureBytes,
StartedAt: &taskEventOccurredAt,
StartedAt: nil,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}, taskExecutionModel)
Expand Down

0 comments on commit 922af83

Please sign in to comment.