Skip to content

Commit

Permalink
IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Sep 13, 2016
1 parent 43f65fe commit 16c5a71
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 103 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsOutputStream;
Expand Down Expand Up @@ -125,18 +126,18 @@ public IgfsAsyncImpl(IgfsImpl igfs) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, @Override public IgfsInputStream open(IgfsPath path, int bufSize,
int seqReadsBeforePrefetch) { int seqReadsBeforePrefetch) {
return igfs.open(path, bufSize, seqReadsBeforePrefetch); return igfs.open(path, bufSize, seqReadsBeforePrefetch);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path) { @Override public IgfsInputStream open(IgfsPath path) {
return igfs.open(path); return igfs.open(path);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { @Override public IgfsInputStream open(IgfsPath path, int bufSize) {
return igfs.open(path, bufSize); return igfs.open(path, bufSize);
} }


Expand Down
Expand Up @@ -49,16 +49,6 @@ public interface IgfsEx extends IgniteFileSystem {
*/ */
public IgfsPaths proxyPaths(); public IgfsPaths proxyPaths();


/** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch)
throws IgniteException;

/** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException;

/** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException;

/** /**
* Gets global space counters. * Gets global space counters.
* *
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsInvalidPathException; import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsMode;
Expand Down Expand Up @@ -948,24 +949,24 @@ private IgfsEntryInfo primaryInfoForListing(IgfsPath path) throws IgniteCheckedE
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path) { @Override public IgfsInputStream open(IgfsPath path) {
return open(path, cfg.getStreamBufferSize(), cfg.getSequentialReadsBeforePrefetch()); return open(path, cfg.getStreamBufferSize(), cfg.getSequentialReadsBeforePrefetch());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { @Override public IgfsInputStream open(IgfsPath path, int bufSize) {
return open(path, bufSize, cfg.getSequentialReadsBeforePrefetch()); return open(path, bufSize, cfg.getSequentialReadsBeforePrefetch());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(final IgfsPath path, final int bufSize, @Override public IgfsInputStream open(final IgfsPath path, final int bufSize,
final int seqReadsBeforePrefetch) { final int seqReadsBeforePrefetch) {
A.notNull(path, "path"); A.notNull(path, "path");
A.ensure(bufSize >= 0, "bufSize >= 0"); A.ensure(bufSize >= 0, "bufSize >= 0");
A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0"); A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0");


return safeOp(new Callable<IgfsInputStreamAdapter>() { return safeOp(new Callable<IgfsInputStream>() {
@Override public IgfsInputStreamAdapter call() throws Exception { @Override public IgfsInputStream call() throws Exception {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']'); log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']');


Expand Down

This file was deleted.

Expand Up @@ -46,7 +46,7 @@
/** /**
* Input stream to read data from grid cache with separate blocks. * Input stream to read data from grid cache with separate blocks.
*/ */
public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondaryFileSystemPositionedReadable {
/** Empty chunks result. */ /** Empty chunks result. */
private static final byte[][] EMPTY_CHUNKS = new byte[0][]; private static final byte[][] EMPTY_CHUNKS = new byte[0][];


Expand Down Expand Up @@ -158,8 +158,8 @@ public synchronized long bytes() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsEntryInfo fileInfo() { @Override public long length() {
return fileInfo; return fileInfo.length();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -234,9 +234,16 @@ public synchronized long bytes() {
return readFromStore(pos, buf, off, len); return readFromStore(pos, buf, off, len);
} }


/** {@inheritDoc} */ /**
* Reads bytes from given position.
*
* @param pos Position to read from.
* @param len Number of bytes to read.
* @return Array of chunks with respect to chunk file representation.
* @throws IOException If read failed.
*/
@SuppressWarnings("IfMayBeConditional") @SuppressWarnings("IfMayBeConditional")
@Override public synchronized byte[][] readChunks(long pos, int len) throws IOException { public synchronized byte[][] readChunks(long pos, int len) throws IOException {
// Readable bytes in the file, starting from the specified position. // Readable bytes in the file, starting from the specified position.
long readable = fileInfo.length() - pos; long readable = fileInfo.length() - pos;


Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteLogger;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsOutputStream;
Expand Down Expand Up @@ -381,7 +382,7 @@ private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final
break; break;


case OPEN_READ: { case OPEN_READ: {
IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : IgfsInputStream igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());


long streamId = registerResource(ses, igfsIn); long streamId = registerResource(ses, igfsIn);
Expand All @@ -390,7 +391,7 @@ private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final
log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');


res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.fileInfo().length())); res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.length()));


break; break;
} }
Expand Down Expand Up @@ -514,7 +515,7 @@ private IgfsMessage processStreamControlRequest(IgfsClientSession ses, IgfsIpcCo
long pos = req.position(); long pos = req.position();
int size = req.length(); int size = req.length();


IgfsInputStreamAdapter igfsIn = (IgfsInputStreamAdapter)resource(ses, rsrcId); IgfsInputStreamImpl igfsIn = (IgfsInputStreamImpl)resource(ses, rsrcId);


if (igfsIn == null) if (igfsIn == null)
throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId); throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId);
Expand Down
Expand Up @@ -86,7 +86,7 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystemV2 {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize)
throws IgniteException { throws IgniteException {
return igfs.open(path, bufSize); return (IgfsSecondaryFileSystemPositionedReadable)igfs.open(path, bufSize);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -116,6 +116,7 @@ private void startPrimary() throws Exception {
* @return Configuration. * @return Configuration.
* @throws Exception If failed. * @throws Exception If failed.
*/ */
@SuppressWarnings("unchecked")
private IgniteConfiguration primaryConfiguration(int idx) throws Exception { private IgniteConfiguration primaryConfiguration(int idx) throws Exception {
FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); FileSystemConfiguration igfsCfg = new FileSystemConfiguration();


Expand Down Expand Up @@ -172,6 +173,7 @@ private IgniteConfiguration primaryConfiguration(int idx) throws Exception {
* *
* @throws Exception If failed. * @throws Exception If failed.
*/ */
@SuppressWarnings("unchecked")
private void startSecondary() throws Exception { private void startSecondary() throws Exception {
FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); FileSystemConfiguration igfsCfg = new FileSystemConfiguration();


Expand Down Expand Up @@ -384,6 +386,7 @@ public void testMultipleClose() throws Exception {
* *
* @throws Exception If failed. * @throws Exception If failed.
*/ */
@SuppressWarnings({"ResultOfMethodCallIgnored", "ConstantConditions"})
public void testBlockMetrics() throws Exception { public void testBlockMetrics() throws Exception {
IgfsEx igfs = (IgfsEx)igfsPrimary[0]; IgfsEx igfs = (IgfsEx)igfsPrimary[0];


Expand Down Expand Up @@ -424,15 +427,15 @@ public void testBlockMetrics() throws Exception {
checkBlockMetrics(initMetrics, igfs.metrics(), 0, 0, 0, 3, 0, blockSize * 3); checkBlockMetrics(initMetrics, igfs.metrics(), 0, 0, 0, 3, 0, blockSize * 3);


// Read data from the first file. // Read data from the first file.
IgfsInputStreamAdapter is = igfs.open(file1); IgfsInputStream is = igfs.open(file1);
is.readFully(0, new byte[blockSize * 2]); is.readFully(0, new byte[blockSize * 2]);
is.close(); is.close();


checkBlockMetrics(initMetrics, igfs.metrics(), 2, 0, blockSize * 2, 3, 0, blockSize * 3); checkBlockMetrics(initMetrics, igfs.metrics(), 2, 0, blockSize * 2, 3, 0, blockSize * 3);


// Read data from the second file with hits. // Read data from the second file with hits.
is = igfs.open(file2); is = igfs.open(file2);
is.readChunks(0, blockSize); is.read(new byte[blockSize]);
is.close(); is.close();


checkBlockMetrics(initMetrics, igfs.metrics(), 3, 0, blockSize * 3, 3, 0, blockSize * 3); checkBlockMetrics(initMetrics, igfs.metrics(), 3, 0, blockSize * 3, 3, 0, blockSize * 3);
Expand All @@ -449,7 +452,7 @@ public void testBlockMetrics() throws Exception {


// Read remote file. // Read remote file.
is = igfs.open(fileRemote); is = igfs.open(fileRemote);
is.readChunks(0, rmtBlockSize); is.read(new byte[rmtBlockSize]);
is.close(); is.close();


checkBlockMetrics(initMetrics, igfs.metrics(), 4, 1, blockSize * 3 + rmtBlockSize, 3, 0, blockSize * 3); checkBlockMetrics(initMetrics, igfs.metrics(), 4, 1, blockSize * 3 + rmtBlockSize, 3, 0, blockSize * 3);
Expand All @@ -459,7 +462,7 @@ public void testBlockMetrics() throws Exception {


// Read remote file again. // Read remote file again.
is = igfs.open(fileRemote); is = igfs.open(fileRemote);
is.readChunks(0, rmtBlockSize); is.read(new byte[rmtBlockSize]);
is.close(); is.close();


checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 3, 0, blockSize * 3); checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 3, 0, blockSize * 3);
Expand Down Expand Up @@ -495,16 +498,6 @@ public void testBlockMetrics() throws Exception {
checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 5, 1, checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 5, 1,
blockSize * 7 / 2 + rmtBlockSize); blockSize * 7 / 2 + rmtBlockSize);


// Now read partial block.
// Read remote file again.
is = igfs.open(file1);
is.seek(blockSize * 2);
is.readChunks(0, blockSize / 2);
is.close();

checkBlockMetrics(initMetrics, igfs.metrics(), 6, 1, blockSize * 7 / 2 + rmtBlockSize * 2, 5, 1,
blockSize * 7 / 2 + rmtBlockSize);

igfs.resetMetrics(); igfs.resetMetrics();


metrics = igfs.metrics(); metrics = igfs.metrics();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsOutputStream;
Expand Down Expand Up @@ -75,21 +76,22 @@ public IgfsMock(@Nullable String name) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException { @Override public IgfsInputStream open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch)
throws IgniteException {
throwUnsupported(); throwUnsupported();


return null; return null;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException { @Override public IgfsInputStream open(IgfsPath path) throws IgniteException {
throwUnsupported(); throwUnsupported();


return null; return null;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException { @Override public IgfsInputStream open(IgfsPath path, int bufSize) throws IgniteException {
throwUnsupported(); throwUnsupported();


return null; return null;
Expand Down
Expand Up @@ -28,14 +28,14 @@
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter;
import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture;
Expand Down Expand Up @@ -316,9 +316,9 @@ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteChec
try { try {
return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
@Override public HadoopIgfsStreamDelegate apply() { @Override public HadoopIgfsStreamDelegate apply() {
IgfsInputStreamAdapter stream = igfs.open(path, bufSize); IgfsInputStream stream = igfs.open(path, bufSize);


return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
} }
}); });
} }
Expand All @@ -336,9 +336,9 @@ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteChec
try { try {
return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
@Override public HadoopIgfsStreamDelegate apply() { @Override public HadoopIgfsStreamDelegate apply() {
IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);


return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
} }
}); });
} }
Expand Down Expand Up @@ -394,7 +394,7 @@ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteChec
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
@Nullable byte[] outBuf, int outOff, int outLen) { @Nullable byte[] outBuf, int outOff, int outLen) {
IgfsInputStreamAdapter stream = delegate.target(); IgfsInputStream stream = delegate.target();


try { try {
byte[] res = null; byte[] res = null;
Expand Down

0 comments on commit 16c5a71

Please sign in to comment.