-
Notifications
You must be signed in to change notification settings - Fork 31
/
main.go
111 lines (104 loc) · 3.15 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
_init "github.com/argoproj-labs/argo-dataflow/runner/init"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar"
"github.com/argoproj-labs/argo-dataflow/sdks/golang"
"github.com/argoproj-labs/argo-dataflow/shared/builtin"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/cat"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/dedupe"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/expand"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/filter"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/flatten"
"github.com/argoproj-labs/argo-dataflow/shared/builtin/group"
_map "github.com/argoproj-labs/argo-dataflow/shared/builtin/map"
"github.com/argoproj-labs/argo-dataflow/shared/debug"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/api/resource"
)
// logger is the package-level logger shared by init and main; it is created
// by sharedutil.NewLogger when the package is initialised.
var logger = sharedutil.NewLogger()
// init installs a fresh default HTTP mux and, only when the "pprof" debug
// flag is enabled, registers the pprof endpoints on it.
func init() {
	// Replace the default mux so that handlers registered on it as an import
	// side effect (net/http/pprof does this) are dropped unless explicitly
	// re-added below. See https://mmcloughlin.com/posts/your-pprof-is-showing
	http.DefaultServeMux = http.NewServeMux()

	if !debug.Enabled("pprof") {
		logger.Info("not enabling pprof debug endpoints")
		return
	}

	logger.Info("enabling pprof debug endpoints - do not do this in production")
	endpoints := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/debug/pprof/", pprof.Index},
		{"/debug/pprof/cmdline", pprof.Cmdline},
		{"/debug/pprof/profile", pprof.Profile},
		{"/debug/pprof/symbol", pprof.Symbol},
		{"/debug/pprof/trace", pprof.Trace},
	}
	for _, e := range endpoints {
		http.HandleFunc(e.pattern, e.handler)
	}
}
// main runs the sub-command named by os.Args[1] until it returns or until
// SIGTERM cancels the context.
//
// Supported commands and the positional arguments read after the command name:
//
//	cat, expand, flatten, init, sidecar   (none)
//	dedupe   <arg> <maxSize>              maxSize is parsed as a resource quantity
//	filter   <arg>
//	group    <arg> <arg> <format>
//	map      <arg>
//
// A missing command or missing arguments are reported as an ordinary error
// rather than an index-out-of-range panic. Any error other than
// context.Canceled is written to /dev/termination-log (best effort) and then
// causes a panic.
func main() {
	// ctx is cancelled when the process receives SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stop()

	start := func(f builtin.Process) error {
		return golang.StartWithContext(ctx, f)
	}

	// args returns the n positional arguments that follow the command name,
	// or an error when fewer were supplied. Extra arguments are ignored, as
	// before. It must only be called once os.Args[1] is known to exist.
	args := func(n int) ([]string, error) {
		if got := len(os.Args) - 2; got < n {
			return nil, fmt.Errorf("command %q expects %d argument(s), got %d", os.Args[1], n, got)
		}
		return os.Args[2 : 2+n], nil
	}

	err := func() error {
		if len(os.Args) < 2 {
			return errors.New("missing command")
		}
		switch cmd := os.Args[1]; cmd {
		case "cat":
			return start(cat.New())
		case "dedupe":
			a, err := args(2)
			if err != nil {
				return err
			}
			maxSize, err := resource.ParseQuantity(a[1])
			if err != nil {
				return fmt.Errorf("failed to parse %q as resource quantity: %w", a[1], err)
			}
			p, err := dedupe.New(ctx, a[0], maxSize)
			if err != nil {
				return err
			}
			// Only the dedupe command registers a Prometheus metrics handler.
			http.Handle("/metrics", promhttp.Handler())
			return start(p)
		case "expand":
			return start(expand.New())
		case "filter":
			a, err := args(1)
			if err != nil {
				return err
			}
			p, err := filter.New(a[0])
			if err != nil {
				return err
			}
			return start(p)
		case "flatten":
			return start(flatten.New())
		case "group":
			a, err := args(3)
			if err != nil {
				return err
			}
			p, err := group.New(dfv1.PathGroups, a[0], a[1], dfv1.GroupFormat(a[2]))
			if err != nil {
				return err
			}
			return start(p)
		case "init":
			return _init.Exec(ctx)
		case "map":
			a, err := args(1)
			if err != nil {
				return err
			}
			p, err := _map.New(a[0])
			if err != nil {
				return err
			}
			return start(p)
		case "sidecar":
			return sidecar.Exec(ctx)
		default:
			return fmt.Errorf("unknown command %q", cmd)
		}
	}()

	if errors.Is(err, context.Canceled) {
		// Cancellation is the normal shutdown path after SIGTERM.
		logger.Info("ignoring context cancelled error, expected")
	} else if err != nil {
		// NOTE(review): /dev/termination-log is presumably the Kubernetes
		// termination message file - confirm. The write is best effort; a
		// failure is logged and the original error still causes the panic.
		if err := ioutil.WriteFile("/dev/termination-log", []byte(err.Error()), 0o600); err != nil {
			logger.Info(fmt.Sprintf("failed to write termination-log: %v", err))
		}
		panic(err)
	}
}