/
DataStreamer.java
2163 lines (1957 loc) · 73.4 KB
/
DataStreamer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.SpanContext;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
* datanodes in the pipeline. It retrieves a new blockid and block locations
* from the namenode, and starts streaming packets to the pipeline of
* Datanodes. Every packet has a sequence number associated with
* it. When all the packets for a block are sent out and acks for each
* of them are received, the DataStreamer closes the current block.
*
* The DataStreamer thread picks up packets from the dataQueue, sends it to
* the first datanode in the pipeline and moves it from the dataQueue to the
* ackQueue. The ResponseProcessor receives acks from the datanodes. When a
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the ackQueue.
*
* In case of error, all outstanding packets are moved from ackQueue. A new
* pipeline is setup by eliminating the bad datanode from the original
* pipeline. The DataStreamer now starts sending packets from the dataQueue.
*
*********************************************************************/
@InterfaceAudience.Private
class DataStreamer extends Daemon {
/** Logger used by the streamer and by the nested helper classes in this file. */
static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
/**
 * Decides whether a connection attempt that failed with an
 * {@link InvalidEncryptionKeyException} should be retried with a freshly
 * fetched encryption key, or whether the failure should be propagated.
 */
private class RefetchEncryptionKeyPolicy {
  private int fetchEncryptionKeyTimes = 0;
  private InvalidEncryptionKeyException lastException;
  private final DatanodeInfo src;

  RefetchEncryptionKeyPolicy(DatanodeInfo src) {
    this.src = src;
  }

  /**
   * Either clears the client's cached data encryption key so the next
   * attempt fetches a new one, or rethrows the last recorded failure once
   * two failures have been recorded.
   *
   * @return always {@code true} when it returns normally
   * @throws InvalidEncryptionKeyException the last recorded failure, once
   *         the retry budget is used up
   */
  boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
    final boolean retriesLeft = fetchEncryptionKeyTimes < 2;
    if (!retriesLeft) {
      // The same exception was hit twice while connecting to the node:
      // give up so that the caller can exclude the node.
      throw lastException;
    }
    // The node is not excluded yet; retry with a new encryption key.
    final String message = "Will fetch a new encryption key and retry, "
        + "encryption key was invalid when connecting to "
        + this.src + ": ";
    LOG.info(message, lastException);
    // Drop the key that was just rejected.
    dfsClient.clearDataEncryptionKey();
    return true;
  }

  /**
   * Remembers a failed connection attempt and counts it.
   *
   * @param e the failure to remember as the most recent one
   */
  void recordFailure(final InvalidEncryptionKeyException e)
      throws InvalidEncryptionKeyException {
    lastException = e;
    fetchEncryptionKeyTimes += 1;
  }
}
/**
 * Owns the socket and the buffered data streams of one connection to a
 * datanode, and closes all of them together.
 */
private class StreamerStreams implements java.io.Closeable {
  private Socket sock = null;
  private DataOutputStream out = null;
  private DataInputStream in = null;

  /**
   * Connects to {@code src} and wraps the socket streams with the streams
   * returned by the SASL client.
   *
   * @param src datanode to connect to
   * @param writeTimeout timeout passed to the socket output stream
   * @param readTimeout timeout passed to the socket input stream
   * @param blockToken token handed to the SASL client
   * @throws IOException if connecting or setting up the streams fails; the
   *         socket opened by this constructor is closed before the
   *         exception propagates
   */
  StreamerStreams(final DatanodeInfo src,
      final long writeTimeout, final long readTimeout,
      final Token<BlockTokenIdentifier> blockToken)
      throws IOException {
    sock = createSocketForPipeline(src, 2, dfsClient);
    boolean initialized = false;
    try {
      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
      IOStreamPair saslStreams = dfsClient.saslClient
          .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
      in = new DataInputStream(unbufIn);
      initialized = true;
    } finally {
      if (!initialized) {
        // A constructor that throws never hands the object to the caller,
        // so close() can never be invoked: release the socket here.
        IOUtils.closeSocket(sock);
      }
    }
  }

