/
TextIO.java
1315 lines (1135 loc) · 50.9 KB
/
TextIO.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.commons.compress.utils.CharsetNames.UTF_8;
import com.google.auto.value.AutoValue;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
/**
* {@link PTransform}s for reading and writing text files.
*
* <h2>Reading text files</h2>
*
* <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
* instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
* file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
* PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read them.
*
* <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
 * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', or
 * a custom delimiter; see {@link TextIO.Read#withDelimiter}).
*
* <h3>Filepattern expansion and watching</h3>
*
* <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
* combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
* #readFiles()} allow streaming of new files matching the filepattern(s).
*
* <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readFiles()}
* allows them in case the filepattern contains a glob wildcard character. Use {@link
* Read#withEmptyMatchTreatment} or {@link
* FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to configure
* this behavior.
*
* <p>Example 1: reading a file or filepattern.
*
* <pre>{@code
* Pipeline p = ...;
*
* // A simple Read of a local file (only runs locally):
* PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
* <p>Example 2: reading a PCollection of filenames.
*
* <pre>{@code
* Pipeline p = ...;
*
* // E.g. the filenames might be computed from other data in the pipeline, or
* // read from a data source.
* PCollection<String> filenames = ...;
*
* // Read all files in the collection.
* PCollection<String> lines =
* filenames
* .apply(FileIO.matchAll())
* .apply(FileIO.readMatches())
* .apply(TextIO.readFiles());
* }</pre>
*
* <p>Example 3: streaming new files matching a filepattern.
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<String> lines = p.apply(TextIO.read()
* .from("/local/path/to/files/*")
* .watchForNewFiles(
* // Check for new files every minute
* Duration.standardMinutes(1),
* // Stop watching the filepattern if no new files appear within an hour
* afterTimeSinceNewOutput(Duration.standardHours(1))));
* }</pre>
*
* <h3>Reading a very large number of files</h3>
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
* thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
* scalability. Note that it may decrease performance if the filepattern matches only a small number
* of files.
*
* <h2>Writing text files</h2>
*
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
* <p>For example:
*
* <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<String> lines = ...;
* lines.apply(TextIO.write().to("/path/to/file.txt"));
*
* // Same as above, only with Gzip compression:
* PCollection<String> lines = ...;
 * lines.apply(TextIO.write().to("/path/to/file.txt")
 *      .withSuffix(".txt")
 *      .withCompression(Compression.GZIP));
* }</pre>
*
* <p>Any existing files with the same names as generated output files will be overwritten.
*
* <p>If you want better control over how filenames are generated than the default policy allows, a
* custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
*
* <h3>Advanced features</h3>
*
* <p>{@link TextIO} supports all features of {@link FileIO#write} and {@link FileIO#writeDynamic},
* such as writing windowed/unbounded data, writing data to multiple destinations, and so on, by
* providing a {@link Sink} via {@link #sink()}.
*
* <p>For example, to write events of different type to different filenames:
*
* <pre>{@code
* PCollection<Event> events = ...;
* events.apply(FileIO.<EventType, Event>writeDynamic()
* .by(Event::getTypeName)
* .via(TextIO.sink(), Event::toString)
* .to(type -> nameFilesUsingWindowPaneAndShard(".../events/" + type + "/data", ".txt")));
* }</pre>
*
* <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class TextIO {
// Default desired bundle size (64 MB) handed to ReadFiles; see readFiles() for the rationale.
private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
/**
 * A {@link PTransform} that reads from one or more text files and returns a bounded {@link
 * PCollection} containing one element for each line of the input files.
 */
public static Read read() {
  // Defaults: auto-detect compression, no many-files hint, and fail on empty matches.
  Read.Builder builder = new AutoValue_TextIO_Read.Builder();
  return builder
      .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
      .setHintMatchesManyFiles(false)
      .setCompression(Compression.AUTO)
      .build();
}
/**
 * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link
 * PCollection} of filepatterns.
 *
 * <p>Applicable to both bounded and unbounded {@link PCollection PCollections}, so it is
 * suitable for a {@link PCollection} of filepatterns arriving as a stream. Note that each
 * filepattern is expanded exactly once at the moment it is processed — it is not watched for
 * newly matching files — and likewise each file is read only once.
 *
 * @deprecated You can achieve the functionality of {@link #readAll()} using {@link FileIO}
 *     matching plus {@link #readFiles()}. This is the preferred method to make composition
 *     explicit. {@link ReadAll} will not receive upgrades and will be removed in a future
 *     version of Beam.
 */
