Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
179 lines (153 sloc) 6.73 KB
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)
// IMPLEMENTATION NOTE: functions and types in this file are assumed to be
// simple forwards from the graph package (or subpackages) for the purpose of
// removing such package dependencies from end-user pipeline code. So:
//
// PLEASE DO NOT ADD NON-FORWARDING FUNCTIONALITY HERE.
//
// Instead, add such functionality in the core packages and add pipeline author
// oriented documentation.
// TODO(herohde) 7/13/2017: these forwards alone pull in runtime. Is there a use
// case for separate package?
// RegisterType inserts "external" types into a global type registry to bypass
// serialization and preserve full method information. It should be called in
// init() only.
// TODO(wcn): the canonical definition of "external" is in v1.proto. We need user
// facing copy for this important concept.
func RegisterType(t reflect.Type) {
runtime.RegisterType(t)
}
// RegisterFunction allows function registration. It is beneficial for performance
// and is needed for functions -- such as custom coders -- serialized during unit
// tests, where the underlying symbol table is not available. It should be called
// in init() only. Returns the external key for the function.
func RegisterFunction(fn interface{}) {
runtime.RegisterFunction(fn)
}
// RegisterInit registers an Init hook. Hooks are expected to be able to
// figure out whether they apply on their own, notably if invoked in a remote
// execution environment. They are all executed regardless of the runner.
func RegisterInit(hook func()) {
runtime.RegisterInit(hook)
}
// RegisterCoder registers a user defined coder for a given type, and will
// be used if there is no existing beam coder for that type.
// Must be called prior to beam.Init(), preferably in an init() function.
//
// The coder used for a given type follows this ordering:
// 1. Coders for Known Beam types.
// 2. Coders registered for specific types
// 3. Coders registered for interfaces types
// 4. Default coder (JSON)
//
// Coders for interface types are iterated over to check if a type
// satisfies them, and the most recent one registered will be used.
//
// Repeated registrations of the same type overrides prior ones.
//
// RegisterCoder additionally registers the type, and coder functions
// as per RegisterType and RegisterFunction to avoid redundant calls.
//
// Supported Encoder Signatures
//
// func(T) []byte
// func(reflect.Type, T) []byte
// func(T) ([]byte, error)
// func(reflect.Type, T) ([]byte, error)
//
// Supported Decoder Signatures
//
// func([]byte) T
// func(reflect.Type, []byte) T
// func([]byte) (T, error)
// func(reflect.Type, []byte) (T, error)
//
// where T is the matching user type.
func RegisterCoder(t reflect.Type, encoder, decoder interface{}) {
runtime.RegisterType(t)
runtime.RegisterFunction(encoder)
runtime.RegisterFunction(decoder)
coder.RegisterCoder(t, encoder, decoder)
}
// ElementEncoder encapsulates being able to encode an element into a writer.
type ElementEncoder = coder.ElementEncoder
// ElementDecoder encapsulates being able to decode an element from a reader.
type ElementDecoder = coder.ElementDecoder
// Init is the hook that all user code must call after flags processing and
// other static initialization, for now.
func Init() {
runtime.Init()
}
// Initialized exposes the initialization status for runners.
func Initialized() bool {
return runtime.Initialized()
}
// PipelineOptions are global options for the active pipeline. Options can
// be defined any time before execution and are re-created by the harness on
// remote execution workers. Global options should be used sparingly.
var PipelineOptions = runtime.GlobalOptions
// We forward typex types used in UserFn signatures to avoid having such code
// depend on the typex package directly.
// FullType represents the tree structure of data types processed by the graph.
// It allows representation of composite types, such as KV<int, string> or
// CoGBK<int, int>, as well as "generic" such types, KV<int,T> or CoGBK<X,Y>,
// where the free "type variables" are the fixed universal types: T, X, etc.
type FullType = typex.FullType
// T is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type T = typex.T
// U is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type U = typex.U
// V is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type V = typex.V
// W is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type W = typex.W
// X is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type X = typex.X
// Y is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type Y = typex.Y
// Z is a Universal Type used to represent "generic" types in DoFn and
// PCollection signatures. Each universal type is distinct from all others.
type Z = typex.Z
// EventTime represents the time of the event that generated an element.
// This is distinct from the time when an element is processed.
type EventTime = typex.EventTime
type Window = typex.Window
// These are the reflect.Type instances of the universal types, which are used
// when binding actual types to "generic" DoFns that use Universal Types.
var (
TType = typex.TType
UType = typex.UType
VType = typex.VType
WType = typex.WType
XType = typex.XType
YType = typex.YType
ZType = typex.ZType
)
// EventTimeType is the reflect.Type of EventTime.
var EventTimeType = typex.EventTimeType
You can’t perform that action at this time.