  /**
   * Sends a TRANSFER_BLOCK request for the current block and waits for the
   * response.
   *
   * @throws IOException if the request cannot be sent or read, or if the
   *         response status is not SUCCESS
   */
  void sendTransferBlock(final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final String[] targetStorageIDs,
      final Token<BlockTokenIdentifier> blockToken) throws IOException {
    //send the TRANSFER_BLOCK request
    new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
        dfsClient.clientName, targets, targetStorageTypes,
        targetStorageIDs);
    out.flush();

    //ack
    BlockOpResponseProto transferResponse = BlockOpResponseProto
        .parseFrom(PBHelperClient.vintPrefixed(in));
    if (SUCCESS != transferResponse.getStatus()) {
      throw new IOException("Failed to add a datanode. Response status: "
          + transferResponse.getStatus());
    }
  }

  /** Closes the input stream, the output stream and then the socket. */
  @Override
  public void close() throws IOException {
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
    IOUtils.closeSocket(sock);
  }
}
/**
 * Synchronized holder for the block currently being written. Blocks are
 * copied on the way in and on the way out, so callers never share the
 * internal {@link ExtendedBlock} instance.
 */
static class BlockToWrite {
  private ExtendedBlock currentBlock;

  BlockToWrite(ExtendedBlock block) {
    setCurrentBlock(block);
  }

  /** @return a copy of the current block, or {@code null} if none is set */
  synchronized ExtendedBlock getCurrentBlock() {
    if (currentBlock == null) {
      return null;
    }
    return new ExtendedBlock(currentBlock);
  }

  /** @return the byte count of the current block, or 0 if none is set */
  synchronized long getNumBytes() {
    if (currentBlock == null) {
      return 0;
    }
    return currentBlock.getNumBytes();
  }

  /**
   * Stores a copy of {@code block}; a {@code null} block, or one without a
   * local block, clears the holder.
   */
  synchronized void setCurrentBlock(ExtendedBlock block) {
    if (block != null && block.getLocalBlock() != null) {
      currentBlock = new ExtendedBlock(block);
    } else {
      currentBlock = null;
    }
  }

  synchronized void setNumBytes(long numBytes) {
    assert currentBlock != null;
    currentBlock.setNumBytes(numBytes);
  }

  synchronized void setGenerationStamp(long generationStamp) {
    assert currentBlock != null;
    currentBlock.setGenerationStamp(generationStamp);
  }

  @Override
  public synchronized String toString() {
    if (currentBlock == null) {
      return "null";
    }
    return currentBlock.toString();
  }
}
/**
 * Create a socket for a write pipeline and connect it to the first
 * datanode.
 *
 * @param first the first datanode
 * @param length the pipeline length, used to compute the read timeout
 * @param client client supplying the socket factory and configuration
 * @return the socket connected to the first datanode
 * @throws IOException if connecting or configuring the socket fails; the
 *         socket is closed before the exception propagates
 */