@Deprecated
public static ReadAll readAll() {
  // Defaults: auto-detect compression; empty matches are allowed for glob patterns only.
  ReadAll.Builder builder = new AutoValue_TextIO_ReadAll.Builder();
  return builder
      .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
      .setCompression(Compression.AUTO)
      .build();
}
/**
 * Like {@link #read}, but reads each file in a {@link PCollection} of {@link
 * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}.
 */
public static ReadFiles readFiles() {
  ReadFiles.Builder builder = new AutoValue_TextIO_ReadFiles.Builder();
  // 64MB is a reasonable default: large enough to amortize the cost of opening files, but
  // not so large as to exhaust a typical runner's maximum amount of output per
  // ProcessElement call.
  builder.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES);
  return builder.build();
}
/**
 * A {@link PTransform} that writes a {@link PCollection} to a text file (or to multiple text
 * files matching a sharding pattern), encoding each element of the input collection as one
 * output line.
 */
public static Write write() {
  return new Write();
}
/**
 * A {@link PTransform} that writes a {@link PCollection} to a text file (or to multiple text
 * files matching a sharding pattern), encoding each element of the input collection as one
 * output line.
 *
 * <p>This version allows {@link TextIO} writes to be applied to a PCollection of a custom type
 * {@code UserT}. A format mechanism that converts the input type {@code UserT} to the String
 * that will be written to the file must be specified: via {@link
 * DynamicDestinations#formatRecord} when a custom {@link DynamicDestinations} object is used,
 * or via {@link TypedWrite#withFormatFunction} otherwise.
 *
 * <p>The advantage of using a custom type is that it allows a user-provided {@link
 * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)}, to examine the
 * custom type when choosing a destination.
 */
