This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
monitor.go
55 lines (43 loc) · 1.56 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package webapi
import (
"context"
"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flytestdlib/cache"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
)
// monitor determines the current phase of the remote resource associated with
// the task execution described by tCtx.
//
// It calls cache.GetOrCreate with the task execution's generated name as the
// key and a CacheItem wrapping a copy of *state, then inspects the item that
// comes back:
//
//   - If GetOrCreate fails, the error is returned with a nil state.
//   - If the returned item is not a CacheItem, a CacheFailed error describing
//     the unexpected item is returned.
//   - If the item has no Resource yet, the caller's state is returned unchanged
//     together with a running phase.
//   - Otherwise p.Status is queried with a plugin context built from the cached
//     resource, the resulting phase is converted with ToPluginPhase and stored
//     on the local copy of the cache item, and a pointer to that copy's State
//     is returned together with the phase info reported by p.Status.
//
// state must be non-nil; it is dereferenced unconditionally.
func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cache cache.AutoRefresh, state *State) (
	newState *State, phaseInfo core.PhaseInfo, err error) {
	newCacheItem := CacheItem{
		State: *state,
	}

	cacheItemID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
	item, err := cache.GetOrCreate(cacheItemID, newCacheItem)
	if err != nil {
		return nil, core.PhaseInfo{}, err
	}

	cacheItem, ok := item.(CacheItem)
	if !ok {
		logger.Errorf(ctx, "Error casting cache object into ExecutionState")
		// Report the item that came out of the cache: after a failed assertion
		// cacheItem is only the zero value and carries no diagnostic information.
		return nil, core.PhaseInfo{}, errors.Errorf(
			errors.CacheFailed, "Failed to cast [%v]", item)
	}

	// If the cache has not synced yet, just return.
	if cacheItem.Resource == nil {
		return state, core.PhaseInfoRunning(0, nil), nil
	}

	newPhase, err := p.Status(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", tCtx))
	if err != nil {
		return nil, core.PhaseInfoUndefined, err
	}

	newPluginPhase, err := ToPluginPhase(newPhase.Phase())
	if err != nil {
		return nil, core.PhaseInfoUndefined, err
	}

	if cacheItem.Phase != newPluginPhase {
		logger.Infof(ctx, "Moving Phase for from %s to %s", cacheItem.Phase, newPluginPhase)
	}

	// NOTE(review): Phase is presumably a field promoted from the State held in
	// cacheItem, so this update is visible through the returned pointer — confirm
	// against the CacheItem definition.
	cacheItem.Phase = newPluginPhase

	// If there were updates made to the state, we'll have picked them up automatically. Nothing more to do.
	return &cacheItem.State, newPhase, nil
}