static Socket createSocketForPipeline(final DatanodeInfo first,
    final int length, final DFSClient client) throws IOException {
  final DfsClientConf conf = client.getConf();
  final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
  LOG.debug("Connecting to datanode {}", dnAddr);
  final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
  final Socket sock = client.socketFactory.createSocket();
  try {
    final int timeout = client.getDatanodeReadTimeout(length);
    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
        conf.getSocketTimeout());
    sock.setTcpNoDelay(conf.getDataTransferTcpNoDelay());
    sock.setSoTimeout(timeout);
    sock.setKeepAlive(true);
    if (conf.getSocketSendBufferSize() > 0) {
      sock.setSendBufferSize(conf.getSocketSendBufferSize());
    }
    LOG.debug("Send buf size {}", sock.getSendBufferSize());
    return sock;
  } catch (IOException | RuntimeException e) {
    // The caller never receives the socket on failure, so nobody else can
    // close it: release it here and keep the original failure as primary.
    try {
      sock.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Tells whether a file uses the memory (lazy persist) storage policy.
 *
 * @param stat the HdfsFileStatus of a file
 * @return {@code true} if the file's storage policy id equals
 *         {@link HdfsConstants#MEMORY_STORAGE_POLICY_ID}
 */
static boolean isLazyPersist(HdfsFileStatus stat) {
  return HdfsConstants.MEMORY_STORAGE_POLICY_ID == stat.getStoragePolicy();
}
/**
 * Hands the buffer of every packet back to the ByteArrayManager and then
 * empties the list.
 *
 * @param packets packets whose buffers are released; cleared afterwards
 * @param bam ByteArrayManager receiving the buffers
 */
private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
  for (final DFSPacket packet : packets) {
    packet.releaseBuffer(bam);
  }
  packets.clear();
}
/**
 * {@link ExceptionLastSeen} variant that, at TRACE level, logs where the
 * check was invoked from before delegating to the parent check.
 */
class LastExceptionInStreamer extends ExceptionLastSeen {
  /**
   * Check if there already is an exception; does nothing when none is
   * recorded.
   */
  @Override
  synchronized void check(boolean resetToNull) throws IOException {
    final IOException pending = get();
    if (pending == null) {
      return;
    }
    if (LOG.isTraceEnabled()) {
      // Wrapping in a new Throwable captures the stack of this call, which
      // shows when the check is called.
      LOG.trace("Got Exception while checking, " + DataStreamer.this,
          new Throwable(pending));
    }
    super.check(resetToNull);
  }
}
/** Kind of error currently recorded for the streamer. */
enum ErrorType {
  /** No error is recorded. */
  NONE,
  /** An error recorded through the internal-error path. */
  INTERNAL,
  /** An error recorded through the external-error path. */
  EXTERNAL
}
/**
 * Error bookkeeping for the write pipeline: the kind of error currently
 * recorded, the index of a node marked bad, and the index and deadline of
 * a node that announced a restart. All state is guarded by this object's
 * monitor.
 */
static class ErrorState {
  ErrorType error = ErrorType.NONE;
  private int badNodeIndex = -1;
  private boolean waitForRestart = true;
  private int restartingNodeIndex = -1;
  private long restartingNodeDeadline = 0;
  private final long datanodeRestartTimeout;

  /**
   * @param datanodeRestartTimeout time in milliseconds a restarting node is
   *        given before it is declared dead
   */
  ErrorState(long datanodeRestartTimeout) {
    this.datanodeRestartTimeout = datanodeRestartTimeout;
  }

  /**
   * Clears the node bookkeeping; the error kind is cleared only when it is
   * an internal error, so an external error survives this call.
   */
  synchronized void resetInternalError() {
    if (hasInternalError()) {
      error = ErrorType.NONE;
    }
    badNodeIndex = -1;
    restartingNodeIndex = -1;
    restartingNodeDeadline = 0;
    waitForRestart = true;
  }

  /** Clears the error kind and all node bookkeeping. */
  synchronized void reset() {
    error = ErrorType.NONE;
    badNodeIndex = -1;
    restartingNodeIndex = -1;
    restartingNodeDeadline = 0;
    waitForRestart = true;
  }

  synchronized boolean hasInternalError() {
    return error == ErrorType.INTERNAL;
  }

  synchronized boolean hasExternalError() {
    return error == ErrorType.EXTERNAL;
  }

  synchronized boolean hasError() {
    return error != ErrorType.NONE;
  }

  /** @return true for an internal error with a bad or restarting node. */
  synchronized boolean hasDatanodeError() {
    return error == ErrorType.INTERNAL && isNodeMarked();
  }

  synchronized void setInternalError() {
    this.error = ErrorType.INTERNAL;
  }

  /** Records an external error unless an internal error is already set. */
  synchronized void setExternalError() {
    if (!hasInternalError()) {
      this.error = ErrorType.EXTERNAL;
    }
  }

  synchronized void setBadNodeIndex(int index) {
    this.badNodeIndex = index;
  }

  synchronized int getBadNodeIndex() {
    return badNodeIndex;
  }

  synchronized int getRestartingNodeIndex() {
    return restartingNodeIndex;
  }

  /**
   * Records that node {@code i} is restarting.
   *
   * @param i index of the restarting node
   * @param message text logged at INFO level
   * @param shouldWait whether to start the restart deadline; when false the
   *        state is flagged as not waiting for the restart
   */
  synchronized void initRestartingNode(int i, String message,
      boolean shouldWait) {
    restartingNodeIndex = i;
    if (shouldWait) {
      restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
      // If the data streamer has already set the primary node
      // bad, clear it. It is likely that the write failed due to
      // the DN shutdown. Even if it was a real failure, the pipeline
      // recovery will take care of it.
      badNodeIndex = -1;
    } else {
      this.waitForRestart = false;
    }
    LOG.info(message);
  }

  synchronized boolean isRestartingNode() {
    return restartingNodeIndex >= 0;
  }

  /** @return true if a node is marked bad, or a restart is being awaited. */
  synchronized boolean isNodeMarked() {
    return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
  }

  /**
   * This method is used when no explicit error report was received, but
   * something failed. The first node is a suspect or unsure about the cause
   * so that it is marked as failed.
   */
  synchronized void markFirstNodeIfNotMarked() {
    // There should be no existing error and no ongoing restart.
    if (!isNodeMarked()) {
      badNodeIndex = 0;
    }
  }

  /**
   * Re-aligns the restarting-node index after a bad node was handled, then
   * clears the bad-node index.
   *
   * @throws IllegalStateException if the bad node is the restarting node
   *         while the restart is still being awaited
   */
  synchronized void adjustState4RestartingNode() {
    // Just took care of a node error while waiting for a node restart
    if (restartingNodeIndex >= 0) {
      // If the error came from a node further away than the restarting
      // node, the restart must have been complete.
      if (badNodeIndex > restartingNodeIndex) {
        restartingNodeIndex = -1;
      } else if (badNodeIndex < restartingNodeIndex) {
        // the node index has shifted.
        restartingNodeIndex--;
      } else if (waitForRestart) {
        throw new IllegalStateException("badNodeIndex = " + badNodeIndex
            + " = restartingNodeIndex = " + restartingNodeIndex);
      }
    }

    if (!isRestartingNode()) {
      error = ErrorType.NONE;
    }
    badNodeIndex = -1;
  }

  /**
   * Declares the restarting node dead once its deadline has passed.
   *
   * @param nodes current pipeline nodes, used only for the warning message
   * @throws IllegalStateException if a restart is pending but no error is
   *         recorded
   */
  synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
    if (restartingNodeIndex >= 0) {
      if (error == ErrorType.NONE) {
        throw new IllegalStateException("error=false while checking" +
            " restarting node deadline");
      }

      // check badNodeIndex
      if (badNodeIndex == restartingNodeIndex) {
        // ignore, if came from the restarting node
        badNodeIndex = -1;
      }
      // not within the deadline
      if (Time.monotonicNow() >= restartingNodeDeadline) {
        // expired. declare the restarting node dead
        restartingNodeDeadline = 0;
        final int i = restartingNodeIndex;
        restartingNodeIndex = -1;
        LOG.warn("Datanode " + i + " did not restart within "
            + datanodeRestartTimeout + "ms: " + nodes[i]);
        // Mark the restarting node as failed. If there is any other failed
        // node during the last pipeline construction attempt, it will not be
        // overwritten/dropped. In this case, the restarting node will get
        // excluded in the following attempt, if it still does not come up.
        if (badNodeIndex == -1) {
          badNodeIndex = i;
        }
      }
    }
  }

  /**
   * @return whether a restarting node should be waited for. Synchronized
   *         like every other accessor, because the flag is only ever
   *         written while holding this monitor.
   */
  synchronized boolean doWaitForRestart() {
    return waitForRestart;
  }
}
// Set when the streamer shuts down (closeInternal) or when run() hits an
// error that is not attributed to a datanode.
private volatile boolean streamerClosed = false;
protected final BlockToWrite block; // its length is number of bytes acked
protected Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
private final ErrorState errorState;
private volatile BlockConstructionStage stage; // block construction stage
protected long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
// Time.monotonicNow() of the last packet written to blockStream; used to
// decide when the next heartbeat packet is due.
private long lastPacket;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
/** Restarting Nodes */
private List<DatanodeInfo> restartingNodes = new ArrayList<>();
/** The times have retried to recover pipeline, for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
private boolean isHflushed = false;
/** Append on an existing block? */
private final boolean isAppend;
private long currentSeqno = 0;
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes written in current block
private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
private Socket s;
protected final DFSClient dfsClient;
protected final String src;
/** Only for DataTransferProtocol.writeBlock(..) */
final DataChecksum checksum4WriteBlock;
final Progressable progress;
protected final HdfsFileStatus stat;
// appending to existing partial block
private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock
protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
// packet seqno -> Time.monotonicNow() at which the packet was moved to
// the ackQueue
private final Map<Long, Long> packetSendTime = new HashMap<>();
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
private final AtomicReference<CachingStrategy> cachingStrategy;
private final ByteArrayManager byteArrayManager;
//persist blocks on namenode
private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
private boolean failPacket = false;
// Threshold in milliseconds above which a slow ack wait is logged at WARN.
private final long dfsclientSlowLogThresholdMs;
// Sleep in milliseconds applied at the end of each run() iteration; used by
// unit tests to trigger race conditions.
private long artificialSlowdown = 0;
// List of congested data nodes. The stream will back off if the DataNodes
// are congested
private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
    CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int lastCongestionBackoffTime;
private int maxPipelineRecoveryRetries;
protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
private final EnumSet<AddBlockFlag> addBlockFlags;
/**
 * Shared initialisation for both package-visible constructors: stores the
 * collaborators and reads the tunables from the client configuration. The
 * construction stage is left for the delegating constructor to set.
 */
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
    DFSClient dfsClient, String src,
    Progressable progress, DataChecksum checksum,
    AtomicReference<CachingStrategy> cachingStrategy,
    ByteArrayManager byteArrayManage,
    boolean isAppend, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) {
  // Collaborators and per-file state handed in by the caller.
  this.block = new BlockToWrite(block);
  this.dfsClient = dfsClient;
  this.src = src;
  this.progress = progress;
  this.stat = stat;
  this.checksum4WriteBlock = checksum;
  this.cachingStrategy = cachingStrategy;
  this.byteArrayManager = byteArrayManage;
  this.isLazyPersistFile = isLazyPersist(stat);
  this.isAppend = isAppend;
  this.favoredNodes = favoredNodes;

  // Settings taken from the client configuration.
  final DfsClientConf clientConf = dfsClient.getConf();
  this.dfsclientSlowLogThresholdMs =
      clientConf.getSlowIoWarningThresholdMs();
  this.excludedNodes =
      initExcludedNodes(clientConf.getExcludedNodesCacheExpiry());
  this.errorState = new ErrorState(clientConf.getDatanodeRestartTimeout());
  this.addBlockFlags = flags;
  this.maxPipelineRecoveryRetries =
      clientConf.getMaxPipelineRecoveryRetries();
}
/**
 * Construct a data streamer that is not an append; the streamer starts in
 * the {@code PIPELINE_SETUP_CREATE} stage.
 *
 * @param stat status of the file being written
 * @param block block to start with
 * @param favoredNodes favored nodes, stored for later use
 * @param flags add-block flags, stored for later use
 */
DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
    String src, Progressable progress, DataChecksum checksum,
    AtomicReference<CachingStrategy> cachingStrategy,
    ByteArrayManager byteArrayManage, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) {
  this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
      byteArrayManage, false, favoredNodes, flags);
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/**
 * Construct a data streamer for appending to the last partial block. The
 * streamer starts in the {@code PIPELINE_SETUP_APPEND} stage, with
 * {@code bytesSent} set to the block's current length and the access token
 * taken from {@code lastBlock}. No favored nodes or add-block flags are
 * used.
 *
 * @param lastBlock last block of the file to be appended
 * @param stat status of the file to be appended
 */
DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
    String src, Progressable progress, DataChecksum checksum,
    AtomicReference<CachingStrategy> cachingStrategy,
    ByteArrayManager byteArrayManage) {
  this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
      byteArrayManage, true, null, null);
  stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
  // "block" is the field initialised by the delegated constructor.
  bytesSent = block.getNumBytes();
  accessToken = lastBlock.getBlockToken();
}
/**
 * Adopt the locations of the given block as the current pipeline.
 *
 * @param lastBlock the last block of a file
 * @throws IOException if the block has no locations
 */
void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
  // setup pipeline to append to the last block XXX retries??
  setPipeline(lastBlock);
  final boolean hasLocations = nodes.length >= 1;
  if (hasLocations) {
    return;
  }
  throw new IOException("Unable to retrieve blocks locations " +
      " for last block " + block + " of file " + src);
}
/**
 * Replaces the block access token held by this streamer.
 *
 * @param t the new token
 */