public static <UserT> TypedWrite<UserT, Void> writeCustomType() {
  // All optional destination/naming properties start unset; delimiter defaults to newline and
  // output is uncompressed unless configured otherwise.
  TypedWrite.Builder<UserT, Void> builder = new AutoValue_TextIO_TypedWrite.Builder<>();
  return builder
      .setDynamicDestinations(null)
      .setFilenamePolicy(null)
      .setFilenamePrefix(null)
      .setFilenameSuffix(null)
      .setShardTemplate(null)
      .setTempDirectory(null)
      .setDelimiter(new char[] {'\n'})
      .setNumShards(0)
      .setNoSpilling(false)
      .setWindowedWrites(false)
      .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
      .build();
}
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
  // Filepattern to expand; null until from() is called (checked in expand()).
  abstract @Nullable ValueProvider<String> getFilepattern();

  abstract MatchConfiguration getMatchConfiguration();

  abstract boolean getHintMatchesManyFiles();

  abstract Compression getCompression();

  // Custom record delimiter; null means the default '\n', '\r', '\r\n' handling.
  @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
  abstract byte @Nullable [] getDelimiter();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setFilepattern(ValueProvider<String> filepattern);

    abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);

    abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);

    abstract Builder setCompression(Compression compression);

    abstract Builder setDelimiter(byte @Nullable [] delimiter);

    abstract Read build();
  }

  /**
   * Reads from the file(s) with the given filename or filename pattern.
   *
   * <p>This can be a local path (if running locally), or a Google Cloud Storage filename or
   * filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if running locally or
   * using a remote execution service).
   *
   * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
   * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
   *
   * <p>If it is known that the filepattern will match a very large number of files (at least
   * tens of thousands), use {@link #withHintMatchesManyFiles} for better performance and
   * scalability.
   */
  public Read from(String filepattern) {
    checkArgument(filepattern != null, "filepattern can not be null");
    return from(StaticValueProvider.of(filepattern));
  }

  /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
  public Read from(ValueProvider<String> filepattern) {
    checkArgument(filepattern != null, "filepattern can not be null");
    return toBuilder().setFilepattern(filepattern).build();
  }

  /** Sets the {@link MatchConfiguration}. */
  public Read withMatchConfiguration(MatchConfiguration matchConfiguration) {
    return toBuilder().setMatchConfiguration(matchConfiguration).build();
  }

  /** @deprecated Use {@link #withCompression}. */
  @Deprecated
  public Read withCompressionType(TextIO.CompressionType compressionType) {
    return withCompression(compressionType.canonical);
  }

  /**
   * Reads from input sources using the specified compression type.
   *
   * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
   */
  public Read withCompression(Compression compression) {
    return toBuilder().setCompression(compression).build();
  }

  /**
   * See {@link MatchConfiguration#continuously}.
   *
   * <p>This works only in runners supporting splittable {@link
   * org.apache.beam.sdk.transforms.DoFn}.
   */
  public Read watchForNewFiles(
      Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
    return withMatchConfiguration(
        getMatchConfiguration().continuously(pollInterval, terminationCondition));
  }

  /**
   * Hints that the filepattern specified in {@link #from(String)} matches a very large number
   * of files.
   *
   * <p>This hint may cause a runner to execute the transform differently, in a way that
   * improves performance for this case, but it may worsen performance if the filepattern
   * matches only a small number of files (e.g., in a runner that supports dynamic work
   * rebalancing, it will happen less efficiently within individual files).
   */
  public Read withHintMatchesManyFiles() {
    return toBuilder().setHintMatchesManyFiles(true).build();
  }

  /** See {@link MatchConfiguration#withEmptyMatchTreatment}. */
  public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
    return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
  }

  /**
   * Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n').
   *
   * <p>Self-overlapping delimiters are rejected; see {@link #isSelfOverlapping}.
   */
  public Read withDelimiter(byte[] delimiter) {
    checkArgument(delimiter != null, "delimiter can not be null");
    checkArgument(!isSelfOverlapping(delimiter), "delimiter must not self-overlap");
    return toBuilder().setDelimiter(delimiter).build();
  }

  /**
   * Returns true if some proper, non-empty prefix of {@code s} is also a proper, non-empty
   * suffix of {@code s} (e.g. {@code "aba"}).
   */
  static boolean isSelfOverlapping(byte[] s) {
    // s self-overlaps if v exists such as s = vu = wv with u and w non empty
    for (int i = 1; i < s.length - 1; ++i) {
      // Compare the length-i prefix of s against its length-i suffix.
      if (ByteBuffer.wrap(s, 0, i).equals(ByteBuffer.wrap(s, s.length - i, i))) {
        return true;
      }
    }
    return false;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
    // Fast path: one-shot match without the many-files hint reads directly from a single
    // (possibly compressed) FileBasedSource.
    if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
      return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
    }
    // All other cases go through FileIO + ReadFiles
    return input
        .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
        .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
        .apply(
            "Read Matches",
            FileIO.readMatches()
                .withCompression(getCompression())
                .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
        .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
  }

  // Helper to create a source specific to the requested compression type.
  protected FileBasedSource<String> getSource() {
    return CompressedSource.from(
            new TextSource(
                getFilepattern(),
                getMatchConfiguration().getEmptyMatchTreatment(),
                getDelimiter()))
        .withCompression(getCompression());
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder
        .add(
            DisplayData.item("compressionType", getCompression().toString())
                .withLabel("Compression Type"))
        .addIfNotNull(DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
        .include("matchConfiguration", getMatchConfiguration())
        .addIfNotNull(
            DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
                .withLabel("Custom delimiter to split records"));
  }
}
/////////////////////////////////////////////////////////////////////////////
/**
 * Implementation of {@link #readAll}.
 *
 * @deprecated See {@link #readAll()} for details.
 */
