// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package kafkaio contains cross-language functionality for using Apache Kafka
// (http://kafka.apache.org/). These transforms only work on runners that
// support cross-language transforms.
//
// Setup
//
// Transforms specified here are cross-language transforms implemented in a
// different SDK (listed below). During pipeline construction, the Go SDK will
// need to connect to an expansion service containing information on these
// transforms in their native SDK.
//
// To use an expansion service, it must be run as a separate process accessible
// during pipeline construction. The address of that process must be passed to
// the transforms in this package.
//
// The version of the expansion service should match the version of the Beam SDK
// being used. For numbered releases of Beam, these expansion services are
// released to the Maven repository as modules. For development versions of
// Beam, it is recommended to build and run the expansion service from source
// using Gradle.
//
// Current supported SDKs, including expansion service modules and reference
// documentation:
//
// * Java
//    - Vendored Module: beam-sdks-java-io-expansion-service
//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
package kafkaio

// TODO(BEAM-12492): Implement an API for specifying Kafka type serializers and
// deserializers.

import (
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)

func init() {
	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
}

type policy string

const (
	// ByteArrayDeserializer is the Java class name of Kafka's byte array
	// deserializer, used as the default for keys and values in Read.
	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
	// ByteArraySerializer is the Java class name of Kafka's byte array
	// serializer, used as the default for keys and values in Write.
	ByteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"

	// ProcessingTime is a timestamp policy that assigns processing time to
	// each record. Specifically, this is the timestamp when the record becomes
	// "current" in the reader. Further documentation can be found in Java's
	// KafkaIO documentation.
	ProcessingTime policy = "ProcessingTime"
	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
	// Kafka records. Requires the records to have a timestamp type of
	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
	// documentation can be found in Java's KafkaIO documentation.
	CreateTime policy = "CreateTime"
	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
	// (server side ingestion time) to each record. Further documentation can
	// be found in Java's KafkaIO documentation.
	LogAppendTime policy = "LogAppendTime"

	readURN  = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
	writeURN = "beam:transform:org.apache.beam:kafka_write:v1"
)

// Read is a cross-language PTransform which reads from Kafka and returns a
// KV pair for each item in the specified Kafka topics. By default, this runs
// as an unbounded transform and outputs keys and values as byte slices.
// These properties can be changed through optional parameters.
//
// Read requires the address of an expansion service for Kafka Read transforms,
// a comma-separated list of bootstrap server addresses (see the Kafka property
// "bootstrap.servers" for details), and at least one topic to read from.
//
// Read also accepts optional parameters as readOptions. All optional parameters
// are predefined in this package as functions that return readOption. To set
// an optional parameter, call the function within Read's function signature.
//
// Example of Read with required and optional parameters:
//
//   expansionAddr := "localhost:1234"
//   bootstrapServer := "bootstrap-server:1234"
//   topic := "topic_name"
//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
//       kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
	s = s.Scope("kafkaio.Read")

	if len(topics) == 0 {
		panic("kafkaio.Read requires at least one topic to read from.")
	}

	rpl := readPayload{
		ConsumerConfig:    map[string]string{"bootstrap.servers": servers},
		Topics:            topics,
		KeyDeserializer:   ByteArrayDeserializer,
		ValueDeserializer: ByteArrayDeserializer,
		TimestampPolicy:   string(ProcessingTime),
	}
	rcfg := readConfig{
		pl:  &rpl,
		key: reflectx.ByteSlice,
		val: reflectx.ByteSlice,
	}
	for _, opt := range opts {
		opt(&rcfg)
	}

	pl := beam.CrossLanguagePayload(rpl)
	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
	return out[beam.UnnamedOutputTag()]
}

type readOption func(*readConfig)

type readConfig struct {
	pl  *readPayload
	key reflect.Type
	val reflect.Type
}

// ConsumerConfigs is a Read option that adds consumer properties to the
// Consumer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
//
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Read and does not need to be specified via this option.
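//
// A usage sketch (the property shown is a standard Kafka consumer setting,
// chosen here only for illustration):
//
//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
//       kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))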
func ConsumerConfigs(cfgs map[string]string) readOption {
	return func(cfg *readConfig) {
		for k, v := range cfgs {
			cfg.pl.ConsumerConfig[k] = v
		}
	}
}

// StartReadTimestamp is a Read option that specifies a start timestamp as
// milliseconds since the Unix epoch; only records with timestamps at or after
// that point will be read.
//
// This results in failures if one or more partitions don't contain messages
// with a timestamp larger than or equal to the one specified, or if the
// message format version in a partition is before 0.10.0, meaning messages do
// not have timestamps.
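//
// A usage sketch, assuming records from the last hour are wanted, using the
// standard library time package (UnixMilli requires Go 1.17 or later):
//
//   startTime := time.Now().Add(-time.Hour).UnixMilli()
//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
//       kafkaio.StartReadTimestamp(startTime))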
func StartReadTimestamp(ts int64) readOption {
	return func(cfg *readConfig) {
		cfg.pl.StartReadTime = &ts
	}
}

// MaxNumRecords is a Read option that specifies the maximum number of records
// to be read. Setting this will cause the Read to execute as a bounded
// transform. Useful for tests and demo applications.
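//
// A bounded-read sketch for a test, combining this with MaxReadSecs (the
// values are illustrative):
//
//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
//       kafkaio.MaxNumRecords(1000), kafkaio.MaxReadSecs(30))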
func MaxNumRecords(num int64) readOption {
	return func(cfg *readConfig) {
		cfg.pl.MaxNumRecords = &num
	}
}

// MaxReadSecs is a Read option that specifies the maximum amount of time in
// seconds the transform executes. Setting this will cause the Read to execute
// as a bounded transform. Useful for tests and demo applications.
func MaxReadSecs(secs int64) readOption {
	return func(cfg *readConfig) {
		cfg.pl.MaxReadTime = &secs
	}
}

// CommitOffsetInFinalize is a Read option that specifies whether to commit
// offsets when finalizing.
//
// Default: false
func CommitOffsetInFinalize(enabled bool) readOption {
	return func(cfg *readConfig) {
		cfg.pl.CommitOffsetInFinalize = enabled
	}
}

// TimestampPolicy is a Read option that specifies the timestamp policy to use
// for extracting timestamps from the KafkaRecord. Must be one of the predefined
// constant timestamp policies in this package.
//
// Default: kafkaio.ProcessingTime
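//
// For example:
//
//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
//       kafkaio.TimestampPolicy(kafkaio.LogAppendTime))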
func TimestampPolicy(name policy) readOption {
	return func(cfg *readConfig) {
		cfg.pl.TimestampPolicy = string(name)
	}
}

// readPayload should produce a schema matching the expected cross-language
// payload for Kafka reads. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Read.External.Configuration.
type readPayload struct {
	ConsumerConfig         map[string]string
	Topics                 []string
	KeyDeserializer        string
	ValueDeserializer      string
	StartReadTime          *int64
	MaxNumRecords          *int64
	MaxReadTime            *int64
	CommitOffsetInFinalize bool
	TimestampPolicy        string
}

// Write is a cross-language PTransform which writes KV data to a specified
// Kafka topic. By default, this expects the input collection's keys and
// values to be byte slices. This can be changed through optional parameters.
//
// Write requires the address of an expansion service for Kafka Write
// transforms, a comma-separated list of bootstrap server addresses (see the
// Kafka property "bootstrap.servers" for details), and a topic to write to.
//
// Write also accepts optional parameters as writeOptions. All optional
// parameters are predefined in this package as functions that return
// writeOption. To set an optional parameter, call the function within Write's
// function signature.
//
// Example of Write with required and optional parameters:
//
//   expansionAddr := "localhost:1234"
//   bootstrapServer := "bootstrap-server:1234"
//   topic := "topic_name"
//   kafkaio.Write(s, expansionAddr, bootstrapServer, topic, pcol,
//       kafkaio.ProducerConfigs(map[string]string{"retries": "3"}))
func Write(s beam.Scope, addr, servers, topic string, col beam.PCollection, opts ...writeOption) {
	s = s.Scope("kafkaio.Write")

	wpl := writePayload{
		ProducerConfig:  map[string]string{"bootstrap.servers": servers},
		Topic:           topic,
		KeySerializer:   ByteArraySerializer,
		ValueSerializer: ByteArraySerializer,
	}
	for _, opt := range opts {
		opt(&wpl)
	}

	pl := beam.CrossLanguagePayload(wpl)
	beam.CrossLanguage(s, writeURN, pl, addr, beam.UnnamedInput(col), nil)
}

type writeOption func(*writePayload)

// ProducerConfigs is a Write option that adds producer properties to the
// Producer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
//
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Write and does not need to be specified via this option.
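//
// A usage sketch (the property shown is a standard Kafka producer setting,
// chosen here only for illustration):
//
//   kafkaio.Write(s, expansionAddr, bootstrapServer, topic, pcol,
//       kafkaio.ProducerConfigs(map[string]string{"compression.type": "gzip"}))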
func ProducerConfigs(cfgs map[string]string) writeOption {
	return func(pl *writePayload) {
		for k, v := range cfgs {
			pl.ProducerConfig[k] = v
		}
	}
}

// writePayload should produce a schema matching the expected cross-language
// payload for Kafka writes. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Write.External.Configuration.
type writePayload struct {
	ProducerConfig  map[string]string
	Topic           string
	KeySerializer   string
	ValueSerializer string
}