forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
forward.go
127 lines (105 loc) · 5 KB
/
forward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)
// IMPLEMENTATION NOTE: functions and types in this file are assumed to be
// simple forwards from the graph package (or subpackages) for the purpose of
// removing such package dependencies from end-user pipeline code. So:
//
// PLEASE DO NOT ADD NON-FORWARDING FUNCTIONALITY HERE.
//
// Instead, add such functionality in the core packages and add pipeline author
// oriented documentation.
// TODO(herohde) 7/13/2017: these forwards alone pull in runtime. Is there a use
// case for separate package?
// RegisterType inserts "external" types into a global type registry to bypass
// serialization and preserve full method information. It should be called in
// init() only.
// TODO(wcn): the canonical definition of "external" is in v1.proto. We need user
// facing copy for this important concept.
func RegisterType(t reflect.Type) {
runtime.RegisterType(t)
}
// RegisterFunction allows function registration. It is beneficial for performance
// and is needed for functions -- such as custom coders -- serialized during unit
// tests, where the underlying symbol table is not available. It should be called
// in init() only. Returns the external key for the function.
func RegisterFunction(fn interface{}) {
runtime.RegisterFunction(fn)
}
// RegisterInit registers an Init hook. Hooks are expected to be able to
// figure out whether they apply on their own, notably if invoked in a remote
// execution environment. They are all executed regardless of the runner.
func RegisterInit(hook func()) {
runtime.RegisterInit(hook)
}
// Init is the hook that all user code must call after flags processing and
// other static initialization, for now.
func Init() {
runtime.Init()
}
// PipelineOptions are global options for the active pipeline. Options can
// be defined any time before execution and are re-created by the harness on
// remote execution workers. Global options should be used sparingly.
var PipelineOptions = runtime.GlobalOptions
// We forward typex types used in UserFn signatures to avoid having such code
// depend on the typex package directly.
// FullType represents the tree structure of data types processed by the graph.
// It allows representation of composite types, such as KV<int, string> or
// CoGBK<int, int>, as well as "generic" such types, KV<int,T> or CoGBK<X,Y>,
// where the free "type variables" are the fixed universal types: T, X, etc.
type FullType = typex.FullType
// T is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type T = typex.T
// U is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type U = typex.U
// V is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type V = typex.V
// W is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type W = typex.W
// X is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type X = typex.X
// Y is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type Y = typex.Y
// Z is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type Z = typex.Z
// EventTime represents the time of the event that generated an element.
// This is distinct from the time when an element is processed.
type EventTime = typex.EventTime
type Window = typex.Window
// These are the reflect.Type instances of the universal types, which are used
// when binding actual types to "generic" DoFns that use Universal Types.
var (
TType = typex.TType
UType = typex.UType
VType = typex.VType
WType = typex.WType
XType = typex.XType
YType = typex.YType
ZType = typex.ZType
)
// EventTimeType is the reflect.Type of EventTime.
var EventTimeType = typex.EventTimeType