/
init.go
170 lines (149 loc) · 5.95 KB
/
init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package init contains the harness initialization code defined by the FnAPI.
// It is a separate package to avoid flags in the harness package and must be
// imported by the runner to ensure the init hook is registered.
package init
import (
"context"
"encoding/json"
"flag"
"strings"
"time"
"fmt"
"os"
"runtime/debug"
"golang.org/x/exp/slices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
// Import gcs filesystem so that it can be used to upload heap dumps.
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
)
var (
// These flags handle the invocation by the container boot code.
// They are registered on the default command-line flag set when this
// package is initialized. hook dereferences the other flags only after
// it has seen that *worker is true.
worker = flag.Bool("worker", false, "Whether binary is running in worker mode.")
id = flag.String("id", "", "Local identifier (required in worker mode).")
loggingEndpoint = flag.String("logging_endpoint", "", "Local logging gRPC endpoint (required in worker mode).")
controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).")
// semiPersistDir is declared so the flag is accepted on the command line;
// nothing in this file reads its value.
//lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).")
options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).")
)
// exitMode enumerates how hook finishes when the worker harness fails or
// panics. The type is unexported; callers select a mode through the exported
// constants below.
type exitMode int
const (
// Terminate means the hook should exit itself when the worker harness returns.
// It is the zero value of exitMode.
Terminate exitMode = iota
// Return means that the hook should return out, and allow the calling code to
// determine if and when the process exits.
// This may cause errors that caused worker failure to be ignored.
Return
)
var (
// ShutdownMode allows the runner to set how the worker harness should exit.
// It defaults to Terminate. hook consults it when the harness returns an
// error or panics; a value other than Terminate or Return makes hook panic
// with an "unknown ShutdownMode" message.
ShutdownMode = Terminate
)
// init registers hook with the Beam runtime package via runtime.RegisterInit.
// Importing this package is what triggers the registration.
func init() {
runtime.RegisterInit(hook)
}
// hook starts the harness, if in worker mode. Otherwise, is a no-op.
//
// In worker mode it:
//   - reads the optional STATUS_ENDPOINT and RUNNER_CAPABILITIES environment
//     variables,
//   - imports the JSON-encoded pipeline options passed via the options flag
//     (exiting with status 1 if they cannot be parsed),
//   - attempts to set a process memory ceiling, and
//   - runs harness.MainWithOptions against the logging and control endpoints.
//
// A harness error or a panic is reported on stderr and then handled according
// to ShutdownMode: Terminate exits the process (status 1 for an error, 2 for
// a panic), Return returns to the caller. If the harness returns without an
// error, hook blocks until the process is terminated externally.
func hook() {
	if !*worker {
		return
	}

	// Extract environment variables. These are optional runner supported capabilities.
	// Expected env variables:
	// RUNNER_CAPABILITIES : list of runner supported capability urn.
	// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
	statusEndpoint := os.Getenv("STATUS_ENDPOINT")
	// strings.Fields returns no elements for an unset or blank variable and
	// tolerates repeated whitespace, so an empty-string capability is never
	// reported (strings.Split would produce [""] for an unset variable).
	runnerCapabilities := strings.Fields(os.Getenv("RUNNER_CAPABILITIES"))

	// Initialization logging
	//
	// We use direct output to stderr here, because it is expected that logging
	// will be captured by the framework -- which may not be functional if
	// harness.Main returns. We want to be sure any error makes it out.

	if *options != "" {
		var opt runtime.RawOptionsWrapper
		if err := json.Unmarshal([]byte(*options), &opt); err != nil {
			fmt.Fprintf(os.Stderr, "Failed to parse pipeline options '%v': %v\n", *options, err)
			os.Exit(1)
		}
		runtime.GlobalOptions.Import(opt.Options)

		var experiments []string
		if e, ok := opt.Options.Options["experiments"]; ok {
			experiments = strings.Split(e, ",")
		}
		// TODO(zechenj18) 2023-12-07: Remove once the data sampling URN is properly sent in via the capabilities
		if slices.Contains(experiments, "enable_data_sampling") {
			runnerCapabilities = append(runnerCapabilities, graphx.URNDataSampling)
		}
	}

	// Report any panic raised below on stderr before applying ShutdownMode,
	// so the failure is visible even if framework logging is unavailable.
	defer func() {
		if r := recover(); r != nil {
			fmt.Fprintf(os.Stderr, "Worker panic: %v\n", r)
			debug.PrintStack()
			switch ShutdownMode {
			case Terminate:
				os.Exit(2)
			case Return:
				return
			default:
				panic(fmt.Sprintf("unknown ShutdownMode: %v", ShutdownMode))
			}
		}
	}()

	// Since Init() is hijacking main, it's appropriate to do as main
	// does, and establish the background context here.
	// We produce a cancelFn here so runs in Loopback mode and similar can clean up
	// any leftover goroutines.
	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()
	ctx = grpcx.WriteWorkerID(ctx, *id)

	// Failing to set the ceiling is not fatal; an unsupported platform is
	// silently tolerated and any other error is only reported.
	memLimit := memoryLimit()
	if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
		// Written to stderr like every other diagnostic in this function.
		fmt.Fprintln(os.Stderr, "Error Setting Rlimit ", err)
	}

	// Named harnessOpts so it does not shadow the package-level options flag.
	harnessOpts := harness.Options{
		StatusEndpoint:     statusEndpoint,
		RunnerCapabilities: runnerCapabilities,
	}
	if err := harness.MainWithOptions(ctx, *loggingEndpoint, *controlEndpoint, harnessOpts); err != nil {
		fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
		switch ShutdownMode {
		case Terminate:
			os.Exit(1)
		case Return:
			return
		default:
			panic(fmt.Sprintf("unknown ShutdownMode: %v", ShutdownMode))
		}
	}
	fmt.Fprintln(os.Stderr, "Worker exited successfully!")

	// NOTE(review): this loop never exits, so on a successful harness run hook
	// does not return even when ShutdownMode is Return, and the deferred
	// cancelFn is not invoked. Confirm whether that is intended.
	for {
		// Just hang around until we're terminated.
		time.Sleep(time.Hour)
	}
}
// memoryLimit picks the memory ceiling, in bytes, to apply to the worker
// process: 90% of the physical memory reported by syscallx.PhysicalMemorySize,
// or 2GB when that size cannot be determined. This is an imperfect heuristic
// that aims to leave the process enough memory without causing an OOM.
func memoryLimit() uint64 {
	// Used when the physical memory size is unavailable.
	const fallback uint64 = 2 << 30

	physical, err := syscallx.PhysicalMemorySize()
	if err != nil {
		return fallback
	}
	return physical * 90 / 100
}