Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-17272. ABFS Streams to support IOStatistics API #2353

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -46,9 +46,13 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
Expand All @@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
Expand Down Expand Up @@ -867,8 +871,16 @@ public boolean hasCapability(String capability) {
+ " does not expose its stream capabilities.");
}
return ((StreamCapabilities) in).hasCapability(capability);
case StreamCapabilities.IOSTATISTICS:
return (in instanceof StreamCapabilities)
&& ((StreamCapabilities) in).hasCapability(capability);
default:
return false;
}
}

/**
* Get the IO statistics of the wrapped input stream {@code in}, as
* resolved by {@code IOStatisticsSupport.retrieveIOStatistics}.
* This stream keeps no statistics of its own; it only relays those of
* the stream it decrypts.
* NOTE(review): presumably null when the wrapped stream is not an
* {@link IOStatisticsSource} -- confirm against the helper.
* @return the statistics retrieved from the wrapped stream.
*/
@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Expand Up @@ -28,9 +28,13 @@
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import com.google.common.base.Preconditions;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
Expand All @@ -48,7 +52,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind, StreamCapabilities {
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
Expand Down Expand Up @@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
}
return false;
}

/**
* Get the IO statistics of the wrapped output stream {@code out}, as
* resolved by {@code IOStatisticsSupport.retrieveIOStatistics}.
* This stream keeps no statistics of its own; it only relays those of
* the stream it encrypts into.
* NOTE(review): presumably null when the wrapped stream is not an
* {@link IOStatisticsSource} -- confirm against the helper.
* @return the statistics retrieved from the wrapped stream.
*/
@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(out);
}
}
Expand Up @@ -24,6 +24,10 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;


/**
Expand All @@ -33,7 +37,8 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BufferedFSInputStream extends BufferedInputStream
implements Seekable, PositionedReadable, HasFileDescriptor {
implements Seekable, PositionedReadable, HasFileDescriptor,
IOStatisticsSource, StreamCapabilities {
/**
* Creates a <code>BufferedFSInputStream</code>
* with the specified buffer size,
Expand Down Expand Up @@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}

/**
* Delegate the capability probe to the wrapped stream when it
* implements {@link StreamCapabilities}; a wrapped stream of any
* other type is reported as supporting nothing.
*
* @param capability string to query the stream support for.
* @return true if the wrapped stream declares the capability.
*/
@Override
public boolean hasCapability(final String capability) {
  return in instanceof StreamCapabilities
      && ((StreamCapabilities) in).hasCapability(capability);
}

/**
* Get the IO statistics of the wrapped input stream {@code in}, as
* resolved by {@code IOStatisticsSupport.retrieveIOStatistics}.
* The buffering layer itself contributes no statistics.
* NOTE(review): presumably null when the wrapped stream is not an
* {@link IOStatisticsSource} -- confirm against the helper.
* @return the statistics retrieved from the wrapped stream.
*/
@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Expand Up @@ -38,6 +38,9 @@
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
Expand Down Expand Up @@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
Expand Down Expand Up @@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}

/**
* Get the IO Statistics of the data stream {@code datas}, falling back to
* null if that stream does not implement the interface
* {@link IOStatisticsSource}.
* Only the data stream is consulted; the checksum stream {@code sums}
* is not passed to the lookup.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
}

private static class FSDataBoundedInputStream extends FSDataInputStream {
Expand Down Expand Up @@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {

/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
Expand Down Expand Up @@ -449,6 +465,28 @@ protected void checkClosed() throws IOException {
throw new ClosedChannelException();
}
}

/**
* Get the IO Statistics of the data stream {@code datas}, falling back to
* null if that stream does not implement the interface
* {@link IOStatisticsSource}.
* Only the data stream is consulted; the checksum stream {@code sums}
* is not passed to the lookup.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}

/**
* Probe the data stream {@code datas} for a capability.
* The answer is exactly what the data stream reports; the checksum
* stream {@code sums} is not probed.
*
* @param capability string to query the stream support for.
* @return true if the data stream reports the capability as supported.
*/
@Override
public boolean hasCapability(final String capability) {
return datas.hasCapability(capability);
}
}

@Override
Expand Down
Expand Up @@ -29,6 +29,9 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;

Expand All @@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
Expand Down Expand Up @@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
"unsupported by " + in.getClass().getCanonicalName());
}
}

/**
* Get the IO Statistics of the wrapped stream {@code in}, falling back to
* null if that stream does not implement the interface
* {@link IOStatisticsSource}.
* This class holds no statistics of its own; callers must be prepared
* for a null result.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
}
Expand Up @@ -24,13 +24,17 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities {
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
"not support setting the drop-behind caching setting.");
}
}

/**
* Get the IO Statistics of the wrapped stream {@code wrappedStream}, as
* resolved by {@code IOStatisticsSupport.retrieveIOStatistics}.
* NOTE(review): other callers of the same helper document the fallback
* as null, not empty statistics, when the stream does not implement
* {@link IOStatisticsSource} -- confirm against the helper and do not
* assume a non-null result.
* @return an IOStatistics instance; presumably null if the wrapped
* stream is not a statistics source.
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}
}
Expand Up @@ -24,6 +24,9 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
throws IOException {
readFully(position, buffer, 0, buffer.length);
}

/**
* Build the string form as the superclass string followed by a
* brace-delimited section. When the concrete class is also an
* {@link IOStatisticsSource}, its statistics are rendered inside the
* braces; otherwise the braces are left empty.
* Subclasses therefore get their statistics reported without having
* to override this method.
* @return a string value.
*/
@Override
public String toString() {
  // Evaluate the superclass string first, matching the original order.
  final String prefix = super.toString();
  final String statistics = (this instanceof IOStatisticsSource)
      ? IOStatisticsLogging.ioStatisticsSourceToString(
          (IOStatisticsSource) this)
      : "";
  return prefix + '{' + statistics + '}';
}
}
Expand Up @@ -26,14 +26,20 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes.
* <p></p>
* The interface extends {@link IOStatisticsSource} so that there is no
* need to cast an instance to see if it is a source of statistics.
* However, implementations MAY return null for their actual statistics.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface MultipartUploader extends Closeable {
public interface MultipartUploader extends Closeable,
IOStatisticsSource {


/**
Expand Down