@Deprecated
@AutoValue
public abstract static class ReadAll
    extends PTransform<PCollection<String>, PCollection<String>> {
  abstract MatchConfiguration getMatchConfiguration();

  abstract Compression getCompression();

  // Custom record delimiter; null means the default '\n', '\r', '\r\n' handling.
  @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
  abstract byte @Nullable [] getDelimiter();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);

    abstract Builder setCompression(Compression compression);

    abstract Builder setDelimiter(byte @Nullable [] delimiter);

    abstract ReadAll build();
  }

  /** Sets the {@link MatchConfiguration}. */
  public ReadAll withMatchConfiguration(MatchConfiguration configuration) {
    return toBuilder().setMatchConfiguration(configuration).build();
  }

  /** @deprecated Use {@link #withCompression}. */
  @Deprecated
  public ReadAll withCompressionType(TextIO.CompressionType compressionType) {
    return withCompression(compressionType.canonical);
  }

  /**
   * Reads from input sources using the specified compression type.
   *
   * <p>If no compression type is specified, the default is {@link Compression#AUTO}.
   */
  public ReadAll withCompression(Compression compression) {
    return toBuilder().setCompression(compression).build();
  }

  /** Same as {@link Read#withEmptyMatchTreatment}. */
  public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
    return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
  }

  /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
  public ReadAll watchForNewFiles(
      Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
    return withMatchConfiguration(
        getMatchConfiguration().continuously(pollInterval, terminationCondition));
  }

  // Package-private; unlike Read#withDelimiter, performs no null/self-overlap validation.
  ReadAll withDelimiter(byte[] delimiter) {
    return toBuilder().setDelimiter(delimiter).build();
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Same pipeline shape as the FileIO-based path of Read#expand: match each filepattern,
    // then read every matched file, treating directories as an error.
    return input
        .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
        .apply(
            FileIO.readMatches()
                .withCompression(getCompression())
                .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
        .apply(readFiles().withDelimiter(getDelimiter()));
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder
        .add(
            DisplayData.item("compressionType", getCompression().toString())
                .withLabel("Compression Type"))
        .addIfNotNull(
            DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
                .withLabel("Custom delimiter to split records"))
        .include("matchConfiguration", getMatchConfiguration());
  }
}
/** Implementation of {@link #readFiles}. */
@AutoValue
public abstract static class ReadFiles
    extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<String>> {
  /** Desired number of bytes per bundle, passed through to {@link ReadAllViaFileBasedSource}. */
  abstract long getDesiredBundleSizeBytes();

  // Custom record delimiter; null means the default '\n', '\r', '\r\n' handling.
  @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
  abstract byte @Nullable [] getDelimiter();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);

    abstract Builder setDelimiter(byte @Nullable [] delimiter);

    abstract ReadFiles build();
  }

  @VisibleForTesting
  ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
    return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
  }

  /**
   * Like {@link Read#withDelimiter}.
   *
   * <p>Unlike {@link Read#withDelimiter}, no validation is performed here: a null delimiter is
   * accepted and means "use the default delimiters". {@link Read#expand} relies on this by
   * passing its possibly-null delimiter through.
   */
  public ReadFiles withDelimiter(byte[] delimiter) {
    return toBuilder().setDelimiter(delimiter).build();
  }

  @Override
  public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
    return input.apply(
        "Read all via FileBasedSource",
        new ReadAllViaFileBasedSource<>(
            getDesiredBundleSizeBytes(),
            new CreateTextSourceFn(getDelimiter()),
            StringUtf8Coder.of()));
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.addIfNotNull(
        DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
            .withLabel("Custom delimiter to split records"));
  }

  /** Maps a filename to a {@link TextSource} configured with this transform's delimiter. */
  private static class CreateTextSourceFn
      implements SerializableFunction<String, FileBasedSource<String>> {
    // final: this function is serialized and shipped to workers, and the delimiter never
    // changes after construction.
    private final byte[] delimiter;

    private CreateTextSourceFn(byte[] delimiter) {
      this.delimiter = delimiter;
    }

    @Override
    public FileBasedSource<String> apply(String input) {
      return new TextSource(
          StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter);
    }
  }
}
// ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class TypedWrite<UserT, DestinationT>
extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
/** The prefix of each file written, combined with suffix and shardTemplate. */
abstract @Nullable ValueProvider<ResourceId> getFilenamePrefix();

/** The suffix of each file written, combined with prefix and shardTemplate. */
abstract @Nullable String getFilenameSuffix();

/** The base directory used for generating temporary files. */
abstract @Nullable ValueProvider<ResourceId> getTempDirectory();

/** The delimiter between string records; {@link #writeCustomType} defaults it to '\n'. */
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract char[] getDelimiter();

/** An optional header to add to each file. */
abstract @Nullable String getHeader();

/** An optional footer to add to each file. */
abstract @Nullable String getFooter();

/** Requested number of shards. 0 for automatic. */
abstract int getNumShards();

/** The shard template of each file written, combined with prefix and suffix. */
abstract @Nullable String getShardTemplate();

/** A policy for naming output files. */
abstract @Nullable FilenamePolicy getFilenamePolicy();

/** Allows for value-dependent {@link DynamicDestinations} to be vended. */
abstract @Nullable DynamicDestinations<UserT, DestinationT, String> getDynamicDestinations();

/** A destination function for using {@link DefaultFilenamePolicy}. */
abstract @Nullable SerializableFunction<UserT, Params> getDestinationFunction();

