/
wordcount.go
218 lines (199 loc) · 9.12 KB
/
wordcount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Wordcount is an example using cross-language BigQuery transforms to read and write to BigQuery.
// This example runs a batch pipeline that reads from the public table "shakespeare" described here:
// https://cloud.google.com/bigquery/public-data#sample_tables. It reads the data of word counts per
// different work, aggregates them to find total word counts in all works, as well as the average
// number of times a word appears if it appears in a work, and then writes all that data to a given
// output table.
//
// This example is only expected to work on Dataflow, and requires a cross-language expansion
// service that can expand BigQuery read and write transforms. An address to a persistent expansion
// service can be provided as a flag, or if none is specified then the SDK will attempt to
// automatically start an appropriate expansion service.
//
// Running an Expansion Server
//
// If the automatic expansion service functionality is not available for your environment, or if
// you want improved performance, you will need to start a persistent expansion service. These
// instructions will cover running the Java SchemaIO Expansion Service, and therefore require a JDK
// installation in a version supported by Beam. Depending on whether you are running this from a
// numbered Beam release, or a development environment, there are two sources you may use for the
// Expansion service.
//
// Numbered release: The expansion service jar is vendored as module
// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in Maven Repository.
// This jar can be executed directly with the following command:
// `java -jar <jar_name> <port_number>`
//
// Development env: This requires that the JAVA_HOME environment variable points to your JDK
// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
// built (or built and run) with the following commands:
// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
//
// Running the Example on GCP
//
// An example command for executing this pipeline on GCP is as follows:
// export PROJECT="$(gcloud config get-value project)"
// export TEMP_LOCATION="gs://MY-BUCKET/temp"
// export REGION="us-central1"
// export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
// export STAGING_LOCATION="gs://MY-BUCKET/staging"
// export EXPANSION_ADDR="localhost:1234"
// export OUTPUT_TABLE="project_id:dataset_id.table_id"
// go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
// --runner=DataflowRunner \
// --temp_location=$TEMP_LOCATION \
// --staging_location=$STAGING_LOCATION \
// --project=$PROJECT \
// --region=$REGION \
// --job_name="${JOB_NAME}" \
// --expansion_addr=$EXPANSION_ADDR \
// --out_table=$OUTPUT_TABLE
//
// Running the Example From a Git Clone
//
// When running on a development environment, a custom container will likely need to be provided
// for the cross-language SDK. First this will require building and pushing the SDK container to
// a container repository, such as Docker Hub.
//
// export DOCKER_ROOT="Your Docker Repository Root"
// ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
// docker push $DOCKER_ROOT/beam_java8_sdk:latest
//
// For runners in local mode, simply building the container using the default values for
// docker-repository-root and docker-tag will work to have it accessible locally.
//
// Additionally, you must provide the location of your custom container to the pipeline with the
// --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For
// example:
//
// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
package main
import (
"context"
"flag"
"math"
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// Command-line flags configuring the pipeline; main reads them after calling flag.Parse.
var (
// expansionAddr is the address of the expansion service used for both the BigQuery read and the
// BigQuery write. Leave it unspecified to attempt to automatically start an expansion service.
expansionAddr = flag.String("expansion_addr", "",
"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
// outTable is a required option specifying where to write the output. If the table does not
// exist, a new one will be created. If the table already exists, elements will be appended to it.
outTable = flag.String("out_table", "", "Output table (required).")
)
func init() {
register.Combiner3[WordsAccum, ShakesRow, CountsRow](&WordsCombine{})
}
// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
// be read properly, field names must match names from the BigQuery table, so some fields must
// include underscores.
type ShakesRow struct {
// Word is the word this row counts; main uses it as the key when grouping rows.
Word string `beam:"word"`
// Word_count is the count carried by this row; WordsCombine.AddInput adds it to the running sum.
Word_count int64 `beam:"word_count"`
// Corpus is presumably the name of the work the word appears in; confirm against the table schema.
Corpus string `beam:"corpus"`
// Corpus_date is presumably the date associated with the work; confirm against the table schema.
Corpus_date int64 `beam:"corpus_date"`
}
// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
// are derived from the Beam schema names specified below as struct tags. Values of this type are
// produced by WordsCombine.ExtractOutput.
type CountsRow struct {
// Word is the word being counted.
Word string `beam:"word"`
// WordCount is the count of how many times the word appears in all works combined.
WordCount int64 `beam:"word_count"`
// CorpusCount is the count of how many works the word appears in.
CorpusCount int64 `beam:"corpus_count"`
// AvgCount is the average number of times a word appears in all works that it appears in. In
// other words, this is equivalent to WordCount divided by CorpusCount. It is NaN when
// CorpusCount is zero.
AvgCount float64 `beam:"avg_count"`
}
// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
// word counts. Its zero value represents an accumulator that has seen no input.
type WordsAccum struct {
// Word is the word being counted.
Word string
// Count is the number of input rows added for this word, or in other words the number of
// corpora it appears in (assuming that the input never repeats a word and corpus pair).
Count int64
// Sum is the sum of word counts from inputs.
Sum int64
}
// WordsCombine is a CombineFn that adds up word counts and calculates the average number of
// counts. It has no fields; all state is carried in WordsAccum values. It is registered in init
// and applied per key in main.
type WordsCombine struct{}
// CreateAccumulator returns the zero-valued WordsAccum: no word, a count of zero and a sum of
// zero.
func (fn *WordsCombine) CreateAccumulator() WordsAccum {
	var empty WordsAccum
	return empty
}
// AddInput folds one input row into the accumulator: the row's word becomes the accumulator's
// word, the corpus count grows by one, and the row's word count is added to the running sum.
// The updated accumulator is returned.
func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
	return WordsAccum{
		Word:  row.Word,
		Count: a.Count + 1,
		Sum:   a.Sum + row.Word_count,
	}
}
// MergeAccumulators combines two partial accumulators into one by adding their counts and sums.
//
// The word is taken from a, falling back to v when a carries no word. An accumulator fresh from
// CreateAccumulator has an empty Word, so without the fallback, merging such an accumulator on
// the left with a populated one on the right would lose the word and produce an output row with
// an empty Word.
func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
	word := a.Word
	if word == "" {
		word = v.Word
	}
	return WordsAccum{Word: word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
}
// ExtractOutput turns a finished accumulator into an output row. The word, total sum and corpus
// count are copied across, and the average is the sum divided by the count. When the count is
// zero the average is reported as NaN rather than dividing by zero.
func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
	avg := math.NaN()
	if a.Count != 0 {
		avg = float64(a.Sum) / float64(a.Count)
	}
	return CountsRow{
		Word:        a.Word,
		WordCount:   a.Sum,
		CorpusCount: a.Count,
		AvgCount:    avg,
	}
}
// main builds and runs the batch pipeline: it reads the public Shakespeare table from BigQuery,
// keys each row by its word, combines the rows per word with WordsCombine, and writes the
// resulting CountsRow values to the table named by --out_table.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	// --out_table is documented as required, so reject an empty value up front instead of
	// handing an empty destination to the write transform.
	if *outTable == "" {
		log.Fatal(ctx, "No output table provided: --out_table is required")
	}

	p := beam.NewPipeline()
	s := p.Root()

	// Read from the public BigQuery table.
	inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
	rows := bigqueryio.Read(s, inType,
		bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
		bigqueryio.ReadExpansionAddr(*expansionAddr))

	// Combine the data per word: key each row by its word, then fold all rows sharing a key
	// through WordsCombine and discard the key, since the word is also carried in the output row.
	keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
		return elm.Word, elm
	}, rows)
	counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
	countVals := beam.DropKey(s, counts)

	// Write the data to the given BigQuery table destination, creating the table if needed.
	bigqueryio.Write(s, *outTable, countVals,
		bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
		bigqueryio.WriteExpansionAddr(*expansionAddr))

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}