/
registry.go
278 lines (244 loc) · 9.63 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xlangx
import (
"context"
"fmt"
"net/url"
"strings"
"sync"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
)
var (
	// defaultReg is the package-level registry used by the exported
	// RegisterHandler and RegisterOverrideForUrn functions.
	defaultReg = newRegistry()
)
// RegisterHandler associates a namespace with a HandlerFunc which can be used to
// replace calls to a Beam ExpansionService.
//
// Once registered, expansion addresses of the forms
//
//	"<namespace>" or
//	"<namespace>:<configuration>"
//
// can be used with beam.CrossLanguage. Any configuration after the separator is
// provided to the HandlerFunc when it is called, for the handler to use at its
// discretion.
//
// RegisterHandler panics if the namespace is rejected, i.e. when it contains
// the separator or is one of the restricted namespaces.
func RegisterHandler(namespace string, handler HandlerFunc) {
	err := defaultReg.RegisterHandler(namespace, handler)
	if err == nil {
		return
	}
	panic(err)
}
// RegisterOverrideForUrn overrides which expansion address is used to
// expand a specific transform URN. The expansion address must be a URL
// (with both a scheme and a host) or use a namespaced handler registered
// with RegisterHandler.
//
// When the expansion address is for a handler, it may take the forms
//
//	"<namespace>" or
//	"<namespace>:<configuration>"
//
// RegisterOverrideForUrn panics if the expansion address fails validation.
func RegisterOverrideForUrn(urn, expansionAddr string) {
	err := defaultReg.RegisterOverrideForUrn(urn, expansionAddr)
	if err == nil {
		return
	}
	panic(err)
}
// HandlerParams is the parameter to an expansion service handler.
type HandlerParams struct {
// Config is the additional parameterization string, if any. When resolved by
// getHandlerFunc for a registered handler, it is the text after the namespace
// separator; for the QueryExpansionService fallback it is the full address.
Config string
// Req is the expansion request being served. Its Namespace is used by
// CoderMarshaller, Inputs and Outputs.
Req *jobpb.ExpansionRequest
// Additional pipeline graph information for custom handling.
// Not exported to avoid mutation.
// edge is the graph edge being expanded; Inputs and Outputs index its Input
// and Output slices with the positions stored in ext.
edge *graph.MultiEdge
// ext supplies InputsMap and OutputsMap, which map local names to those
// positions.
ext *graph.ExternalTransform
}
// CoderMarshaller builds a fresh graphx.CoderMarshaller whose Namespace is set
// to the namespace of this handler's expansion request.
func (p *HandlerParams) CoderMarshaller() *graphx.CoderMarshaller {
	marshaller := graphx.NewCoderMarshaller()
	marshaller.Namespace = p.Req.Namespace
	return marshaller
}
// PCol represents input or output pcollections to the cross language transform being expanded.
type PCol struct {
Index int // Positional index of this input or output in the edge's Input/Output slice
Local string // Local name of the PCollection (may be used in the cross language PTransform)
Coder *coder.Coder // Contains the full type and other coder information.
// Bounded is the boundedness of the PCollection, converted from the graph
// node's Bounded() result by makePCol.
Bounded pipepb.IsBounded_Enum
// namespace scopes the ids produced by ID and WSID.
namespace string
// node is the underlying graph node; WindowingStrategy reads from it.
node *graph.Node
}
// ID returns the globally namespaced PCollection id derived from the local
// identifier, in the standard form "n<Local>@<namespace>".
func (p *PCol) ID() string {
	const idFormat = "n%v@%v"
	return fmt.Sprintf(idFormat, p.Local, p.namespace)
}
// WSID returns the globally namespaced WindowingStrategy id derived from the
// local identifier, in the standard form "ws<Local>@<namespace>".
func (p *PCol) WSID() string {
	const wsidFormat = "ws%v@%v"
	return fmt.Sprintf(wsidFormat, p.Local, p.namespace)
}
// WindowingStrategy marshals this PCollection's windowing strategy with cm and
// returns it alongside the id produced by WSID.
//
// It panics if the windowing strategy cannot be marshalled.
//
// TODO: intern windowing strategies.
func (p *PCol) WindowingStrategy(cm *graphx.CoderMarshaller) (string, *pipepb.WindowingStrategy) {
	strategy := p.node.WindowingStrategy()
	strategyPB, err := graphx.MarshalWindowingStrategy(cm, strategy)
	if err == nil {
		return p.WSID(), strategyPB
	}
	panic(fmt.Errorf("unable to marshal windowing strategy for PCol %v: %w", p.Local, err))
}
// makePCol describes the PCollection carried by node as a PCol at position
// index, known locally as local, with ids scoped to namespace.
func makePCol(node *graph.Node, index int, local, namespace string) PCol {
	var pc PCol
	pc.Index = index
	pc.Local = local
	pc.Coder = node.Coder
	pc.Bounded = pipelinex.BoolToBounded(node.Bounded())
	pc.namespace = namespace
	pc.node = node
	return pc
}
// Outputs returns the provided output PCollections, if any, that are expected
// from the transform in this expansion service request.
//
// An empty result means no outputs are currently expected, though the
// expansion may still provide some. Each PCol's Index is its position in the
// edge's outputs. The slice is built by ranging over ext.OutputsMap (a map), so
// its order is unspecified; use Index when positions matter.
func (p *HandlerParams) Outputs() []PCol {
	outputs := make([]PCol, 0, len(p.ext.OutputsMap))
	for name, idx := range p.ext.OutputsMap {
		node := p.edge.Output[idx].To
		outputs = append(outputs, makePCol(node, idx, name, p.Req.Namespace))
	}
	return outputs
}
// Inputs returns the provided input PCollections, if any, for the PTransform
// being expanded in this expansion service request.
//
// Each PCol's Index is its position in the edge's inputs. The slice is built by
// ranging over ext.InputsMap (a map), so its order is unspecified; use Index
// when positions matter.
func (p *HandlerParams) Inputs() []PCol {
	inputs := make([]PCol, 0, len(p.ext.InputsMap))
	for name, idx := range p.ext.InputsMap {
		node := p.edge.Input[idx].From
		inputs = append(inputs, makePCol(node, idx, name, p.Req.Namespace))
	}
	return inputs
}
// HandlerFunc abstracts making an ExpansionService request. It receives the
// call's context and the parameters describing the transform to expand, and
// returns the expansion response or an error.
type HandlerFunc func(ctx context.Context, params *HandlerParams) (*jobpb.ExpansionResponse, error)
// registry maps handler namespaces to HandlerFuncs and transform URNs to
// override expansion addresses. The methods that touch the maps do so while
// holding mu (validateAddr relies on its caller, RegisterOverrideForUrn,
// holding it).
type registry struct {
mu sync.Mutex
handlers map[string]HandlerFunc // namespace -> handler registered via RegisterHandler
urnOverrides map[string]string // transform URN -> override expansion address
}
func newRegistry() *registry {
return ®istry{
handlers: map[string]HandlerFunc{},
urnOverrides: map[string]string{},
}
}
// RegisterHandler associates namespace with handler.
//
// Namespaces may not contain the configuration separator ":", nor may they
// be a restricted namespace, like "localhost" or "http". Registering a
// namespace that already has a handler replaces the previous handler.
//
// The returned error wraps the validation failure, so it can be inspected
// with errors.Unwrap.
func (r *registry) RegisterHandler(namespace string, handler HandlerFunc) error {
	if err := validateNamespace(namespace); err != nil {
		return fmt.Errorf("xlangx.RegisterHandler: %w", err)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[namespace] = handler
	return nil
}
// validateNamespace returns an error if namespace cannot be registered as a
// handler namespace, and nil otherwise.
//
// A namespace is rejected when it contains Separator: parseAddr splits an
// address at the first separator, so such a namespace could never be matched.
// It is also rejected when it is one of the restricted namespaces.
func validateNamespace(namespace string) error {
	if strings.Contains(namespace, Separator) {
		return fmt.Errorf("invalid namespace, provide a different one: %q contains the separator %q", namespace, Separator)
	}
	if _, ok := restricted[namespace]; ok {
		// Previously misspelled as "invalide".
		return fmt.Errorf("invalid namespace, provide a different one: %q is a restricted namespace", namespace)
	}
	return nil
}
// RegisterOverrideForUrn instructs CrossLanguage transforms with the given urn
// to be expanded using expansionAddr instead of the address given at the call
// site, unless that address was wrapped with Require. expansionAddr must
// either be a URL (with a scheme and a host) or use a namespace registered with
// RegisterHandler.
//
// A later registration for the same urn replaces an earlier one. The returned
// error wraps the validation failure.
func (r *registry) RegisterOverrideForUrn(urn, expansionAddr string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	// validateAddr reads r.handlers, so it must run while r.mu is held.
	if err := r.validateAddr(expansionAddr); err != nil {
		// Name the actual function in the message; it previously referred to a
		// non-existent "RegisterExpansionForUrn".
		return fmt.Errorf("xlangx.RegisterOverrideForUrn(%q,%q) error: %w", urn, expansionAddr, err)
	}
	r.urnOverrides[urn] = expansionAddr
	return nil
}
// validateAddr checks that expansionAddr is usable as a URN override target.
// Anything url.Parse accepts with both a non-empty scheme and host is allowed
// as an expansion service URL. Otherwise the namespace portion of the address
// must have a registered handler. It reads r.handlers without locking;
// RegisterOverrideForUrn calls it while holding r.mu.
func (r *registry) validateAddr(expansionAddr string) error {
	if u, err := url.Parse(expansionAddr); err == nil && u.Scheme != "" && u.Host != "" {
		return nil
	}
	ns, _ := parseAddr(expansionAddr)
	if _, registered := r.handlers[ns]; registered {
		return nil
	}
	return fmt.Errorf("expansionAddr %q trying to use unregistered namespace: %q", expansionAddr, ns)
}
// getHandlerFunc resolves the HandlerFunc that should expand a transform with
// the given urn, and the config string to put into its params when called.
//
// Resolution order:
//  1. An address produced by Require ("hardoverride:<addr>") always wins, and
//     the URN override table is not consulted.
//  2. Otherwise, an override registered for urn replaces expansionAddr.
//  3. If the resulting address's namespace has a registered handler, that
//     handler is returned with the text after the separator as its config.
//  4. Otherwise QueryExpansionService is returned with the full address as
//     its config.
func (r *registry) getHandlerFunc(urn, expansionAddr string) (HandlerFunc, string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	ns, config := parseAddr(expansionAddr)
	switch override, hasOverride := r.urnOverrides[urn]; {
	case ns == hardOverrideNamespace:
		// Strip the hard-override marker; what remains is the address to use,
		// either a handler address or an expansion service address.
		expansionAddr = config
		ns, config = parseAddr(expansionAddr)
	case hasOverride:
		expansionAddr = override
		ns, config = parseAddr(override)
	}

	if handler, found := r.handlers[ns]; found {
		return handler, config
	}
	return QueryExpansionService, expansionAddr
}
// Separator is the canonical separator between a namespace and optional configuration.
const Separator = ":"

// hardOverrideNamespace is the reserved namespace that Require prefixes onto an
// expansion address so that URN-specific overrides are bypassed.
const hardOverrideNamespace = "hardoverride"
// Require takes an expansionAddr and requires cross language expansion to use
// it and its associated handler. Any override registered for the transform's
// urn is ignored.
//
// Intended for use by cross language wrappers to permit per-call overrides of
// the expansion address within a single pipeline, such as for testing
// purposes.
func Require(expansionAddr string) string {
	const prefix = hardOverrideNamespace + Separator
	return prefix + expansionAddr
}
// restricted lists namespaces that may not be registered as handlers, to
// prevent some awkward edge cases. They include the reserved hard-override
// namespace and names that parseAddr would extract from ordinary addresses
// such as "localhost:8097" or "http://host:port".
var restricted = map[string]struct{}{
	"http":      {},
	"https":     {},
	"localhost": {},
	"tcp":       {},
	"udp":       {},

	// Special handler for overriding.
	hardOverrideNamespace: {},
}
// parseAddr splits an expansion address at the first Separator into its
// namespace and configuration. An address without a separator is returned
// whole as the namespace, with an empty configuration.
func parseAddr(expansionAddr string) (ns, config string) {
	idx := strings.Index(expansionAddr, Separator)
	if idx < 0 {
		return expansionAddr, ""
	}
	return expansionAddr[:idx], expansionAddr[idx+len(Separator):]
}