void setAccessToken(Token<BlockTokenIdentifier> t) {
  this.accessToken = t;
}
/**
 * Takes the pipeline (locations, storage types and storage IDs) from a
 * located block.
 */
private void setPipeline(LocatedBlock lb) {
  final DatanodeInfo[] locations = lb.getLocations();
  final StorageType[] types = lb.getStorageTypes();
  final String[] ids = lb.getStorageIDs();
  setPipeline(locations, types, ids);
}
/**
 * Replaces the current pipeline description. The three arrays are stored
 * as given, without copying; all three may be {@code null} (endBlock()
 * passes nulls to clear the pipeline).
 */
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
    String[] storageIDs) {
  this.nodes = nodes;
  this.storageTypes = storageTypes;
  this.storageIDs = storageIDs;
}
/**
 * Prepare for streaming data: rename the thread after the file and block,
 * start a ResponseProcessor for the current nodes, switch to the
 * DATA_STREAMING stage and restart the heartbeat clock.
 */
private void initDataStreaming() {
  final String threadName = "DataStreamer for file " + src +
      " block " + block;
  this.setName(threadName);
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodes {} storageTypes {} storageIDs {}",
        Arrays.toString(nodes),
        Arrays.toString(storageTypes),
        Arrays.toString(storageIDs));
  }
  final ResponseProcessor responder = new ResponseProcessor(nodes);
  response = responder;
  responder.start();
  stage = BlockConstructionStage.DATA_STREAMING;
  lastPacket = Time.monotonicNow();
}
/**
 * Finishes the current block: resets the thread name, closes the responder
 * and the stream, clears the pipeline and returns to the
 * PIPELINE_SETUP_CREATE stage so that the next packet allocates a new
 * block.
 */
