forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wordcount.go
177 lines (159 loc) · 6.83 KB
/
wordcount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// wordcount is an example that counts words in Shakespeare and includes Beam
// best practices.
//
// This example is the second in a series of four successively more detailed
// 'word count' examples. You may first want to take a look at minimal_wordcount.
// After you've looked at this example, see the debugging_wordcount
// pipeline for an introduction to additional concepts.
//
// For a detailed walkthrough of this example, see
//
// https://beam.apache.org/get-started/wordcount-example/
//
// Basic concepts, also in the minimal_wordcount example: Reading text files;
// counting a PCollection; writing to text files
//
// New Concepts:
//
// 1. Executing a Pipeline both locally and using the selected runner
// 2. Defining your own pipeline options
// 3. Using ParDo with static DoFns defined out-of-line
// 4. Building a composite transform
//
// Concept #1: you can execute this pipeline either locally or by
// selecting another runner. These are now command-line options added by
// the 'beamx' package and not hard-coded as they were in the minimal_wordcount
// example. The 'beamx' package also registers all included runners and
// filesystems as a convenience.
//
// To change the runner, specify:
//
// --runner=YOUR_SELECTED_RUNNER
//
// To execute this pipeline, specify a local output file (if using the
// 'direct' runner) or a remote file on a supported distributed file system.
//
// --output=[YOUR_LOCAL_FILE | YOUR_REMOTE_FILE]
//
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
// with --input.
package main
import (
"context"
"flag"
"fmt"
"log"
"regexp"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)
// Concept #2: Defining your own configuration options. Pipeline options can
// just be standard Go flags (or be obtained any other way). Defining and
// configuring the pipeline is normal Go code.
var (
	// input is the file, or glob of files, the pipeline reads its text from.
	// When the flag is not set, it points at a public dataset holding the
	// text of King Lear.
	input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

	// output is where the formatted word counts are written. It has no
	// default value; main aborts via log.Fatal when it is left empty.
	output = flag.String("output", "", "Output file (required).")
)
// Concept #3: You can make your pipeline assembly code less verbose by
// defining your DoFns statically out-of-line. A DoFn can be defined as a Go
// function and is conventionally suffixed "Fn". The argument and return types
// dictate the pipeline shape when used in a ParDo: for example,
//
// formatFn: string x int -> string
//
// indicates that it operates on a PCollection of type KV<string,int>, representing
// key value pairs of strings and ints, and outputs a PCollection of type string.
// Beam typechecks the pipeline before running it.
//
// DoFns that potentially output zero or multiple elements can also be Go functions,
// but have a different signature. For example,
//
// extractFn : string x func(string) -> ()
//
// uses an "emit" function argument instead of a string return type, allowing it to
// output any number of elements. It operates on a PCollection of type string and
// returns a PCollection of type string. Also, using named function transforms allows
// for easy reuse, modular testing, and an improved monitoring experience.
var (
	// wordRE matches a run of ASCII letters, optionally followed by an
	// apostrophe and one lowercase ASCII letter (so "king's" is one word).
	wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

	// empty is incremented by extractFn for each line that is blank or
	// whitespace-only.
	empty = beam.NewCounter("extract", "emptyLines")

	// lineLen is updated by extractFn with the length of every input line,
	// measured in bytes (Go's len on a string).
	lineLen = beam.NewDistribution("extract", "lineLenDistro")
)
// extractFn is a DoFn that splits one line of text into words and emits each
// match of wordRE as its own output element; a line with no matches emits
// nothing. As a side effect it records the line's byte length in the lineLen
// distribution and increments the empty counter when the line is blank or
// contains only whitespace.
func extractFn(ctx context.Context, line string, emit func(string)) {
	lineLen.Update(ctx, int64(len(line)))

	if strings.TrimSpace(line) == "" {
		empty.Inc(ctx, 1)
	}

	words := wordRE.FindAllString(line, -1)
	for i := range words {
		emit(words[i])
	}
}
// formatFn is a DoFn that renders one (word, count) pair as a single line of
// text of the form "word: count".
func formatFn(w string, c int) string {
	formatted := fmt.Sprintf("%s: %v", w, c)
	return formatted
}
// Concept #4: A composite PTransform is a Go function that adds
// transformations to a given pipeline. It is run at construction time and
// works on PCollections as values. For monitoring purposes, the pipeline
// allows scoped naming for composite transforms. The difference between a
// composite transform and a construction helper function is solely in whether
// a scoped name is used.
//
// For example, the CountWords function is a custom composite transform that
// bundles two transforms (ParDo and Count) as a reusable function.
// CountWords is a composite transform that turns a PCollection of text lines
// (type string) into a PCollection of per-word counts (type KV<string,int>).
// Both of its steps are nested under a "CountWords" sub-scope, so they are
// grouped together for monitoring. The Beam type checker enforces the input
// and output types during pipeline construction.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	scoped := s.Scope("CountWords")

	// Split every line into its words, then count occurrences of each word.
	words := beam.ParDo(scoped, extractFn, lines)
	counts := stats.Count(scoped, words)
	return counts
}
// main parses the command-line flags, initializes Beam, checks that --output
// was given, builds the wordcount pipeline (read -> CountWords -> format ->
// write), and runs it on the runner selected through beamx.
func main() {
	// Flags (including those registered by beamx) must be parsed first.
	flag.Parse()
	// beam.Init() is a startup hook that must always run; on distributed
	// runners it is used to intercept control.
	beam.Init()

	// Validate inputs only after Init().
	if len(*output) == 0 {
		log.Fatal("No output provided")
	}

	// Concepts #3 and #4: assemble the pipeline from the named composite
	// transform and the out-of-line DoFns.
	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	counts := CountWords(root, textio.Read(root, *input))
	textio.Write(root, *output, beam.ParDo(root, formatFn, counts))

	// Concept #1: beamx.Run picks one of the pre-defined runners based on the
	// --runner flag.
	ctx := context.Background()
	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}