Skip to content

Commit

Permalink
Pass along external resource IDs in task event execution metadata (fl…
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan committed Mar 24, 2021
1 parent 75499d9 commit 47a3c02
Show file tree
Hide file tree
Showing 24 changed files with 280 additions and 35 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v0.18.17
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flytestdlib v0.3.13
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-test/deep v1.0.7
Expand Down
7 changes: 5 additions & 2 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0=
Expand Down Expand Up @@ -227,8 +228,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17 h1:74pPZ9PzITuzq+CgjMPb9EcFI5bVkf8mM5m4xmmlTmY=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -753,6 +754,7 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -1214,6 +1216,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)
Expand Down Expand Up @@ -72,6 +74,8 @@ type TaskInfo struct {
OccurredAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// Metadata around how a task was executed
Metadata *event.TaskExecutionMetadata
}

func (t *TaskInfo) String() string {
Expand Down
34 changes: 34 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type PluginContext interface {

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
}

// Defines a simplified interface to author plugins for k8s resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/errors"
Expand Down Expand Up @@ -94,6 +96,13 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co
},
},
OccurredAt: &tNow,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
ExternalId: "abc",
},
},
},
}), nil
}

Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
}

// Always attempt to augment phase with task logs.
logLinks, err := GetTaskLinks(ctx, tCtx.TaskExecutionMetadata(), e.jobStore, pluginState)
subTaskDetails, err := GetTaskLinks(ctx, tCtx.TaskExecutionMetadata(), e.jobStore, pluginState)
if err != nil {
return core.UnknownTransition, err
}

logger.Infof(ctx, "Exiting handle with phase [%v]", pluginState.State.CurrentPhase)

// Determine transition information from the state
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, logLinks)
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs)
if err != nil {
return core.UnknownTransition, err
}
Expand Down
29 changes: 24 additions & 5 deletions flyteplugins/go/tasks/plugins/array/awsbatch/task_links.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore
}
}

// SubTaskDetails bundles the values GetTaskLinks hands back to its caller:
// the collected task log links and the IDs of the sub-jobs it visited.
type SubTaskDetails struct {
// LogLinks is the list of task log links gathered by GetTaskLinks.
LogLinks []*idlCore.TaskLog
// SubTaskIDs holds pointers to sub-job IDs, appended by GetTaskLinks.
SubTaskIDs []*string
}

func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) (
[]*idlCore.TaskLog, error) {
SubTaskDetails, error) {

logLinks := make([]*idlCore.TaskLog, 0, 4)
subTaskIDs := make([]*string, 0)

if state.GetExternalJobID() == nil {
return logLinks, nil
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
}

// TODO: Add tasktemplate container config to job config
Expand All @@ -58,14 +67,20 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
})

if err != nil {
return nil, errors.Wrapf(errors2.DownstreamSystemError, err, "Failed to retrieve a job from job store.")
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, errors.Wrapf(errors2.DownstreamSystemError, err, "Failed to retrieve a job from job store.")
}

if job == nil {
logger.Debugf(ctx, "Job [%v] not found in jobs store. It might have been evicted. If reasonable, bump the max "+
"size of the LRU cache.", *state.GetExternalJobID())

return logLinks, nil
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
}

detailedArrayStatus := state.GetArrayStatus().Detailed
Expand All @@ -83,7 +98,11 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
})
}
}
subTaskIDs = append(subTaskIDs, &subJob.ID)
}

