forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gbk.go
146 lines (134 loc) · 5.83 KB
/
gbk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// GroupByKey is a PTransform that takes a PCollection of type KV<A,B>,
// groups the values by key and windows, and returns a PCollection of type
// GBK<A,B> representing a map from each distinct key and window of the
// input PCollection to an iterable over all the values associated with
// that key in the input per window. Each key in the output PCollection is
// unique within each window.
//
// GroupByKey is analogous to converting a multi-map into a uni-map, and
// related to GROUP BY in SQL. It corresponds to the "shuffle" step between
// the Mapper and the Reducer in the MapReduce framework.
//
// Two keys of type A are compared for equality by first encoding each of the
// keys using the Coder of the keys of the input PCollection, and then
// comparing the encoded bytes. This admits efficient parallel evaluation.
// Note that this requires that the Coder of the keys be deterministic.
//
// By default, input and output PCollections share a key Coder and iterable
// values in the input and output PCollection share an element Coder.
//
// GroupByKey is a key primitive in data-parallel processing, since it is the
// main way to efficiently bring associated data together into one location.
// It is also a key determiner of the performance of a data-parallel pipeline.
//
// See CoGroupByKey for a way to group multiple input PCollections by a common
// key at once.
func GroupByKey(s Scope, a PCollection) PCollection {
	// Delegate to the fallible variant and panic on failure, mirroring the
	// Must(Try...) pattern used elsewhere in this file.
	return Must(TryGroupByKey(s, a))
}
// TODO(herohde) 5/30/2017: add windowing aspects to above documentation.
// TODO(herohde) 6/23/2017: support createWithFewKeys and other variants?
// TryGroupByKey inserts a GBK transform into the pipeline. Returns
// an error on failure.
func TryGroupByKey(s Scope, a PCollection) (PCollection, error) {
	// A single-input GBK is exactly a CoGBK over one collection.
	out, err := TryCoGroupByKey(s, a)
	return out, err
}
// CoGroupByKey inserts a CoGBK transform into the pipeline.
func CoGroupByKey(s Scope, cols ...PCollection) PCollection {
	// Insert the CoGBK and convert any error into a panic via Must.
	out, err := TryCoGroupByKey(s, cols...)
	return Must(out, err)
}
// addCoGBKCtx annotates err with the scope in which the CoGroupByKey
// insertion was attempted.
func addCoGBKCtx(err error, s Scope) error {
	wrapped := errors.WithContextf(err, "inserting CoGroupByKey in scope %s", s)
	return wrapped
}
// TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns
// an error on failure.
//
// All error paths are annotated with the scope via addCoGBKCtx so callers
// see a consistent "inserting CoGroupByKey in scope ..." context.
func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
	if !s.IsValid() {
		return PCollection{}, addCoGBKCtx(errors.New("invalid scope"), s)
	}
	if len(cols) < 1 {
		return PCollection{}, addCoGBKCtx(errors.New("need at least 1 pcollection"), s)
	}
	for i, in := range cols {
		if !in.IsValid() {
			return PCollection{}, addCoGBKCtx(errors.Errorf("invalid pcollection to CoGBK: index %v", i), s)
		}
	}
	// Collect the underlying graph nodes of the inputs. The loop variable is
	// named col (not s) to avoid shadowing the Scope parameter.
	nodes := make([]*graph.Node, 0, len(cols))
	for _, col := range cols {
		nodes = append(nodes, col.n)
	}
	edge, err := graph.NewCoGBK(s.real, s.scope, nodes)
	if err != nil {
		// Wrap with the same scope context as the validation errors above,
		// which the original code omitted on this path.
		return PCollection{}, addCoGBKCtx(err, s)
	}
	ret := PCollection{edge.Output[0].To}
	ret.SetCoder(NewCoder(ret.Type()))
	return ret, nil
}
// Reshuffle copies a PCollection of the same kind and using the same element
// coder, and maintains the same windowing information. Importantly, it allows
// the result PCollection to be processed with a different sharding, in a
// different stage than the input PCollection.
//
// For example, if a computation needs a lot of parallelism but
// produces only a small amount of output data, then the computation
// producing the data can run with as much parallelism as needed,
// while the output file is written with a smaller amount of
// parallelism, using the following pattern:
//
// pc := bigHairyComputationNeedingParallelism(scope) // PCollection<string>
// resharded := beam.Reshuffle(scope, pc) // PCollection<string>
//
// Another use case is when one has a non-deterministic DoFn followed by one
// that performs externally-visible side effects. Inserting a Reshuffle
// between these DoFns ensures that retries of the second DoFn will always be
// the same, which is necessary to make side effects idempotent.
//
// A Reshuffle will force a break in the optimized pipeline. Consequently,
// this operation should be used sparingly, only after determining that the
// pipeline without reshuffling is broken in some way and performing an extra
// operation is worth the cost.
func Reshuffle(s Scope, col PCollection) PCollection {
	// Insert the reshuffle and convert any error into a panic via Must.
	out, err := TryReshuffle(s, col)
	return Must(out, err)
}
// TryReshuffle inserts a Reshuffle into the pipeline, and returns an error if
// the pcollection's unable to be reshuffled.
func TryReshuffle(s Scope, col PCollection) (PCollection, error) {
	// Annotate every error with the scope in which insertion failed. The
	// message names Reshuffle (the original said "Reshard", which does not
	// match this transform's name and would mislead users reading the error).
	addContext := func(err error, s Scope) error {
		return errors.WithContextf(err, "inserting Reshuffle in scope %s", s)
	}
	if !s.IsValid() {
		return PCollection{}, addContext(errors.New("invalid scope"), s)
	}
	if !col.IsValid() {
		return PCollection{}, addContext(errors.New("invalid pcollection"), s)
	}
	edge, err := graph.NewReshuffle(s.real, s.scope, col.n)
	if err != nil {
		return PCollection{}, addContext(err, s)
	}
	// NOTE(review): the result of this call is discarded; it looks like a
	// no-op unless WindowingStrategy has side effects — confirm and remove.
	col.n.WindowingStrategy()
	ret := PCollection{edge.Output[0].To}
	ret.SetCoder(NewCoder(ret.Type()))
	return ret, nil
}