HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and SMALL_BUFFER_SIZE to the users. Contributed by Li Lu.
Haohui Mai committed May 5, 2015
1 parent 9809a16 commit 4da8490
Showing 16 changed files with 95 additions and 54 deletions.
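
For orientation before the per-file hunks below, here is a minimal sketch of the call-site migration this commit performs. It relies only on the DFSUtil helpers added in this diff; the wrapper class and method names are hypothetical, not part of the commit.

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;

// Hypothetical illustration only, not code from the commit.
class BufferSizeMigrationSketch {
  // Old pattern (removed by this commit): a static constant, evaluated once per
  // JVM from a fresh HdfsConfiguration, so a caller's io.file.buffer.size
  // setting was never consulted:
  //   out = new DataOutputStream(new BufferedOutputStream(unbufOut,
  //       HdfsServerConstants.SMALL_BUFFER_SIZE));

  // New pattern: resolve the size from the caller's own Configuration.
  static DataOutputStream wrapForHeaderWrites(OutputStream unbufOut,
      Configuration conf) {
    int smallBufferSize = DFSUtil.getSmallBufferSize(conf);
    return new DataOutputStream(
        new BufferedOutputStream(unbufOut, smallBufferSize));
  }
}

Callers on hot paths (DFSClient, Dispatcher, DataXceiver below) cache the looked-up value in a field at construction time rather than re-reading the Configuration on every write.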
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -513,6 +513,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
     NameNode (Charles Lamb via Colin P. McCabe)
 
+    HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and
+    SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -238,6 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private final Sampler<?> traceSampler;
+  private final int smallBufferSize;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -309,6 +310,7 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     this.stats = stats;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
+    this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
 
     this.ugi = UserGroupInformation.getCurrentUser();
 
@@ -1902,7 +1904,7 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
           //connect to a datanode
           IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
           out = new DataOutputStream(new BufferedOutputStream(pair.out,
-              HdfsServerConstants.SMALL_BUFFER_SIZE));
+              smallBufferSize));
           in = new DataInputStream(pair.in);
 
           if (LOG.isDebugEnabled()) {
@@ -2067,7 +2069,7 @@ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
 
     try {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
-          HdfsServerConstants.SMALL_BUFFER_SIZE));
+          smallBufferSize));
       DataInputStream in = new DataInputStream(pair.in);
 
       new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
@@ -70,6 +70,7 @@
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -1514,4 +1515,14 @@ public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
         .createKeyProviderCryptoExtension(keyProvider);
     return cryptoProvider;
   }
+
+  public static int getIoFileBufferSize(Configuration conf) {
+    return conf.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+
+  public static int getSmallBufferSize(Configuration conf) {
+    return Math.min(getIoFileBufferSize(conf) / 2, 512);
+  }
 }
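
As a quick sanity check of the two helpers added above: with io.file.buffer.size left unset, they reproduce the values of the removed HdfsServerConstants fields. The 4096-byte default assumed below is CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT and is stated here as an assumption, not taken from this diff.

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DefaultBufferSizes {
  public static void main(String[] args) {
    HdfsConfiguration conf = new HdfsConfiguration();
    // Assuming the stock 4096-byte default for io.file.buffer.size:
    // getIoFileBufferSize(conf) == 4096
    // getSmallBufferSize(conf)  == Math.min(4096 / 2, 512) == 512
    System.out.println(DFSUtil.getIoFileBufferSize(conf));
    System.out.println(DFSUtil.getSmallBufferSize(conf));
  }
}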
@@ -71,7 +71,6 @@
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
@@ -92,7 +91,6 @@
 import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
 
-import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -123,6 +121,7 @@
 @InterfaceAudience.Private
 class DataStreamer extends Daemon {
   static final Log LOG = LogFactory.getLog(DataStreamer.class);
+
   /**
    * Create a socket for a write pipeline
    *
@@ -1145,7 +1144,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
           unbufOut = saslStreams.out;
           unbufIn = saslStreams.in;
           out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-              HdfsServerConstants.SMALL_BUFFER_SIZE));
+              DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
           in = new DataInputStream(unbufIn);
 
           //send the TRANSFER_BLOCK request
@@ -1425,7 +1424,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsServerConstants.SMALL_BUFFER_SIZE));
+            DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
         blockReplyStream = new DataInputStream(unbufIn);
 
         //
@@ -118,6 +118,8 @@ public class Dispatcher {
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
 
+  private final int ioFileBufferSize;
+
   private static class GlobalBlockMap {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
 
@@ -308,9 +310,9 @@ private void dispatch() {
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+            ioFileBufferSize));
         in = new DataInputStream(new BufferedInputStream(unbufIn,
-            HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+            ioFileBufferSize));
 
         sendRequest(out, eb, accessToken);
         receiveResponse(in);
@@ -801,6 +803,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
+    this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
   }
 
   public DistributedFileSystem getDistributedFileSystem() {
@@ -24,9 +24,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -56,12 +54,6 @@ public interface HdfsServerConstants {
   // to 1k.
   int MAX_PATH_LENGTH = 8000;
   int MAX_PATH_DEPTH = 1000;
-  int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
-      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
-      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-  // Used for writing header etc.
-  int SMALL_BUFFER_SIZE = Math.min(IO_FILE_BUFFER_SIZE / 2,
-      512);
   // An invalid transaction ID that will never be seen in a real namesystem.
   long INVALID_TXID = -12345;
   // Number of generation stamps reserved for legacy blocks.
@@ -33,7 +33,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -60,6 +61,8 @@ public class BlockMetadataHeader {
    */
   private final short version;
   private DataChecksum checksum = null;
+
+  private static final HdfsConfiguration conf = new HdfsConfiguration();
 
   @VisibleForTesting
   public BlockMetadataHeader(short version, DataChecksum checksum) {
@@ -85,7 +88,7 @@ public static DataChecksum readDataChecksum(File metaFile) throws IOException {
     DataInputStream in = null;
     try {
       in = new DataInputStream(new BufferedInputStream(
-          new FileInputStream(metaFile), HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+          new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
       return readDataChecksum(in, metaFile);
     } finally {
       IOUtils.closeStream(in);
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@@ -47,7 +48,6 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -246,7 +246,8 @@ class BlockReceiver implements Closeable {
             out.getClass());
       }
       this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-          streams.getChecksumOut(), HdfsServerConstants.SMALL_BUFFER_SIZE));
+          streams.getChecksumOut(), DFSUtil.getSmallBufferSize(
+              datanode.getConf())));
       // write data chunk header if creating a new replica
       if (isCreate) {
         BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
@@ -34,9 +34,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -104,8 +105,13 @@ class BlockSender implements java.io.Closeable {
    * not sure if there will be much more improvement.
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private static final int IO_FILE_BUFFER_SIZE;
+  static {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
+  }
   private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
-      HdfsServerConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
+      IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
 
   /** the block to read from */
   private final ExtendedBlock block;
@@ -298,7 +304,7 @@ class BlockSender implements java.io.Closeable {
         // storage and computes the checksum.
         if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
           checksumIn = new DataInputStream(new BufferedInputStream(
-              metaIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+              metaIn, IO_FILE_BUFFER_SIZE));
 
           csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
           keepMetaInOpen = true;
@@ -747,7 +753,7 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
       pktBufSize += checksumSize * maxChunksPerPacket;
     } else {
       maxChunksPerPacket = Math.max(1,
-          numberOfChunks(HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+          numberOfChunks(IO_FILE_BUFFER_SIZE));
       // Packet size includes both checksum and data
       pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
     }
@@ -2156,7 +2156,7 @@ public void run() {
         unbufIn = saslStreams.in;
 
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsServerConstants.SMALL_BUFFER_SIZE));
+            DFSUtil.getSmallBufferSize(conf)));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
             false, false, true, DataNode.this, null, cachingStrategy);
@@ -48,7 +48,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -109,7 +111,9 @@ class DataXceiver extends Receiver implements Runnable {
   private final InputStream socketIn;
   private OutputStream socketOut;
   private BlockReceiver blockReceiver = null;
-
+  private final int ioFileBufferSize;
+  private final int smallBufferSize;
+
   /**
    * Client Name used in previous operation. Not available on first request
    * on the socket.
@@ -131,6 +135,8 @@ private DataXceiver(Peer peer, DataNode datanode,
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
+    this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf());
+    this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf());
     remoteAddress = peer.getRemoteAddressString();
     final int colonIdx = remoteAddress.indexOf(':');
     remoteAddressWithoutPort =
@@ -191,7 +197,7 @@ public void run() {
           socketIn, datanode.getXferAddress().getPort(),
           datanode.getDatanodeId());
       input = new BufferedInputStream(saslStreams.in,
-          HdfsServerConstants.SMALL_BUFFER_SIZE);
+          smallBufferSize);
       socketOut = saslStreams.out;
     } catch (InvalidMagicNumberException imne) {
       if (imne.isHandshake4Encryption()) {
@@ -514,7 +520,7 @@ public void readBlock(final ExtendedBlock block,
     long read = 0;
     OutputStream baseStream = getOutputStream();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE));
+        baseStream, smallBufferSize));
     checkAccess(out, true, block, blockToken,
         Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
 
@@ -658,7 +664,7 @@ public void writeBlock(final ExtendedBlock block,
     final DataOutputStream replyOut = new DataOutputStream(
         new BufferedOutputStream(
             getOutputStream(),
-            HdfsServerConstants.SMALL_BUFFER_SIZE));
+            smallBufferSize));
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
 
@@ -717,7 +723,7 @@ public void writeBlock(final ExtendedBlock block,
           unbufMirrorOut = saslStreams.out;
           unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
-              HdfsServerConstants.SMALL_BUFFER_SIZE));
+              smallBufferSize));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
           // Do not propagate allowLazyPersist to downstream DataNodes.
@@ -932,7 +938,7 @@ public void blockChecksum(final ExtendedBlock block,
         .getMetaDataInputStream(block);
 
     final DataInputStream checksumIn = new DataInputStream(
-        new BufferedInputStream(metadataIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+        new BufferedInputStream(metadataIn, ioFileBufferSize));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
@@ -1024,7 +1030,7 @@ public void copyBlock(final ExtendedBlock block,
       // set up response stream
      OutputStream baseStream = getOutputStream();
      reply = new DataOutputStream(new BufferedOutputStream(
-          baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE));
+          baseStream, smallBufferSize));
 
      // send status first
      writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1131,10 +1137,10 @@ public void replaceBlock(final ExtendedBlock block,
        unbufProxyOut = saslStreams.out;
        unbufProxyIn = saslStreams.in;
 
        proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
-            HdfsServerConstants.SMALL_BUFFER_SIZE));
+            smallBufferSize));
        proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
-            HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+            ioFileBufferSize));
 
        /* send request to the proxy */
        IoeDuringCopyBlockOperation = true;