/** A default destination for empty PCollections. */
abstract @Nullable Params getEmptyDestination();

/** A function that converts {@code UserT} to a String, for writing to the file. */
abstract @Nullable SerializableFunction<UserT, String> getFormatFunction();

/** Whether to write windowed output files. */
abstract boolean getWindowedWrites();

/** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */
abstract boolean getNoSpilling();

/**
 * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
 * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
 */
abstract WritableByteChannelFactory getWritableByteChannelFactory();

abstract Builder<UserT, DestinationT> toBuilder();
/** AutoValue builder for {@link TypedWrite}; one setter per property getter. */
@AutoValue.Builder
abstract static class Builder<UserT, DestinationT> {
  abstract Builder<UserT, DestinationT> setFilenamePrefix(
      @Nullable ValueProvider<ResourceId> filenamePrefix);

  abstract Builder<UserT, DestinationT> setTempDirectory(
      @Nullable ValueProvider<ResourceId> tempDirectory);

  abstract Builder<UserT, DestinationT> setShardTemplate(@Nullable String shardTemplate);

  abstract Builder<UserT, DestinationT> setFilenameSuffix(@Nullable String filenameSuffix);

  abstract Builder<UserT, DestinationT> setHeader(@Nullable String header);

  abstract Builder<UserT, DestinationT> setFooter(@Nullable String footer);

  abstract Builder<UserT, DestinationT> setDelimiter(char[] delimiter);

  abstract Builder<UserT, DestinationT> setFilenamePolicy(
      @Nullable FilenamePolicy filenamePolicy);

  abstract Builder<UserT, DestinationT> setDynamicDestinations(
      @Nullable DynamicDestinations<UserT, DestinationT, String> dynamicDestinations);

  abstract Builder<UserT, DestinationT> setDestinationFunction(
      @Nullable SerializableFunction<UserT, Params> destinationFunction);

  abstract Builder<UserT, DestinationT> setEmptyDestination(Params emptyDestination);

  abstract Builder<UserT, DestinationT> setFormatFunction(
      @Nullable SerializableFunction<UserT, String> formatFunction);

  abstract Builder<UserT, DestinationT> setNumShards(int numShards);

  abstract Builder<UserT, DestinationT> setWindowedWrites(boolean windowedWrites);

  abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);

  abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
      WritableByteChannelFactory writableByteChannelFactory);

  abstract TypedWrite<UserT, DestinationT> build();
}
/**
 * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link
 * FileSystem} on the classpath; it is used by the {@link DefaultFilenamePolicy} to generate
 * filenames.
 *
 * <p>By default, a {@link DefaultFilenamePolicy} is built from the specified prefix (defining
 * the base output directory and file prefix), a shard identifier (see {@link
 * #withNumShards(int)}), and a common suffix (if supplied via {@link #withSuffix(String)}).
 *
 * <p>This default policy can be overridden with {@link #to(FilenamePolicy)}, in which case
 * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
 * Custom filename policies do not automatically see this prefix — pass the prefix into your
 * {@link FilenamePolicy} object explicitly if you need it.
 *
 * <p>If {@link #withTempDirectory} has not been called, this filename prefix is also used to
 * infer a directory for temporary files.
 */
public TypedWrite<UserT, DestinationT> to(String filenamePrefix) {
  ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
  return to(resource);
}
/** Like {@link #to(String)}. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> to(ResourceId filenamePrefix) {
  ValueProvider<ResourceId> provider = StaticValueProvider.of(filenamePrefix);
  return toResource(provider);
}
/** Like {@link #to(String)}. */
public TypedWrite<UserT, DestinationT> to(ValueProvider<String> outputPrefix) {
  // Defer prefix-to-ResourceId conversion until the ValueProvider is accessible at runtime.
  ValueProvider<ResourceId> resource =
      NestedValueProvider.of(outputPrefix, FileBasedSink::convertToFileResourceIfPossible);
  return toResource(resource);
}
/**
 * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
 * directory for temporary files must be specified using {@link #withTempDirectory}.
 */
public TypedWrite<UserT, DestinationT> to(FilenamePolicy filenamePolicy) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setFilenamePolicy(filenamePolicy).build();
}
/**
 * Uses a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. Such an
 * object may inspect each input record when choosing a {@link FilenamePolicy}. A directory for
 * temporary files must be supplied via {@link #withTempDirectory}.
 *
 * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
 *     instead.
 */
