/
session.go
232 lines (201 loc) · 6.84 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package harness
import (
"context"
"fmt"
"io"
"sync"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/session"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
)
var (
	// capture is the destination for recorded session entries. It is
	// assigned by the "session" hook's Init function registered in init.
	capture io.WriteCloser

	// sessionLock guards writes to capture.
	sessionLock sync.Mutex

	// bufPool recycles proto.Buffers used while encoding session entries.
	bufPool = sync.Pool{
		New: func() interface{} { return proto.NewBuffer(nil) },
	}
)
// isEnabled reports whether the named global pipeline option holds the
// literal string "true".
func isEnabled(option string) bool {
	value := runtime.GlobalOptions.Get(option)
	return value == "true"
}
// recordMessage appends pb to the session capture when the
// "session_recording" global option is "true"; otherwise it returns nil
// without doing anything.
//
// opcode is not used: the kind written to the entry header is taken from
// pb.Kind. The record* helpers in this file pass the same kind for both.
//
// This runs inline with the message handling code and blocks the caller
// while it marshals and writes. That is tolerated because recording is meant
// for capturing profiles, not for use while measuring performance.
//
// The capture is a sequence of elements, each made of three parts:
//  1. the varint-encoded length of the EntryHeader;
//  2. the encoded EntryHeader, a lightweight message holding the entry's
//     kind and body length, so a consumer can decide whether the Entry is of
//     interest and skip the expensive decode otherwise;
//  3. the encoded Entry.
//
// An error is returned if marshaling fails, if no capture writer has been
// installed, or if any write to the capture fails.
func recordMessage(opcode session.Kind, pb *session.Entry) error {
	if !isEnabled("session_recording") {
		return nil
	}

	// Buffers are recycled through bufPool, and proto.Buffer's Marshal and
	// EncodeVarint append to the existing contents. Each buffer must be
	// reset after Get, or bytes from a previously recorded entry would be
	// written out again and the recorded lengths would be wrong.
	body := bufPool.Get().(*proto.Buffer)
	body.Reset()
	defer bufPool.Put(body)
	if err := body.Marshal(pb); err != nil {
		return errors.Wrap(err, "unable to marshal message for session recording")
	}

	eh := &session.EntryHeader{
		Kind: pb.Kind,
		Len:  int64(len(body.Bytes())),
	}
	hdr := bufPool.Get().(*proto.Buffer)
	hdr.Reset()
	defer bufPool.Put(hdr)
	if err := hdr.Marshal(eh); err != nil {
		return errors.Wrap(err, "unable to marshal message header for session recording")
	}

	l := bufPool.Get().(*proto.Buffer)
	l.Reset()
	defer bufPool.Put(l)
	if err := l.EncodeVarint(uint64(len(hdr.Bytes()))); err != nil {
		return errors.Wrap(err, "unable to write entry header length")
	}

	// Hold the lock across all three writes so the parts of one entry are
	// written contiguously.
	sessionLock.Lock()
	defer sessionLock.Unlock()
	// capture is only assigned when a capture hook was configured; without
	// this check the writes below would panic on a nil interface.
	if capture == nil {
		return errors.New("session recording is enabled but no capture hook is configured")
	}
	if _, err := capture.Write(l.Bytes()); err != nil {
		return errors.Wrap(err, "unable to write entry header length")
	}
	if _, err := capture.Write(hdr.Bytes()); err != nil {
		return errors.Wrap(err, "unable to write entry header")
	}
	if _, err := capture.Write(body.Bytes()); err != nil {
		return errors.Wrap(err, "unable to write entry body")
	}
	return nil
}
// recordInstructionRequest hands req to recordMessage as an
// INSTRUCTION_REQUEST entry; nothing is written unless session recording is
// enabled.
func recordInstructionRequest(req *fnpb.InstructionRequest) error {
	kind := session.Kind_INSTRUCTION_REQUEST
	entry := &session.Entry{
		Kind: kind,
		Msg:  &session.Entry_InstReq{InstReq: req},
	}
	return recordMessage(kind, entry)
}
// recordInstructionResponse hands resp to recordMessage as an
// INSTRUCTION_RESPONSE entry; nothing is written unless session recording is
// enabled.
func recordInstructionResponse(resp *fnpb.InstructionResponse) error {
	kind := session.Kind_INSTRUCTION_RESPONSE
	entry := &session.Entry{
		Kind: kind,
		Msg:  &session.Entry_InstResp{InstResp: resp},
	}
	return recordMessage(kind, entry)
}
// recordStreamReceive hands received data elements to recordMessage as a
// DATA_RECEIVED entry; nothing is written unless session recording is
// enabled.
func recordStreamReceive(data *fnpb.Elements) error {
	kind := session.Kind_DATA_RECEIVED
	entry := &session.Entry{
		Kind: kind,
		Msg:  &session.Entry_Elems{Elems: data},
	}
	return recordMessage(kind, entry)
}
// recordStreamSend hands outgoing data elements to recordMessage as a
// DATA_SENT entry; nothing is written unless session recording is enabled.
func recordStreamSend(data *fnpb.Elements) error {
	kind := session.Kind_DATA_SENT
	entry := &session.Entry{
		Kind: kind,
		Msg:  &session.Entry_Elems{Elems: data},
	}
	return recordMessage(kind, entry)
}
// recordLogEntries hands a batch of log entries to recordMessage as a
// LOG_ENTRIES entry; nothing is written unless session recording is enabled.
func recordLogEntries(entries *fnpb.LogEntry_List) error {
	kind := session.Kind_LOG_ENTRIES
	entry := &session.Entry{
		Kind: kind,
		Msg:  &session.Entry_LogEntries{LogEntries: entries},
	}
	return recordMessage(kind, entry)
}
func recordHeader() error {
return recordMessage(session.Kind_HEADER,
&session.Entry{
Kind: session.Kind_HEADER,
Msg: &session.Entry_Header{
Header: &session.Header{
Version: "0.0.1",
MaxMsgLen: 4000000, // TODO(wcn): get from DataChannelManager.
},
},
})
}
// TODO(wcn): footer is designed to be the last thing recorded in the log. However,
// there's currently no coordination with the logging channel, so this isn't true.
func recordFooter() error {
return recordMessage(session.Kind_FOOTER, &session.Entry{
Kind: session.Kind_FOOTER,
Msg: &session.Entry_Footer{
Footer: &session.Footer{},
},
})
}
// CaptureHook writes the messaging content consumed and
// produced by the worker, allowing the data to be used as
// an input for the session runner. Since workers can exist
// in a variety of environments, this allows the runner
// to tailor the behavior best for its particular needs.
type CaptureHook io.WriteCloser
// CaptureHookFactory produces a CaptureHook from the supplied
// options.
type CaptureHookFactory func([]string) CaptureHook
var captureHookRegistry = make(map[string]CaptureHookFactory)
// init registers the "session" hook. When the hook is enabled with at least
// one option, its Init function decodes the first option into a capture hook
// name plus that hook's options, invokes the matching CaptureHookFactory from
// captureHookRegistry, and installs the result as the capture writer.
func init() {
	hf := func(opts []string) hooks.Hook {
		return hooks.Hook{
			Init: func(ctx context.Context) (context.Context, error) {
				if len(opts) == 0 {
					return ctx, nil
				}
				name, hookOpts := hooks.Decode(opts[0])
				// Report an unknown name as an error instead of calling a nil
				// factory, which would panic.
				factory, ok := captureHookRegistry[name]
				if !ok {
					return ctx, errors.Errorf("session capture hook %q is not registered", name)
				}
				capture = factory(hookOpts)
				return ctx, nil
			},
		}
	}
	hooks.RegisterHook("session", hf)
}
// RegisterCaptureHook registers a CaptureHookFactory under the supplied name
// so it can later be selected with EnableCaptureHook.
//
// The registry is a plain map with no locking, so registrations must not race
// with each other or with lookups.
//
// It panics if c is nil or if a factory is already registered under name.
func RegisterCaptureHook(name string, c CaptureHookFactory) {
	// A nil factory would only fail later, when the session hook's Init
	// tries to call it, so reject it here.
	if c == nil {
		panic(fmt.Sprintf("RegisterCaptureHook: nil factory for %s", name))
	}
	if _, exists := captureHookRegistry[name]; exists {
		panic(fmt.Sprintf("RegisterCaptureHook: %s registered twice", name))
	}
	captureHookRegistry[name] = c
}
// EnableCaptureHook requests that the named capture hook, configured with
// opts, be used to record session data. It enables the "session" hook with
// the encoded name and options.
//
// It panics if name was not registered with RegisterCaptureHook, or if a
// different capture hook is already enabled.
func EnableCaptureHook(name string, opts []string) {
	if _, exists := captureHookRegistry[name]; !exists {
		panic(fmt.Sprintf("EnableCaptureHook: %s not registered", name))
	}
	// Only one capture hook may be active. Guard the index so an enabled
	// session hook with no options cannot cause an out-of-range panic.
	if exists, enabledOpts := hooks.IsEnabled("session"); exists && len(enabledOpts) > 0 {
		if current, _ := hooks.Decode(enabledOpts[0]); current != name {
			panic(fmt.Sprintf("EnableCaptureHook: can't enable hook %s, hook %s already enabled", name, current))
		}
	}
	hooks.EnableHook("session", hooks.Encode(name, opts))
}