forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
debugging_wordcount.go
163 lines (139 loc) · 5.7 KB
/
debugging_wordcount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// debugging_wordcount is an example that verifies word counts in Shakespeare
// and includes Beam best practices.
//
// This example, debugging_wordcount, is the third in a series of four
// successively more detailed 'word count' examples. You may first want to
// take a look at minimal_wordcount and wordcount. After you've looked at
// this example, see the windowed_wordcount pipeline for an introduction
// to additional concepts.
//
// Basic concepts, also in the minimal_wordcount and wordcount examples:
// Reading text files; counting a PCollection; executing a Pipeline both locally
// and using a selected runner; defining DoFns.
//
// New Concepts:
//
// 1. Using the richer struct DoFn form and accessing optional arguments.
// 2. Logging using the Beam log package, even in a distributed environment.
// 3. Testing your Pipeline via passert.
//
// To change the runner, specify:
//
// --runner=YOUR_SELECTED_RUNNER
//
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
// with --input.
package main
import (
"context"
"flag"
"fmt"
"reflect"
"regexp"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)
// TODO(herohde) 10/16/2017: support metrics and log level cutoff.

// input names the file(s) the pipeline reads; by default it is the public
// King Lear text.
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

// filter is the regular expression a word must match to be kept.
var filter = flag.String("filter", "Flourish|stomach", "Regex filter pattern to use. Only words matching this pattern will be included.")

// output is the destination file; main refuses to run when it is empty.
var output = flag.String("output", "", "Output file (required).")
// Concept #1: a DoFn can also be a struct with methods for setup/teardown and
// element/bundle processing. It also allows configuration values to be made
// available at runtime.
func init() {
// To be correctly serialized on non-direct runners, struct form DoFns must be
// registered during initialization.
beam.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem())
}
// filterFn is a struct-form DoFn that keeps only the words matching a
// configurable regular expression.
type filterFn struct {
	// Filter holds the regular-expression source. It is exported and
	// JSON-tagged because only exported fields are serialized with the DoFn
	// and made available at runtime.
	Filter string `json:"filter"`

	// re is the compiled form of Filter, populated by Setup.
	re *regexp.Regexp
}
// Setup is filterFn's setup hook: it compiles the configured Filter into re
// so ProcessElement can reuse the compiled pattern. It panics on an invalid
// pattern; main validates --filter before building the pipeline.
func (f *filterFn) Setup() {
	pattern := f.Filter
	f.re = regexp.MustCompile(pattern)
}
// Concept #2: The Beam log package should be used for all logging in runtime
// functions. The needed context is made available as an argument.
//
// ProcessElement emits the (word, count) pair when word matches the compiled
// filter and drops it otherwise, logging the outcome either way.
func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
	if !f.re.MatchString(word) {
		// Rejected words are only interesting when debugging, so they go to
		// the "DEBUG" level.
		log.Debugf(ctx, "Did not match: %v", word)
		return
	}
	// Every match is reported at the "INFO" level before being passed on.
	log.Infof(ctx, "Matched: %v", word)
	emit(word, count)
}
// The transforms below are identical to the wordcount versions. If this were
// production code, common transforms would be placed in a separate package
// and shared directly rather than being copied.

// wordRE matches a run of ASCII letters, optionally followed by an apostrophe
// and one lowercase letter (for example "it's").
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// extractFn is a DoFn that emits, in order, every word wordRE finds in line.
func extractFn(line string, emit func(string)) {
	words := wordRE.FindAllString(line, -1)
	for i := range words {
		emit(words[i])
	}
}
// formatFn is a DoFn that renders a word and its count as a single
// "word: count" line.
func formatFn(w string, c int) string {
	line := fmt.Sprintf("%s: %v", w, c)
	return line
}
// CountWords is a composite transform that counts the words of a PCollection
// of lines. It expects a PCollection of type string and returns a PCollection
// of type KV<string,int>.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	scoped := s.Scope("CountWords")
	words := beam.ParDo(scoped, extractFn, lines)
	return stats.Count(scoped, words)
}
// main builds and runs the debugging word-count pipeline. It reads --input,
// counts words, keeps only those matching --filter, formats each survivor as
// "word: count", and writes the result to --output.
func main() {
	flag.Parse()
	beam.Init()

	// Concept #2: the beam logging package works both during pipeline
	// construction and at runtime. It should always be used.
	ctx := context.Background()
	if *output == "" {
		log.Exit(ctx, "No output provided")
	}
	// Validate the pattern here so a bad --filter fails fast during
	// construction instead of panicking in filterFn.Setup (MustCompile).
	if _, err := regexp.Compile(*filter); err != nil {
		log.Exitf(ctx, "Invalid filter: %v", err)
	}

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	counted := CountWords(s, lines)
	filtered := beam.ParDo(s, &filterFn{Filter: *filter}, counted)
	formatted := beam.ParDo(s, formatFn, filtered)

	// Concept #3: passert is a set of convenient PTransforms that can be used
	// when writing Pipeline level tests to validate the contents of
	// PCollections. passert is best used in unit tests with small data sets
	// but is demonstrated here as a teaching tool.
	//
	// The expected values are specific to the default input and the default
	// filter. Asserting them against user-supplied data would fail the job, so
	// the check only runs when neither flag has been overridden.
	if *input == flag.Lookup("input").DefValue && *filter == flag.Lookup("filter").DefValue {
		passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")
	} else {
		log.Info(ctx, "Custom --input or --filter supplied; skipping passert check of default word counts")
	}

	// --output is mandatory (checked above), so actually write the results there.
	textio.Write(s, *output, formatted)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}