@Deprecated
public <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
    DynamicDestinations<UserT, NewDestinationT, String> dynamicDestinations) {
  // Raw-typed round trip: the builder is parameterized on DestinationT, while the caller
  // supplies NewDestinationT; the unchecked casts rebind the destination type.
  Builder builder = toBuilder().setDynamicDestinations((DynamicDestinations) dynamicDestinations);
  return (TypedWrite) builder.build();
}
/**
 * Writes to dynamic destinations using the default filename policy. The {@code
 * destinationFunction} maps each input record to a {@link DefaultFilenamePolicy.Params} object
 * describing where that record should be written (base filename, file suffix, and shard
 * template). The {@code emptyDestination} parameter specifies where empty files are written when
 * the input {@link PCollection} is empty.
 *
 * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
 *     instead.
 */
@Deprecated
public TypedWrite<UserT, Params> to(
    SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
  // Unchecked cast rebinds DestinationT to Params for the returned transform.
  Builder builder =
      toBuilder()
          .setDestinationFunction(destinationFunction)
          .setEmptyDestination(emptyDestination);
  return (TypedWrite) builder.build();
}
/** Like {@link #to(ResourceId)}, but the prefix is supplied via a {@link ValueProvider}. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> toResource(ValueProvider<ResourceId> filenamePrefix) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setFilenamePrefix(filenamePrefix).build();
}
/**
 * Specifies a format function converting {@link UserT} to the output string. When {@link
 * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be
 * used instead of this method.
 *
 * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
 *     instead.
 */
@Deprecated
public TypedWrite<UserT, DestinationT> withFormatFunction(
    @Nullable SerializableFunction<UserT, String> formatFunction) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setFormatFunction(formatFunction).build();
}
/** Sets the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> withTempDirectory(
    ValueProvider<ResourceId> tempDirectory) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setTempDirectory(tempDirectory).build();
}
/** Sets the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> withTempDirectory(ResourceId tempDirectory) {
  ValueProvider<ResourceId> staticDirectory = StaticValueProvider.of(tempDirectory);
  return withTempDirectory(staticDirectory);
}
/**
 * Uses the given {@link ShardNameTemplate} when naming output files. Only valid together with
 * the default filename-prefix {@code to()} overloads - i.e. not with {@link #to(FilenamePolicy)}
 * or {@link #to(DynamicDestinations)}.
 *
 * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix
 * combine.
 */
public TypedWrite<UserT, DestinationT> withShardNameTemplate(String shardTemplate) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setShardTemplate(shardTemplate).build();
}
/**
 * Configures the filename suffix for written files. Only valid together with the default
 * filename-prefix {@code to()} overloads - i.e. not with {@link #to(FilenamePolicy)} or {@link
 * #to(DynamicDestinations)}.
 *
 * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix
 * combine.
 */
public TypedWrite<UserT, DestinationT> withSuffix(String filenameSuffix) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setFilenameSuffix(filenameSuffix).build();
}
/**
 * Configures the number of output shards produced overall (when using unwindowed writes) or
 * per-window (when using windowed writes).
 *
 * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
 * performance of a pipeline. Setting this value is not recommended unless you require a
 * specific number of output files.
 *
 * @param numShards the number of shards to use, or 0 to let the system decide.
 * @throws IllegalArgumentException if {@code numShards} is negative.
 */
public TypedWrite<UserT, DestinationT> withNumShards(int numShards) {
  // Include the offending value in the error so misconfiguration is diagnosable at a glance.
  checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards);
  return toBuilder().setNumShards(numShards).build();
}
/**
 * Forces a single file as output and an empty shard name template.
 *
 * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
 * performance of a pipeline. Setting this value is not recommended unless you require a
 * specific number of output files.
 *
 * <p>Equivalent to {@code .withNumShards(1).withShardNameTemplate("")}.
 */
public TypedWrite<UserT, DestinationT> withoutSharding() {
  TypedWrite<UserT, DestinationT> singleShard = withNumShards(1);
  return singleShard.withShardNameTemplate("");
}
/**
 * Specifies the delimiter written after each string.
 *
 * <p>Defaults to '\n'.
 */