protected void endBlock() {
  LOG.debug("Closing old block {}", block);
  this.setName("DataStreamer for file " + src);
  closeResponder();
  closeStream();
  // From here on nodes/storageTypes/storageIDs are null until the next
  // pipeline is set up.
  setPipeline(null, null, null);
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/**
 * @return true if the streamer is closed, an error is recorded, or the
 *         client is no longer running
 */
private boolean shouldStop() {
  if (streamerClosed) {
    return true;
  }
  if (errorState.hasError()) {
    return true;
  }
  return !dfsClient.clientRunning;
}
/*
 * streamer thread is the only thread that opens streams to datanode,
 * and closes them. Any error recovery is also done by this thread.
 *
 * Each loop iteration: handle any recorded error, wait for a packet in the
 * dataQueue (sending heartbeats while streaming), set up the pipeline if
 * the stage requires it, move the packet to the ackQueue, write it out,
 * and end the block after its last packet has been acked. Any Throwable is
 * recorded in lastException and turned into an internal error; the
 * streamer is closed unless a node has been marked.
 */
@Override
public void run() {
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
      closeResponder();
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = processDatanodeOrExternalError();

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
          long timeout = 1000;
          if (stage == BlockConstructionStage.DATA_STREAMING) {
            timeout = sendHeartbeat();
          }
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            LOG.debug("Thread interrupted", e);
          }
          // doSleep forces at most one wait, even if a packet is queued.
          doSleep = false;
        }
        if (shouldStop()) {
          continue;
        }
        // get packet to be sent.
        try {
          backOffIfNecessary();
        } catch (InterruptedException e) {
          LOG.debug("Thread interrupted", e);
        }
        // The packet is only peeked here; it is removed from the dataQueue
        // further down, right before it is sent.
        one = dataQueue.getFirst(); // regular data packet
        SpanContext[] parents = one.getTraceParents();
        if (parents != null && parents.length > 0) {
          // The original code stored multiple parents in the DFSPacket, and
          // use them ALL here when creating a new Span. We only use the
          // last one FOR NOW. Moreover, we don't activate the Span for now.
          // NOTE(review): the code below passes parents[0], i.e. the first
          // element, not the last one as stated above -- confirm intent.
          scope = dfsClient.getTracer().
              newScope("dataStreamer", parents[0], false);
          //scope.getSpan().setParents(parents);
        }
      }

      // get new block from namenode.
      LOG.debug("stage={}, {}", stage, this);

      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        LOG.debug("Allocating new block: {}", this);
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        LOG.debug("Append to block {}", block);
        setupPipelineForAppendOrRecovery();
        if (streamerClosed) {
          continue;
        }
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > stat.getBlockSize()) {
        throw new IOException("BlockSize " + stat.getBlockSize() +
            " < lastByteOffsetInBlock, " + this + ", " + one);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        waitForAllAcks();
        if(shouldStop()) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      SpanContext spanContext = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          if (scope != null) {
            one.setSpan(scope.span());
            spanContext = scope.span().getContext();
            scope.close();
          }
          scope = null;
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          // Remember when the packet was handed over for sending.
          packetSendTime.put(one.getSeqno(), Time.monotonicNow());
          dataQueue.notifyAll();
        }
      }

      LOG.debug("{} sending {}", this, one);

      // write out data to remote datanode
      try (TraceScope ignored = dfsClient.getTracer().
          newScope("DataStreamer#writeTo", spanContext)) {
        sendPacket(one);
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        // NOTE(review): sendPacket() performs this same marking before
        // rethrowing, so this call only matters for an IOException raised
        // by the trace scope itself.
        errorState.markFirstNodeIfNotMarked();
        throw e;
      }

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (shouldStop()) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        try {
          waitForAllAcks();
        } catch (IOException ioe) {
          // No need to do a close recovery if the last packet was acked.
          // i.e. ackQueue is empty. waitForAllAcks() can get an exception
          // (e.g. connection reset) while sending a heartbeat packet,
          // if the DN sends the final ack and closes the connection.
          synchronized (dataQueue) {
            if (!ackQueue.isEmpty()) {
              throw ioe;
            }
          }
        }
        if (shouldStop()) {
          continue;
        }

        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (!errorState.isRestartingNode()) {
        // Since their messages are descriptive enough, do not always
        // log a verbose stack-trace WARN for quota exceptions.
        if (e instanceof QuotaExceededException) {
          LOG.debug("DataStreamer Quota Exception", e);
        } else {
          LOG.warn("DataStreamer Exception", e);
        }
      }
      lastException.set(e);
      assert !(e instanceof NullPointerException);
      errorState.setInternalError();
      if (!errorState.isNodeMarked()) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      // Close a trace scope that was opened but never attached to a packet.
      if (scope != null) {
        scope.close();
        scope = null;
      }
    }
  }
  closeInternal();
}
/**
 * Blocks until the ackQueue is empty or the streamer should stop, sending
 * heartbeats while waiting.
 *
 * @throws IOException if sending a heartbeat fails
 */
