-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
stringsplit.go
233 lines (211 loc) · 8.83 KB
/
stringsplit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// An example of using a Splittable DoFn in the Go SDK with a portable runner.
//
// The following instructions describe how to execute this example in the
// Flink local runner.
//
// 1. From a command line, navigate to the top-level beam/ directory and run
// the Flink job server:
// ./gradlew :runners:flink:1.13:job-server:runShadow -Djob-host=localhost -Dflink-master=local
//
// 2. The job server is ready to receive jobs once it outputs a log like the
// following: `JobService started on localhost:8099`. Take note of the endpoint
// in that log message.
//
// 3. While the job server is running in one command line window, create a
// second one in the same directory and run this example with the following
// command, using the endpoint you noted from step 2:
// go run sdks/go/examples/stringsplit/stringsplit.go --runner=universal --endpoint=localhost:8099
//
// 4. Once the pipeline is complete, the job server can be closed with ctrl+C.
// To check the output of the pipeline, search the job server logs for the
// phrase "StringSplit Output".
package main
import (
"context"
"flag"
"reflect"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// init registers the pipeline's DoFn types with Beam. This allows them to be
// reconstructed from their reflected types when the pipeline runs on a
// portable runner.
func init() {
	dofnTypes := []reflect.Type{
		reflect.TypeOf((*StringSplitFn)(nil)).Elem(),
		reflect.TypeOf((*LogFn)(nil)).Elem(),
	}
	for _, t := range dofnTypes {
		beam.RegisterType(t)
	}
}
// StringSplitFn is a Splittable DoFn that breaks each input string into
// consecutive substrings of at most BufSize bytes, for example so that every
// piece fits into a small fixed-size buffer. See ProcessElement for how the
// pieces are assigned to restrictions.
type StringSplitFn struct {
	// BufSize is the length in bytes of each emitted substring; the final
	// substring of a string may be shorter. It must be positive.
	BufSize int64
}
// CreateInitialRestriction returns the restriction covering every byte offset
// of s, i.e. the half-open range [0, len(s)).
func (fn *StringSplitFn) CreateInitialRestriction(s string) offsetrange.Restriction {
	whole := offsetrange.Restriction{End: int64(len(s))} // Start is left at its zero value, 0.
	log.Debugf(context.Background(), "StringSplit CreateInitialRestriction: %v", whole)
	return whole
}
// SplitRestriction performs initial splits so that each restriction is split
// into 5.
func (fn *StringSplitFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
splits := rest.EvenSplits(5)
log.Debugf(context.Background(), "StringSplit SplitRestrictions: %v -> %v", rest, splits)
return splits
}
// RestrictionSize reports the amount of work in rest as the distance between
// its start and end offsets. The element itself is not consulted.
func (fn *StringSplitFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 {
	ctx := context.Background()
	sz := rest.Size()
	log.Debugf(ctx, "StringSplit RestrictionSize: %v -> %v", rest, sz)
	return sz
}
// CreateTracker builds an offset-range restriction tracker for rest and wraps
// it in an sdf.LockRTracker, which serializes access to the underlying tracker.
func (fn *StringSplitFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	inner := offsetrange.NewTracker(rest)
	return sdf.NewLockRTracker(inner)
}
// ProcessElement splits elem into consecutive substrings of fn.BufSize bytes
// and emits each one whose start offset lies inside the current restriction.
//
// Substring blocks are aligned to multiples of BufSize measured from the
// start of the string, not to the restriction boundaries. The restriction
// containing a block's start offset emits that block, even if the block
// extends past the end of the restriction. The last block of a string may be
// shorter than BufSize.
//
// Example: with BufSize 100, the restriction [75, 325) emits elem[100:200],
// elem[200:300] and elem[300:400].
//
// Offsets are byte offsets, so a multi-byte UTF-8 character may be split
// across two blocks.
//
// ProcessElement panics with a descriptive message if fn.BufSize is not
// positive.
func (fn *StringSplitFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, elem string, emit func(string)) {
	if fn.BufSize <= 0 {
		// Without this guard, BufSize == 0 fails with an integer divide-by-zero
		// below, and BufSize < 0 yields nonsensical negative offsets.
		panic("stringsplit: StringSplitFn.BufSize must be positive")
	}
	log.Debugf(ctx, "StringSplit ProcessElement: Tracker = %v", rt)

	i := rt.GetRestriction().(offsetrange.Restriction).Start
	// Round up to the first block boundary at or after the restriction start;
	// a block starting before it belongs to an earlier restriction.
	if rem := i % fn.BufSize; rem != 0 {
		i += fn.BufSize - rem
	}
	strEnd := int64(len(elem))
	// TryClaim fails once i falls outside the (possibly runner-split)
	// restriction, which ends the loop.
	for ; rt.TryClaim(i); i += fn.BufSize {
		end := i + fn.BufSize
		if end > strEnd {
			end = strEnd // Final, shorter block.
		}
		emit(elem[i:end])
	}
}
// LogFn is a DoFn that writes every element it receives to the log, so the
// output of StringSplitFn can be inspected.
type LogFn struct{}

// ProcessElement logs in at Info level under a "StringSplit Output" heading.
func (fn *LogFn) ProcessElement(ctx context.Context, in string) {
	log.Infof(ctx, "StringSplit Output:\n%v", in)
}

// FinishBundle pauses for two seconds at the end of every bundle, giving the
// job server time to finish receiving the logged output.
func (fn *LogFn) FinishBundle() {
	const logFlushDelay = 2 * time.Second
	time.Sleep(logFlushDelay)
}
// main builds and runs a pipeline that splits three Shakespeare monologues
// into 50-byte substrings with StringSplitFn and logs each substring with
// LogFn. It exits via log.Fatalf if the job fails.
func main() {
	flag.Parse()
	beam.Init()

	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	monologues := beam.Create(root, macbeth, juliet, helena)
	substrings := beam.ParDo(root, &StringSplitFn{BufSize: 50}, monologues)
	beam.ParDo0(root, &LogFn{}, substrings)

	ctx := context.Background()
	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}
// macbeth is sample input for the pipeline built in main: Macbeth's
// "Is this a dagger which I see before me" speech. The raw string literal is
// used verbatim, including its line breaks.
var macbeth = `Is this a dagger which I see before me,
The handle toward my hand? Come, let me clutch thee.
I have thee not, and yet I see thee still.
Art thou not, fatal vision, sensible
To feeling as to sight? or art thou but
A dagger of the mind, a false creation,
Proceeding from the heat-oppressed brain?
I see thee yet, in form as palpable
As this which now I draw.
Thou marshall'st me the way that I was going;
And such an instrument I was to use.
Mine eyes are made the fools o' the other senses,
Or else worth all the rest; I see thee still,
And on thy blade and dudgeon gouts of blood,
Which was not so before. There's no such thing:
It is the bloody business which informs
Thus to mine eyes. Now o'er the one halfworld
Nature seems dead, and wicked dreams abuse
The curtain'd sleep; witchcraft celebrates
Pale Hecate's offerings, and wither'd murder,
Alarum'd by his sentinel, the wolf,
Whose howl's his watch, thus with his stealthy pace.
With Tarquin's ravishing strides, towards his design
Moves like a ghost. Thou sure and firm-set earth,
Hear not my steps, which way they walk, for fear
Thy very stones prate of my whereabout,
And take the present horror from the time,
Which now suits with it. Whiles I threat, he lives:
Words to the heat of deeds too cold breath gives.
[A bell rings]
I go, and it is done; the bell invites me.
Hear it not, Duncan; for it is a knell
That summons thee to heaven or to hell.`
// juliet is sample input for the pipeline built in main: Juliet's
// "O Romeo, Romeo! wherefore art thou Romeo?" speech.
var juliet = `O Romeo, Romeo! wherefore art thou Romeo?
Deny thy father and refuse thy name;
Or, if thou wilt not, be but sworn my love,
And I'll no longer be a Capulet.
'Tis but thy name that is my enemy;
Thou art thyself, though not a Montague.
What's Montague? it is nor hand, nor foot,
Nor arm, nor face, nor any other part
Belonging to a man. O, be some other name!
What's in a name? that which we call a rose
By any other name would smell as sweet;
So Romeo would, were he not Romeo call'd,
Retain that dear perfection which he owes
Without that title. Romeo, doff thy name,
And for that name which is no part of thee
Take all myself.`
// helena is sample input for the pipeline built in main: Helena's speech
// beginning "Lo, she is one of this confederacy!".
var helena = `Lo, she is one of this confederacy!
Now I perceive they have conjoin'd all three
To fashion this false sport, in spite of me.
Injurious Hermia! most ungrateful maid!
Have you conspired, have you with these contrived
To bait me with this foul derision?
Is all the counsel that we two have shared,
The sisters' vows, the hours that we have spent,
When we have chid the hasty-footed time
For parting us,--O, is it all forgot?
All school-days' friendship, childhood innocence?
We, Hermia, like two artificial gods,
Have with our needles created both one flower,
Both on one sampler, sitting on one cushion,
Both warbling of one song, both in one key,
As if our hands, our sides, voices and minds,
Had been incorporate. So we grow together,
Like to a double cherry, seeming parted,
But yet an union in partition;
Two lovely berries moulded on one stem;
So, with two seeming bodies, but one heart;
Two of the first, like coats in heraldry,
Due but to one and crowned with one crest.
And will you rent our ancient love asunder,
To join with men in scorning your poor friend?
It is not friendly, 'tis not maidenly:
Our sex, as well as I, may chide you for it,
Though I alone do feel the injury.`