public TypedWrite<UserT, DestinationT> withDelimiter(char[] delimiter) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setDelimiter(delimiter).build();
}
/**
 * Adds a header string to each file. A newline is appended after the header automatically.
 *
 * <p>Passing {@code null} clears any previously configured header.
 */
public TypedWrite<UserT, DestinationT> withHeader(@Nullable String header) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setHeader(header).build();
}
/**
 * Adds a footer string to each file. A newline is appended after the footer automatically.
 *
 * <p>Passing {@code null} clears any previously configured footer.
 */
public TypedWrite<UserT, DestinationT> withFooter(@Nullable String footer) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setFooter(footer).build();
}
/**
 * Returns a transform like this one, but using the given {@link WritableByteChannelFactory} for
 * output in the underlying {@link FileBasedSink}. The default value is {@link
 * Compression#UNCOMPRESSED}.
 *
 * <p>Passing {@code null} resets the factory to the default value mentioned above.
 */
public TypedWrite<UserT, DestinationT> withWritableByteChannelFactory(
    WritableByteChannelFactory writableByteChannelFactory) {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setWritableByteChannelFactory(writableByteChannelFactory).build();
}
/**
 * Returns a transform like this one, but compressing output with the given {@link Compression}.
 * The default value is {@link Compression#UNCOMPRESSED}.
 */
public TypedWrite<UserT, DestinationT> withCompression(Compression compression) {
  checkArgument(compression != null, "compression can not be null");
  WritableByteChannelFactory factory = FileBasedSink.CompressionType.fromCanonical(compression);
  return withWritableByteChannelFactory(factory);
}
/**
 * Preserves windowing of input elements and writes them to files based on each element's window.
 *
 * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
 * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
 */
public TypedWrite<UserT, DestinationT> withWindowedWrites() {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setWindowedWrites(true).build();
}
/** See {@link WriteFiles#withNoSpilling()}. */
public TypedWrite<UserT, DestinationT> withNoSpilling() {
  Builder<UserT, DestinationT> builder = toBuilder();
  return builder.setNoSpilling(true).build();
}
/**
 * Returns the {@link DynamicDestinations} this write will use. If none was set explicitly via
 * {@link #to(DynamicDestinations)}, one is derived from the other configuration: the destination
 * function (if set), or else the filename policy / standard prefix parameters.
 */
private DynamicDestinations<UserT, DestinationT, String> resolveDynamicDestinations() {
  DynamicDestinations<UserT, DestinationT, String> dynamicDestinations =
      getDynamicDestinations();
  if (dynamicDestinations == null) {
    if (getDestinationFunction() != null) {
      // In this case, DestinationT == Params
      // (the raw cast rebinds the destination type accordingly).
      dynamicDestinations =
          (DynamicDestinations)
              DynamicFileDestinations.toDefaultPolicies(
                  getDestinationFunction(), getEmptyDestination(), getFormatFunction());
    } else {
      // In this case, DestinationT == Void
      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
      if (usedFilenamePolicy == null) {
        // No explicit policy either: fall back to the default policy built from the
        // configured prefix, shard template, suffix, and windowed-writes flag.
        usedFilenamePolicy =
            DefaultFilenamePolicy.fromStandardParameters(
                getFilenamePrefix(),
                getShardTemplate(),
                getFilenameSuffix(),
                getWindowedWrites());
      }
      dynamicDestinations =
          (DynamicDestinations)
              DynamicFileDestinations.constant(usedFilenamePolicy, getFormatFunction());
    }
  }
  return dynamicDestinations;
}
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
checkState(
getFilenamePrefix() != null || getTempDirectory() != null,
"Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
+ "transform.");
List<?> allToArgs =
Lists.newArrayList(
getFilenamePolicy(),
getDynamicDestinations(),
getFilenamePrefix(),
getDestinationFunction());
checkArgument(
1
== Iterables.size(
allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
+ "function must be set");
if (getDynamicDestinations() != null) {
checkArgument(
getFormatFunction() == null,
"A format function should not be specified "
+ "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
}
if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
checkState(
getShardTemplate() == null && getFilenameSuffix() == null,
"shardTemplate and filenameSuffix should only be used with the default "
+ "filename policy");
}
ValueProvider<ResourceId> tempDirectory = getTempDirectory();
if (tempDirectory == null) {
tempDirectory = getFilenamePrefix();
}
WriteFiles<UserT, DestinationT, String> write =
WriteFiles.to(
new TextSink<>(
tempDirectory,
resolveDynamicDestinations(),