return logLinks, nil
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
}
12 changes: 11 additions & 1 deletion flyteplugins/go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
Expand Down Expand Up @@ -168,14 +170,22 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 {
// Info fields will always be nil, because we're going to send log links individually. This simplifies our state
// handling as we don't have to keep an ever-growing list of log links (our batch jobs can have 5000 sub-tasks; keeping
// all the log links takes up a lot of space).
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog) (core.PhaseInfo, error) {
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) {

phaseInfo := core.PhaseInfoUndefined
t := time.Now()
nowTaskInfo := &core.TaskInfo{
OccurredAt: &t,
Logs: logLinks,
}
if nowTaskInfo.Metadata == nil {
nowTaskInfo.Metadata = &event.TaskExecutionMetadata{}
}
for _, subTaskID := range subTaskIDs {
nowTaskInfo.Metadata.ExternalResources = append(nowTaskInfo.Metadata.ExternalResources, &event.ExternalResourceInfo{
ExternalId: *subTaskID,
})
}

switch p, version := state.GetPhase(); p {
case PhaseStart:
Expand Down
42 changes: 34 additions & 8 deletions flyteplugins/go/tasks/plugins/array/core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package core

import (
"context"
"fmt"
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -48,14 +51,32 @@ func assertBitSetsEqual(t testing.TB, b1, b2 *bitarray.BitSet, len int) {
}
}

// assertTaskExecutionMetadata verifies that metadata is non-nil and that its
// ExternalResources consist of exactly one ExternalResourceInfo per entry of
// subTaskIDs, in the same order, each carrying the dereferenced ID as its
// ExternalId. The comparison is done with proto.Equal.
//
// Every element of subTaskIDs must be non-nil; the helper dereferences them.
func assertTaskExecutionMetadata(t *testing.T, subTaskIDs []*string, metadata *event.TaskExecutionMetadata) {
	// Attribute failures to the calling sub-test's line rather than to this helper.
	t.Helper()

	// Stop after a nil failure: the equality check below would only report
	// the same problem a second time.
	if !assert.NotNil(t, metadata) {
		return
	}

	externalResources := make([]*event.ExternalResourceInfo, len(subTaskIDs))
	for i, subTaskID := range subTaskIDs {
		externalResources[i] = &event.ExternalResourceInfo{
			ExternalId: *subTaskID,
		}
	}
	assert.True(t, proto.Equal(&event.TaskExecutionMetadata{
		ExternalResources: externalResources,
	}, metadata))
}

func TestMapArrayStateToPluginPhase(t *testing.T) {
ctx := context.Background()
var subTaskIDs = make([]*string, 3)
for i := 0; i < 3; i++ {
subTaskID := fmt.Sprintf("sub_task_%d", i)
subTaskIDs[i] = &subTaskID
}

t.Run("start", func(t *testing.T) {
s := State{
CurrentPhase: PhaseStart,
}
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseInitializing, phaseInfo.Phase())
})
Expand All @@ -66,7 +87,7 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
PhaseVersion: 0,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseRunning, phaseInfo.Phase())
})
Expand All @@ -79,10 +100,11 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
ExecutionArraySize: 5,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseRunning, phaseInfo.Phase())
assert.Equal(t, uint32(368), phaseInfo.Version())
assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata)
})

t.Run("write to discovery", func(t *testing.T) {
Expand All @@ -93,10 +115,11 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
ExecutionArraySize: 5,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseRunning, phaseInfo.Phase())
assert.Equal(t, uint32(548), phaseInfo.Version())
assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata)
})

t.Run("success", func(t *testing.T) {
Expand All @@ -105,9 +128,10 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
PhaseVersion: 0,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseSuccess, phaseInfo.Phase())
assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata)
})

t.Run("retryable failure", func(t *testing.T) {
Expand All @@ -116,9 +140,10 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
PhaseVersion: 0,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhaseRetryableFailure, phaseInfo.Phase())
assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata)
})

t.Run("permanent failure", func(t *testing.T) {
Expand All @@ -127,9 +152,10 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
PhaseVersion: 0,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.Equal(t, core.PhasePermanentFailure, phaseInfo.Phase())
assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata)
})

t.Run("All phases", func(t *testing.T) {
Expand All @@ -138,7 +164,7 @@ func TestMapArrayStateToPluginPhase(t *testing.T) {
CurrentPhase: p,
}

phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil)
phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs)
assert.NoError(t, err)
assert.NotEqual(t, core.PhaseUndefined, phaseInfo.Phase())
}
Expand Down
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
var nextState *arrayCore.State
var err error
var logLinks []*idlCore.TaskLog
var subTaskIDs []*string

switch p, _ := pluginState.GetPhase(); p {
case arrayCore.PhaseStart:
Expand All @@ -107,7 +108,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c

case arrayCore.PhaseCheckingSubTaskExecutions:

nextState, logLinks, err = LaunchAndCheckSubTasksState(ctx, tCtx, e.kubeClient, pluginConfig,
nextState, logLinks, subTaskIDs, err = LaunchAndCheckSubTasksState(ctx, tCtx, e.kubeClient, pluginConfig,
tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState)

case arrayCore.PhaseAssembleFinalOutput:
Expand Down Expand Up @@ -135,7 +136,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
}

// Determine transition information from the state
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks)
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks, subTaskIDs)
if err != nil {
return core.UnknownTransition, err
}
Expand Down
Loading

0 comments on commit 47a3c02

Please sign in to comment.