/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Iterator;
/**
* Sink that emits its input elements to {@link FileSystem} files within
* buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
*
* <p>
* When creating the sink a {@code basePath} must be specified. The base directory contains
* one directory for every bucket. The bucket directories themselves contain several part files,
* one for each parallel subtask of the sink. These part files contain the actual output data.
*
* <p>
* The sink uses a {@link Bucketer} to determine in which bucket directory each element should
* be written to inside the base directory. The {@code Bucketer} can, for example, use time or
* a property of the element to determine the bucket directory. The default {@code Bucketer} is a
* {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
* a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For example, use the
* {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files
* in a fault-tolerant way.
*
* <p>
* The filenames of the part files contain the part prefix, the parallel subtask index of the sink
* and a rolling counter. For example the file {@code "part-1-17"} contains the data from
* {@code subtask 1} of the sink and is the {@code 17th} part file created by that subtask. By default
* the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
* When a part file becomes bigger than the user-specified batch size the current part file is closed,
* the part counter is increased and a new part file is created. The batch size defaults to {@code 384 MB};
* this can be configured using {@link #setBatchSize(long)}.
*
* <p>
* In some scenarios, the open buckets are required to change based on time. In these cases, the sink
* needs to determine when a bucket has become inactive, in order to flush and close the part file.
* To support this there are two configurable settings:
* <ol>
* <li>the frequency to check for inactive buckets, configured by {@link #setInactiveBucketCheckInterval(long)},
* and</li>
* <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive,
* configured by {@link #setInactiveBucketThreshold(long)}</li>
* </ol>
* Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
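*
* <p>
* For example, a sink that checks for inactive buckets every 30 seconds and closes buckets
* that have received no data for 5 minutes could be configured as follows (a sketch using
* only the two setters described above; {@code outPath} is a placeholder):
* <pre>{@code
* new BucketingSink<String>(outPath)
* .setInactiveBucketCheckInterval(30 * 1000L)
* .setInactiveBucketThreshold(5 * 60 * 1000L)
* }</pre>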
*
* <p>
* Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
* The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
* semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
* a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
* pending files will be moved to {@code finished}.
*
* <p>
* In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
* had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
* state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
* they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports
* the {@code truncate()} method this will be used to reset the file back to its previous state. If not, a special
* file with the same name as the part file and the suffix {@code ".valid-length"} will be created that contains the
* length up to which the file contains valid data. When reading the file, it must be ensured that it is only read up
* to that point. The prefixes and suffixes for the different file states and valid-length files can be configured
* using the appropriate setter method, e.g. {@link #setPendingSuffix(String)}.
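*
* <p>
* A minimal reader-side sketch (not part of this class; {@code fs} and {@code part} are
* placeholders, and the default valid-length prefix {@code "_"} and suffix
* {@code ".valid-length"} are assumed) that honors the valid-length file:
* <pre>{@code
* Path validLengthFile = new Path(part.getParent(), "_" + part.getName() + ".valid-length");
* long validLength = Long.MAX_VALUE;
* if (fs.exists(validLengthFile)) {
* try (FSDataInputStream in = fs.open(validLengthFile)) {
* // the sink writes the length via writeUTF(Long.toString(...)), so read it back the same way
* validLength = Long.parseLong(in.readUTF());
* }
* }
* // only the first validLength bytes of 'part' contain valid data
* }</pre>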
*
* <p>
* <b>NOTE:</b>
* <ol>
* <li>
* If checkpointing is not enabled the pending files will never be moved to the finished state. In that case,
* the pending suffix/prefix can be set to {@code ""} to make the sink work in a non-fault-tolerant way but
* still provide output without prefixes and suffixes.
* </li>
* <li>
* The part files are written using an instance of {@link Writer}. By default, a
* {@link StringWriter} is used, which writes the result of {@code toString()} for
* every element, separated by newlines. You can configure the writer using the
* {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
* can be used to write Hadoop {@code SequenceFiles}.
* </li>
* </ol>
*
* <p>
* Example:
* <pre>{@code
* new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
* .setWriter(new SequenceFileWriter<IntWritable, Text>())
* .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
* }</pre>
*
* This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
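*
* <p>
* Note that exactly-once output also requires checkpointing to be enabled on the
* {@code StreamExecutionEnvironment}, e.g. (a sketch):
* <pre>{@code
* env.enableCheckpointing(10000); // checkpoint every 10 seconds
* }</pre>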
*
* @see DateTimeBucketer
* @see StringWriter
* @see SequenceFileWriter
*
* @param <T> Type of the elements emitted by this sink
*/
public class BucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
// --------------------------------------------------------------------------------------------
// User configuration values
// --------------------------------------------------------------------------------------------
// These are initialized with some defaults but are meant to be changeable by the user
/**
* The default maximum size of part files (currently {@code 384 MB}).
*/
private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
/**
* The default time between checks for inactive buckets ({@code 60} seconds).
*/
private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
/**
* The default threshold (in {@code ms}) for marking a bucket as inactive and
* closing its part files ({@code 60} seconds).
*/
private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
/**
* The suffix for {@code in-progress} part files. These are files we are
* currently writing to, but which were not yet confirmed by a checkpoint.
*/
private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
/**
* The prefix for {@code in-progress} part files. These are files we are
* currently writing to, but which were not yet confirmed by a checkpoint.
*/
private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
/**
* The suffix for {@code pending} part files. These are closed files that we are
* not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
private final String DEFAULT_PENDING_SUFFIX = ".pending";
/**
* The prefix for {@code pending} part files. These are closed files that we are
* not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
private final String DEFAULT_PENDING_PREFIX = "_";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create
* a file alongside the part file with this suffix that contains the length up to which
* the part file is valid.
*/
private final String DEFAULT_VALID_SUFFIX = ".valid-length";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create
* a file alongside the part file with this prefix that contains the length up to which
* the part file is valid.
*/
private final String DEFAULT_VALID_PREFIX = "_";
/**
* The default prefix for part files.
*/
private final String DEFAULT_PART_PREFIX = "part";
/**
* The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
*/
private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
/**
* The base {@code Path} that stores all bucket directories.
*/
private final String basePath;
/**
* The {@code Bucketer} that is used to determine the path of bucket directories.
*/
private Bucketer<T> bucketer;
/**
* We have a template and call duplicate() for each parallel writer in open() to get the actual
* writer that is used for the part files.
*/
private Writer<T> writerTemplate;
private long batchSize = DEFAULT_BATCH_SIZE;
private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
// These are the actually configured prefixes/suffixes
private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
private String pendingPrefix = DEFAULT_PENDING_PREFIX;
private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
private String validLengthPrefix = DEFAULT_VALID_PREFIX;
private String partPrefix = DEFAULT_PART_PREFIX;
/**
* The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
*/
private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
// --------------------------------------------------------------------------------------------
// Internal fields (not configurable by user)
// --------------------------------------------------------------------------------------------
/**
* We use reflection to get the truncate() method; it is only available starting with Hadoop 2.7.
*/
private transient Method refTruncate;
/**
* The state object that is handled by Flink from snapshot/restore. This contains state for
* every open bucket: the current in-progress part file path, its valid length and the pending part files.
*/
private transient State<T> state;
private transient ListState<State<T>> restoredBucketStates;
/**
* User-defined FileSystem parameters
*/
private Configuration fsConfig;
/**
* The FileSystem reference.
*/
private transient FileSystem fs;
private transient Clock clock;
private transient ProcessingTimeService processingTimeService;
/**
* Creates a new {@code BucketingSink} that writes files to the given base directory.
*
* <p>
* This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} as writer.
* The maximum bucket size is set to 384 MB.
*
* @param basePath The directory to which to write the bucket files.
*/
public BucketingSink(String basePath) {
this.basePath = basePath;
this.bucketer = new DateTimeBucketer<>();
this.writerTemplate = new StringWriter<>();
}
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
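*
* <p>
* For example (a sketch; {@code sink} is an existing {@code BucketingSink} and
* {@code "dfs.replication"} is a standard HDFS client setting):
* <pre>{@code
* Configuration conf = new Configuration();
* conf.setString("dfs.replication", "1");
* sink.setFSConfig(conf);
* }</pre>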
*/
public BucketingSink<T> setFSConfig(Configuration config) {
this.fsConfig = new Configuration();
fsConfig.addAll(config);
return this;
}
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
*/
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
}
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (this.writerTemplate instanceof InputTypeConfigurable) {
((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
state = new State<>();
processingTimeService =
((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
this.clock = new Clock() {
@Override
public long currentTimeMillis() {
return processingTimeService.getCurrentProcessingTime();
}
};
}
/**
* Create a file system with the user-defined {@code HDFS} configuration.
* @throws IOException if the {@link FileSystem} cannot be created.
*/
private void initFileSystem() throws IOException {
if (fs != null) {
return;
}
org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
if (fsConfig != null) {
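// disable the Hadoop FileSystem cache for this scheme so that a fresh instance honoring
// the user-supplied settings is created instead of a cached one that would ignore them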
String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
hadoopConf.setBoolean(disableCacheName, true);
for (String key : fsConfig.keySet()) {
hadoopConf.set(key, fsConfig.getString(key, null));
}
}
fs = new Path(basePath).getFileSystem(hadoopConf);
}
@Override
public void close() throws Exception {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
closeCurrentPartFile(entry.getValue());
}
}
@Override
public void invoke(T value) throws Exception {
Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
BucketState<T> bucketState = state.getBucketState(bucketPath);
if (bucketState == null) {
bucketState = new BucketState<>(currentProcessingTime);
state.addBucketState(bucketPath, bucketState);
}
if (shouldRoll(bucketState)) {
openNewPartFile(bucketPath, bucketState);
}
bucketState.writer.write(value);
bucketState.lastWrittenToTime = currentProcessingTime;
}
/**
* Returns {@code true} if the current {@code part-file} should be closed and a new one should be created.
* This happens if:
* <ol>
* <li>no file is created yet for the task to write to, or</li>
* <li>the current file has reached the maximum bucket size.</li>
* </ol>
*/
private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
boolean shouldRoll = false;
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (!bucketState.isWriterOpen) {
shouldRoll = true;
LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
} else {
long writePosition = bucketState.writer.getPos();
if (writePosition > batchSize) {
shouldRoll = true;
LOG.debug(
"BucketingSink {} starting new bucket because file position {} is above batch size {}.",
subtaskIndex,
writePosition,
batchSize);
}
}
return shouldRoll;
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
checkForInactiveBuckets(currentProcessingTime);
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}
/**
* Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been
* written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress
* files to be moved to the pending state and be finalised on the next checkpoint.
*/
private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
synchronized (state.bucketStates) {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
closeCurrentPartFile(entry.getValue());
}
}
}
}
/**
* Closes the current part file and opens a new one with a new bucket path, as returned by the
* {@link Bucketer}. If the bucket is not new, then this will create a new file with the same path
* as its predecessor, but with an increased rolling counter (see {@link BucketingSink}).
*/
private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
closeCurrentPartFile(bucketState);
if (!fs.exists(bucketPath)) {
try {
if (fs.mkdirs(bucketPath)) {
LOG.debug("Created new bucket directory: {}", bucketPath);
}
} catch (IOException e) {
throw new RuntimeException("Could not create new bucket path.", e);
}
}
// The following loop tries different partCounter values in ascending order until it reaches the minimum
// that is not yet used. This works since there is only one parallel subtask that tries names with this
// subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
// clean the base directory in case of rescaling.
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||
fs.exists(getPendingPathFor(partPath)) ||
fs.exists(getInProgressPathFor(partPath))) {
bucketState.partCounter++;
partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}
// increase, so we don't have to check for this name next time
bucketState.partCounter++;
LOG.debug("Next part path is {}", partPath.toString());
bucketState.currentFile = partPath.toString();
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
}
bucketState.writer.open(fs, inProgressPath);
bucketState.isWriterOpen = true;
}
/**
* Closes the current part file and moves it from the in-progress state to the pending state.
*/
private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
if (bucketState.isWriterOpen) {
bucketState.writer.close();
bucketState.isWriterOpen = false;
}
if (bucketState.currentFile != null) {
Path currentPartPath = new Path(bucketState.currentFile);
Path inProgressPath = getInProgressPathFor(currentPartPath);
Path pendingPath = getPendingPathFor(currentPartPath);
fs.rename(inProgressPath, pendingPath);
LOG.debug("Moving in-progress bucket {} to pending file {}",
inProgressPath,
pendingPath);
bucketState.pendingFiles.add(currentPartPath.toString());
bucketState.currentFile = null;
}
}
/**
* Gets the truncate() call using reflection.
* <p>
* <b>NOTE:</b> This code comes from Flume.
*/
private Method reflectTruncate(FileSystem fs) {
Method m = null;
if (fs != null) {
Class<?> fsClass = fs.getClass();
try {
m = fsClass.getMethod("truncate", Path.class, long.class);
} catch (NoSuchMethodException ex) {
LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
return null;
}
// verify that truncate actually works
FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
try {
outputStream = fs.create(testPath);
outputStream.writeUTF("hello");
outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works.", e);
}
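// probe truncate on the test file (written via writeUTF, so 7 bytes on disk: a 2-byte
// length prefix plus "hello") by shrinking it to 2 bytes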
try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file.", e);
}
}
return m;
}
private Path getPendingPathFor(Path path) {
return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
}
private Path getInProgressPathFor(Path path) {
return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
}
private Path getValidLengthPathFor(Path path) {
return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
synchronized (state.bucketStates) {
Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
while (bucketStatesIt.hasNext()) {
BucketState<T> bucketState = bucketStatesIt.next().getValue();
synchronized (bucketState.pendingFilesPerCheckpoint) {
Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
while (pendingCheckpointsIt.hasNext()) {
Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
Long pastCheckpointId = entry.getKey();
List<String> pendingPaths = entry.getValue();
if (pastCheckpointId <= checkpointId) {
LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
for (String filename : pendingPaths) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location having completed checkpoint {}.",
pendingPath,
pastCheckpointId);
}
pendingCheckpointsIt.remove();
}
}
if (!bucketState.isWriterOpen &&
bucketState.pendingFiles.isEmpty() &&
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
bucketStatesIt.remove();
}
}
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
restoredBucketStates.clear();
synchronized (state.bucketStates) {
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
BucketState<T> bucketState = bucketStateEntry.getValue();
if (bucketState.isWriterOpen) {
bucketState.currentFileValidLength = bucketState.writer.flush();
}
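// attribute all files that became pending since the last snapshot to this checkpoint;
// notifyCheckpointComplete() will move them to their final location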
synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
}
bucketState.pendingFiles = new ArrayList<>();
}
restoredBucketStates.add(state);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
}
}
}
private void handleRestoredBucketState(State<T> restoredState) {
Preconditions.checkNotNull(restoredState);
for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
// we can clean all the pending files since they were renamed to
// final files after this checkpoint was successful
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();
handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
// Now that we've restored the bucket to a valid state, reset the current file info
bucketState.currentFile = null;
bucketState.currentFileValidLength = -1;
bucketState.isWriterOpen = false;
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
}
}
}
private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
restoredState.pendingFiles.clear();
handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
// Now that we've restored the bucket to a valid state, reset the current file info
restoredState.currentFile = null;
restoredState.currentFileValidLength = -1;
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
synchronized (restoredState.pendingFilesPerCheckpoint) {
restoredState.pendingFilesPerCheckpoint.clear();
}
}
private void handlePendingInProgressFile(String file, long validLength) {
if (file != null) {
// We were writing to a file when the last checkpoint occurred. This file can either
// be still in-progress or became a pending file at some point after the checkpoint.
// Either way, we have to truncate it back to a valid state (or write a .valid-length
// file that specifies up to which length it is valid) and rename it to the final name
// before starting a new bucket file.
Path partPath = new Path(file);
try {
Path partPendingPath = getPendingPathFor(partPath);
Path partInProgressPath = getInProgressPathFor(partPath);
if (fs.exists(partPendingPath)) {
LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
// has been moved to pending in the mean time, rename to final location
fs.rename(partPendingPath, partPath);
} else if (fs.exists(partInProgressPath)) {
LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
// it was still in progress, rename to final path
fs.rename(partInProgressPath, partPath);
} else if (fs.exists(partPath)) {
LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
} else {
LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
"it was moved to final location by a previous snapshot restore", file);
}
// We use reflection to get the .truncate() method, this
// is only available starting with Hadoop 2.7
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
// truncate it or write a ".valid-length" file to specify up to which point it is valid
if (refTruncate != null) {
LOG.debug("Truncating {} to valid length {}", partPath, validLength);
// some-one else might still hold the lease from a previous try, we are
// recovering, after all ...
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
LOG.debug("Trying to recover file lease {}", partPath);
dfs.recoverLease(partPath);
boolean isclosed = dfs.isFileClosed(partPath);
StopWatch sw = new StopWatch();
sw.start();
while (!isclosed) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
isclosed = dfs.isFileClosed(partPath);
}
}
Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
if (!truncated) {
LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
// we must wait for the asynchronous truncate operation to complete
StopWatch sw = new StopWatch();
sw.start();
long newLen = fs.getFileStatus(partPath).getLen();
while (newLen != validLength) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
newLen = fs.getFileStatus(partPath).getLen();
}
if (newLen != validLength) {
throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
}
}
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(validLength));
lengthFileOut.close();
}
}
} catch (IOException e) {
LOG.error("Error while restoring BucketingSink state.", e);
throw new RuntimeException("Error while restoring BucketingSink state.", e);
} catch (InvocationTargetException | IllegalAccessException e) {
LOG.error("Could not invoke truncate.", e);
throw new RuntimeException("Could not invoke truncate.", e);
}
}
}
private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
// Move files that are confirmed by a checkpoint but did not get moved to final location
// because the checkpoint notification did not happen before a failure
LOG.debug("Moving pending files to final location on restore.");
Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
for (Long pastCheckpointId : pastCheckpointIds) {
// All the pending files are buckets that have been completed but are waiting to be renamed
// to their final name
for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
try {
if (fs.exists(pendingPath)) {
LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
fs.rename(pendingPath, finalPath);
}
} catch (IOException e) {
LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
}
}
// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------
@Override
public void restoreState(RollingSink.BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}
handleRestoredRollingSinkState(state);
}
// --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
/**
* Sets the maximum bucket size in bytes.
*
* <p>
* When a bucket part file becomes larger than this size a new bucket part file is started and
* the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
*
* @param batchSize The bucket part file size in bytes.
*/
public BucketingSink<T> setBatchSize(long batchSize) {
this.batchSize = batchSize;
return this;
}
/**
* Sets the time between checks for inactive buckets.
*
* @param interval The timeout, in milliseconds.
*/
public BucketingSink<T> setInactiveBucketCheckInterval(long interval) {
this.inactiveBucketCheckInterval = interval;
return this;
}
/**
* Sets the threshold for marking a bucket as inactive and closing its part files.
* Buckets which haven't been written to for at least this period of time become inactive.
*
* @param threshold The timeout, in milliseconds.
*/
public BucketingSink<T> setInactiveBucketThreshold(long threshold) {
this.inactiveBucketThreshold = threshold;
return this;
}
/**
* Sets the {@link Bucketer} to use for determining the bucket files to write to.
*
* @param bucketer The bucketer to use.
*/
public BucketingSink<T> setBucketer(Bucketer<T> bucketer) {
this.bucketer = bucketer;
return this;
}
/**
* Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
*
* @param writer The {@code Writer} to use.
*/
public BucketingSink<T> setWriter(Writer<T> writer) {
this.writerTemplate = writer;
return this;
}
/**
* Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
*/
public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) {
this.inProgressSuffix = inProgressSuffix;
return this;
}
/**
* Sets the prefix of in-progress part files. The default is {@code "_"}.
*/
public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) {
this.inProgressPrefix = inProgressPrefix;
return this;
}
/**
* Sets the suffix of pending part files. The default is {@code ".pending"}.
*/
public BucketingSink<T> setPendingSuffix(String pendingSuffix) {
this.pendingSuffix = pendingSuffix;
return this;
}
/**
* Sets the prefix of pending part files. The default is {@code "_"}.
*/
public BucketingSink<T> setPendingPrefix(String pendingPrefix) {
this.pendingPrefix = pendingPrefix;
return this;
}
/**
* Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
*/
public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) {
this.validLengthSuffix = validLengthSuffix;
return this;
}
/**
* Sets the prefix of valid-length files. The default is {@code "_"}.
*/
public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) {
this.validLengthPrefix = validLengthPrefix;
return this;
}