/
run.go
105 lines (85 loc) · 3.04 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package cleanupcmd
import (
"context"
"os"
"path/filepath"
"github.com/ccremer/clustercode/pkg/api/v1alpha1"
"github.com/ccremer/clustercode/pkg/internal/pipe"
internaltypes "github.com/ccremer/clustercode/pkg/internal/types"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Command holds the settings for the cleanup command whose steps are defined
// in this file. Execute runs those steps in order.
type Command struct {
// Log is the base logger. It is handed to the pipeline's debug hook, and
// getLogger derives a logger from it that carries the task name and namespace.
Log logr.Logger
// SourceRootDir is the directory onto which the intermediate and source
// sub-mount paths are joined to locate the files to delete.
SourceRootDir string
// Namespace is the namespace used when fetching the Task.
Namespace string
// TaskName is the name used when fetching the Task.
TaskName string
}
// commandContext is the state shared between the pipeline steps of Command.
// It embeds the context.Context passed to Execute, so it can itself be used
// wherever a context.Context is expected.
type commandContext struct {
context.Context
// dependencyResolver has its Record method registered as a before-hook in
// Execute; steps call MustRequireDependencyByFuncName on it to name the step
// they depend on.
dependencyResolver pipeline.DependencyResolver[*commandContext]
// kube is assigned by createClient and used by fetchTask to read the Task.
kube client.Client
// task is assigned by fetchTask once the Task has been fetched successfully.
task *v1alpha1.Task
// intermediaryFiles is assigned by listIntermediaryFiles with the paths
// matched by its glob, and iterated by deleteFiles.
intermediaryFiles []string
}
// Execute assembles the cleanup pipeline and runs it, returning whatever
// error the pipeline run reports.
//
// The steps are registered in this order: create the Kubernetes client, fetch
// the Task, list the intermediary files, delete them, and finally delete the
// source file. ctx is embedded into the per-run commandContext that is handed
// to every step.
func (c *Command) Execute(ctx context.Context) error {
	cmdCtx := &commandContext{
		Context:            ctx,
		dependencyResolver: pipeline.NewDependencyRecorder[*commandContext](),
	}

	// Before-hooks: a debug logger, and the resolver's Record method so that
	// steps can later require the steps they depend on.
	p := pipeline.NewPipeline[*commandContext]().
		WithBeforeHooks(
			pipe.DebugLogger[*commandContext](c.Log),
			cmdCtx.dependencyResolver.Record,
		)

	p.WithSteps(
		p.NewStep("create client", c.createClient),
		p.NewStep("fetch task", c.fetchTask),
		p.NewStep("list intermediary files", c.listIntermediaryFiles),
		p.NewStep("delete intermediary files", c.deleteFiles),
		p.NewStep("delete source file", c.deleteSourceFile),
	)

	return p.RunWithContext(cmdCtx)
}
// createClient is the pipeline step that obtains a Kubernetes client via
// pipe.NewKubeClient and stores it in ctx.kube. The value returned by
// pipe.NewKubeClient is stored regardless of the error, and the error is
// returned unchanged.
func (c *Command) createClient(ctx *commandContext) error {
	var err error
	ctx.kube, err = pipe.NewKubeClient(ctx)
	return err
}
// fetchTask is the pipeline step that reads the Task identified by
// c.Namespace and c.TaskName through ctx.kube and stores it in ctx.task.
// It requires createClient as a dependency. If the lookup fails, the error is
// returned unchanged and ctx.task is left untouched.
func (c *Command) fetchTask(ctx *commandContext) error {
	ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient)
	logger := c.getLogger()

	key := types.NamespacedName{Namespace: c.Namespace, Name: c.TaskName}
	fetched := &v1alpha1.Task{}
	err := ctx.kube.Get(ctx, key, fetched)
	if err != nil {
		return err
	}

	ctx.task = fetched
	logger.Info("fetched task")
	return nil
}
// listIntermediaryFiles is the pipeline step that globs for files whose name
// starts with the task ID inside the intermediate sub-mount path below
// c.SourceRootDir. The matches are stored in ctx.intermediaryFiles and the
// error from filepath.Glob is returned. It requires fetchTask as a dependency.
func (c *Command) listIntermediaryFiles(ctx *commandContext) error {
	ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchTask)

	pattern := filepath.Join(
		c.SourceRootDir,
		internaltypes.IntermediateSubMountPath,
		ctx.task.Spec.TaskId.String()+"*",
	)

	var err error
	ctx.intermediaryFiles, err = filepath.Glob(pattern)
	return err
}
// deleteFiles is the pipeline step that removes every path listed in
// ctx.intermediaryFiles. Removal is best-effort: a failure is logged and the
// loop moves on, so this step always returns nil. It requires
// listIntermediaryFiles as a dependency.
func (c *Command) deleteFiles(ctx *commandContext) error {
	ctx.dependencyResolver.MustRequireDependencyByFuncName(c.listIntermediaryFiles)
	logger := c.getLogger()

	for i := range ctx.intermediaryFiles {
		path := ctx.intermediaryFiles[i]
		logger.Info("deleting file", "file", path)

		err := os.Remove(path)
		if err == nil {
			continue
		}
		// Deliberately not fatal: remaining files should still be attempted.
		logger.Info("could not delete file, ignoring", "file", path, "error", err.Error())
	}
	return nil
}
// deleteSourceFile is the pipeline step that removes the task's source file.
// It requires fetchTask as a dependency.
//
// The file location is built by joining c.SourceRootDir, the source sub-mount
// path and the path component of the task's SourceUrl. Because filepath.Join
// cleans the result, a path containing ".." elements could resolve to a
// location outside the source directory, and an empty path resolves to the
// source directory itself. Both cases are rejected before anything is deleted.
//
// Returns an *os.PathError wrapping os.ErrInvalid when the resolved path is
// not strictly inside the source directory; otherwise returns the result of
// os.Remove.
func (c *Command) deleteSourceFile(ctx *commandContext) error {
	ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchTask)
	log := c.getLogger()

	sourceDir := filepath.Join(c.SourceRootDir, internaltypes.SourceSubMountPath)
	sourceFile := filepath.Join(sourceDir, ctx.task.Spec.SourceUrl.GetPath())

	// Containment check: the path relative to the source directory must
	// neither be the directory itself (".") nor climb out of it ("..", "../x").
	rel, err := filepath.Rel(sourceDir, sourceFile)
	parentPrefix := ".." + string(filepath.Separator)
	escapes := rel == ".." || (len(rel) >= len(parentPrefix) && rel[:len(parentPrefix)] == parentPrefix)
	if err != nil || rel == "." || escapes {
		return &os.PathError{Op: "remove", Path: sourceFile, Err: os.ErrInvalid}
	}

	log.Info("deleting file", "file", sourceFile)
	return os.Remove(sourceFile)
}
// getLogger returns c.Log extended with the "task_name" and "namespace"
// key/value pairs taken from the command's settings.
func (c *Command) getLogger() logr.Logger {
	fields := []any{
		"task_name", c.TaskName,
		"namespace", c.Namespace,
	}
	return c.Log.WithValues(fields...)
}