private void waitForAllAcks() throws IOException {
  synchronized (dataQueue) {
    while (true) {
      if (shouldStop() || ackQueue.isEmpty()) {
        break;
      }
      try {
        // sleep until an ack arrives or the next heartbeat is due
        dataQueue.wait(sendHeartbeat());
      } catch (InterruptedException e) {
        LOG.debug("Thread interrupted ", e);
      }
    }
  }
}
/**
 * Writes one packet to the block stream, flushes it and records the send
 * time used for heartbeat scheduling.
 *
 * @param packet the packet to write
 * @throws IOException if the write or flush fails; the first node is
 *         marked bad beforehand unless a node is already marked
 */
private void sendPacket(DFSPacket packet) throws IOException {
  try {
    // write out data to remote datanode
    packet.writeTo(blockStream);
    blockStream.flush();
    lastPacket = Time.monotonicNow();
  } catch (IOException e) {
    // HDFS-3398: failing to write to the primary DN is treated as the
    // primary DN being down. The call below is a no-op when the responder
    // has already recorded a failed or restarting node; pipeline recovery
    // handles one node error at a time, and a primary that fails again
    // during recovery is taken out then.
    errorState.markFirstNodeIfNotMarked();
    throw e;
  }
}
/**
 * Sends a heartbeat packet if half the socket timeout has elapsed since
 * the last packet was sent.
 *
 * @return milliseconds until the next heartbeat is due
 * @throws IOException if sending the heartbeat fails
 */
