-
Notifications
You must be signed in to change notification settings - Fork 539
/
launcher.go
55 lines (46 loc) · 2.03 KB
/
launcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package webapi
import (
"context"
"time"
pluginErrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyte/flytestdlib/cache"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
// launch asks the plugin to create the resource backing this task execution and, if the
// resource is still running, registers it with the auto-refresh cache.
//
// Outcomes:
//   - p.Create fails: the error is logged and the caller's state is returned unchanged together
//     with the phase built by core.PhaseInfoRetryableFailure; the returned error is nil.
//   - p.Create also returned a resource: p.Status is queried. A status error is returned as-is
//     with a nil state and an empty PhaseInfo. A terminal phase is returned immediately along
//     with the caller's state, which is not modified on this path.
//   - Otherwise: state is mutated in place (ResourceMeta, Phase, PhaseVersion), a value copy of
//     it is handed to cache.GetOrCreate under the generated execution name, and a queued phase
//     with reason "launched" is returned. A cache error is returned with a nil state.
func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionContext, cache cache.AutoRefresh,
	state *State) (newState *State, phaseInfo core.PhaseInfo, err error) {
	// Looked up lazily at each use site rather than once up front, so the metadata
	// accessors are invoked exactly where (and as often as) they are needed.
	generatedName := func() string {
		return tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
	}

	meta, resource, createErr := p.Create(ctx, tCtx)
	if createErr != nil {
		logger.Errorf(ctx, "Failed to create resource. Error: %v", createErr)
		// Creation failures are surfaced through the phase, not through the error return.
		return state, core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, createErr.Error(), nil), nil
	}

	logger.Infof(ctx, "Created Resource Name [%s] and Meta [%v]", generatedName(), meta)

	// The plugin may hand back the created resource itself; when it does, ask for its status
	// right away in case it has already reached a terminal phase.
	if resource != nil {
		status, statusErr := p.Status(ctx, newPluginContext(meta, resource, "", tCtx))
		switch {
		case statusErr != nil:
			logger.Errorf(ctx, "Failed to check resource status. Error: %v", statusErr)
			return nil, core.PhaseInfo{}, statusErr
		case status.Phase().IsTerminal():
			logger.Infof(ctx, "Resource has already terminated ID:[%s], Phase:[%s]",
				generatedName(), status.Phase())
			return state, status, nil
		}
	}

	// Record the created resource on the state and advance it to the "resources created" phase.
	state.ResourceMeta = meta
	state.Phase = PhaseResourcesCreated
	state.PhaseVersion = 2

	// The cache entry holds a value copy of the state as it stands right now.
	snapshot := CacheItem{State: *state}

	// Add the entry to the AutoRefresh cache so we start getting updates through background refresh.
	if _, cacheErr := cache.GetOrCreate(generatedName(), snapshot); cacheErr != nil {
		logger.Errorf(ctx, "Failed to add item to cache. Error: %v", cacheErr)
		return nil, core.PhaseInfo{}, cacheErr
	}

	return state, core.PhaseInfoQueued(time.Now(), state.PhaseVersion, "launched"), nil
}