private long sendHeartbeat() throws IOException {
  final long heartbeatInterval = dfsClient.getConf().getSocketTimeout()/2;
  final long sinceLastPacket = Time.monotonicNow() - lastPacket;
  final long remaining = heartbeatInterval - sinceLastPacket;
  if (remaining > 0) {
    return remaining;
  }
  sendPacket(createHeartbeatPacket());
  return heartbeatInterval;
}
/**
 * Final shutdown of the streamer, invoked when run() leaves its loop:
 * closes the responder and the stream, marks the streamer closed, releases
 * the buffers of all queued packets and wakes every thread waiting on the
 * dataQueue.
 */
private void closeInternal() {
  closeResponder();       // close and join
  closeStream();
  streamerClosed = true;
  release();
  synchronized (dataQueue) {
    // Waiters re-check streamerClosed once they wake up.
    dataQueue.notifyAll();
  }
}
/**
 * Release the buffers of the DFSPackets in the two queues (dataQueue and
 * ackQueue) to the ByteArrayManager and empty both queues, while holding
 * the dataQueue lock.
 */
void release() {
  synchronized (dataQueue) {
    releaseBuffer(dataQueue, byteArrayManager);
    releaseBuffer(ackQueue, byteArrayManager);
  }
}
/**
 * Wait until the packet with the given sequence number has been acked, the
 * streamer is closed, or the datanode write timeout elapses.
 *
 * @param seqno the sequence number to be acked
 * @throws IOException if the streamer has been closed with an error
 * @throws InterruptedIOException if the calling thread is interrupted, or
 *         if no ack arrives within the write timeout
 */
void waitForAckedSeqno(long seqno) throws IOException {
  try (TraceScope ignored = dfsClient.getTracer().
      newScope("waitForAckedSeqno")) {
    LOG.debug("{} waiting for ack for: {}", this, seqno);
    // Read the volatile field once: endBlock() may reset it to null between
    // a null check and the length access.
    final DatanodeInfo[] currentNodes = nodes;
    int dnodes = currentNodes != null ? currentNodes.length : 3;
    // NOTE(review): if getDatanodeWriteTimeout can return 0 to mean "no
    // timeout", the check below would expire after the first wait --
    // confirm against DFSClient.
    int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
    long begin = Time.monotonicNow();
    try {
      synchronized (dataQueue) {
        while (!streamerClosed) {
          checkClosed();
          if (lastAckedSeqno >= seqno) {
            break;
          }
          try {
            // when we receive an ack, we notify on dataQueue
            dataQueue.wait(1000);
            long duration = Time.monotonicNow() - begin;
            // Only report a timeout if the ack is still missing: it may
            // have arrived during the wait that just crossed the limit.
            if (duration > writeTimeout && lastAckedSeqno < seqno) {
              LOG.error("No ack received, took {}ms (threshold={}ms). "
                  + "File being written: {}, block: {}, "
                  + "Write pipeline datanodes: {}.",
                  duration, writeTimeout, src, block, nodes);
              throw new InterruptedIOException("No ack received after " +
                  duration / 1000 + "s and a timeout of " +
                  writeTimeout / 1000 + "s");
            }
          } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
          }
        }
      }
      checkClosed();
    } catch (ClosedChannelException cce) {
      LOG.debug("Closed channel exception", cce);
    }
    long duration = Time.monotonicNow() - begin;
    if (duration > dfsclientSlowLogThresholdMs) {
      LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being"
          + " written: {}, block: {}, Write pipeline datanodes: {}.",
          duration, dfsclientSlowLogThresholdMs, src, block, nodes);
    }
  }
}
/**
* wait for space of dataQueue and queue the packet
*
* @param packet the DFSPacket to be queued
* @throws IOException
*/
void waitAndQueuePacket(DFSPacket packet) throws IOException {
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
boolean firstWait = true;
try {
while (!streamerClosed && dataQueue.size() + ackQueue.size() >
dfsClient.getConf().getWriteMaxPackets()) {
if (firstWait) {
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation("dataQueue.wait");
}
firstWait = false;
}
try {
dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
} finally {
Span span = Tracer.getCurrentSpan();
if ((span != null) && (!firstWait)) {
span.addTimelineAnnotation("end.wait");
}
}
checkClosed();
queuePacket(packet);
} catch (ClosedChannelException cce) {
LOG.debug("Closed channel exception", cce);
}
}