Skip to content

Commit

Permalink
Convert more IOExceptions to AlluxioStatusException
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Apr 18, 2017
1 parent d90678a commit 1545533
Show file tree
Hide file tree
Showing 25 changed files with 262 additions and 175 deletions.
Expand Up @@ -17,7 +17,9 @@
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.CreateFileOptions;
import alluxio.client.file.options.OpenFileOptions;
import alluxio.exception.AlluxioException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -84,7 +86,7 @@ public static void createByteFile(FileSystem fs, AlluxioURI fileURI,
arr[k] = (byte) k;
}
os.write(arr);
} catch (Exception e) {
} catch (IOException | AlluxioException e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -123,7 +125,7 @@ public static List<String> listFiles(FileSystem fs, String path) {
}
}
return res;
} catch (Exception e) {
} catch (IOException | AlluxioException e) {
throw new RuntimeException(e);
}
}
Expand Down
Expand Up @@ -157,6 +157,9 @@ public static BlockWorkerClient create(BlockWorkerThriftClientPool clientPool,
*/
void sessionHeartbeat(RetryPolicy retryPolicy) throws InterruptedException;

/**
* Overridden to indicate that this does not throw IOException.
*/
@Override
void close();
}
Expand Up @@ -279,7 +279,7 @@ public String call(BlockWorkerClientService.Client client) throws TException {
});
} catch (ResourceExhaustedException e) {
throw new ResourceExhaustedException(ExceptionMessage.CANNOT_REQUEST_SPACE
.getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, mRpcAddress, blockId));
.getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, mRpcAddress, blockId), e);
}
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.util.network.NetworkAddressUtils;
Expand Down Expand Up @@ -178,15 +179,19 @@ public static BlockInStream createUfsBlockInStream(FileSystemContext context, St
!options.getAlluxioStorageType().isStore(), Protocol.RequestType.UFS_BLOCK));
}
return new BlockInStream(inStream, blockWorkerClient, closer, options);
} catch (Exception e) {
} catch (RuntimeException e) {
CommonUtils.closeQuietly(closer);
throw e;
}
}

@Override
public void close() throws IOException {
mCloser.close();
public void close() {
try {
mCloser.close();
} catch (IOException e) {
throw AlluxioStatusException.fromIOException(e);
}
}

@Override
Expand All @@ -195,12 +200,12 @@ public long remaining() {
}

@Override
public void seek(long pos) throws IOException {
public void seek(long pos) {
mInputStream.seek(pos);
}

@Override
public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
public int positionedRead(long pos, byte[] b, int off, int len) {
return mInputStream.positionedRead(pos, b, off, len);
}

Expand Down
Expand Up @@ -12,10 +12,11 @@
package alluxio.client.block.stream;

import alluxio.client.BoundedStream;
import alluxio.client.Cancelable;
import alluxio.client.QuietlyCancelable;
import alluxio.client.block.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerNetAddress;
Expand All @@ -33,7 +34,7 @@
* {@link alluxio.client.block.AlluxioBlockStore#getOutStream(long, long, OutStreamOptions)}.
*/
@NotThreadSafe
public class BlockOutStream extends FilterOutputStream implements BoundedStream, Cancelable {
public class BlockOutStream extends FilterOutputStream implements BoundedStream, QuietlyCancelable {
private final long mBlockId;
private final long mBlockSize;
private final Closer mCloser;
Expand Down Expand Up @@ -112,41 +113,38 @@ public long remaining() {
}

@Override
public void cancel() throws IOException {
public void cancel() {
if (mClosed) {
return;
}
Throwable throwable = null;
RuntimeException exception = null;
try {
mOutStream.cancel();
} catch (Throwable e) {
throwable = e;
} catch (RuntimeException e) {
exception = e;
}
try {
mBlockWorkerClient.cancelBlock(mBlockId);
} catch (Throwable e) {
throwable = e;
} catch (RuntimeException e) {
exception = e;
}

if (throwable == null) {
if (exception == null) {
mClosed = true;
return;
}

try {
mCloser.close();
} catch (Throwable e) {
} catch (IOException | RuntimeException e) {
// Ignore
}
mClosed = true;
if (throwable instanceof IOException) {
throw (IOException) throwable;
}
throw new IOException(throwable);
throw AlluxioStatusException.fromRuntimeException(exception);
}

@Override
public void close() throws IOException {
public void close() {
if (mClosed) {
return;
}
Expand All @@ -155,8 +153,10 @@ public void close() throws IOException {
if (remaining() < mBlockSize) {
mBlockWorkerClient.cacheBlock(mBlockId);
}
} catch (IOException e) {
throw AlluxioStatusException.fromIOException(e);
} finally {
mCloser.close();
CommonUtils.close(mCloser);
mClosed = true;
}
}
Expand Down
Expand Up @@ -17,7 +17,6 @@
import alluxio.network.protocol.databuffer.DataByteBuffer;
import alluxio.worker.block.io.LocalFileBlockReader;

import java.io.IOException;
import java.nio.ByteBuffer;

import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -50,7 +49,7 @@ public LocalFilePacketReader(LocalFileBlockReader reader, long offset, long len)
}

@Override
public DataBuffer readPacket() throws IOException {
public DataBuffer readPacket() {
if (mPos >= mEnd) {
return null;
}
Expand All @@ -66,7 +65,7 @@ public long pos() {
}

@Override
public void close() throws IOException {
public void close() {
mReader.close();
}

Expand All @@ -86,7 +85,7 @@ public Factory(String path) {
}

@Override
public PacketReader create(long offset, long len) throws IOException {
public PacketReader create(long offset, long len) {
return new LocalFilePacketReader(new LocalFileBlockReader(mPath), offset, len);
}

Expand Down
Expand Up @@ -80,15 +80,15 @@ public void writePacket(final ByteBuf buf) throws IOException {
}

@Override
public void cancel() throws IOException {
public void cancel() {
close();
}

@Override
public void flush() {}

@Override
public void close() throws IOException {
public void close() {
if (mClosed) {
return;
}
Expand Down
Expand Up @@ -14,12 +14,13 @@
import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.Status;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.proto.ProtoMessage;

Expand Down Expand Up @@ -120,11 +121,10 @@ public final class NettyPacketReader implements PacketReader {
* @param sessionId the session ID
* @param noCache do not cache the block to the Alluxio worker if read from UFS when this is set
* @param type the request type (block or UFS file)
* @throws IOException if it fails to acquire a netty channel
*/
private NettyPacketReader(FileSystemContext context, InetSocketAddress address, long id,
long offset, long len, long lockId, long sessionId, boolean noCache,
Protocol.RequestType type) throws IOException {
Protocol.RequestType type) {
Preconditions.checkArgument(offset >= 0 && len > 0);

mContext = context;
Expand Down Expand Up @@ -153,7 +153,7 @@ public long pos() {
}

@Override
public DataBuffer readPacket() throws IOException {
public DataBuffer readPacket() {
Preconditions.checkState(!mClosed, "PacketReader is closed while reading packets.");
ByteBuf buf;

Expand All @@ -167,10 +167,11 @@ public DataBuffer readPacket() throws IOException {
throw new RuntimeException(e);
}
if (buf == null) {
throw new IOException(String.format("Timeout to read %d from %s.", mId, mChannel.toString()));
throw new DeadlineExceededException(
String.format("Timeout to read %d from %s.", mId, mChannel.toString()));
}
if (buf == THROWABLE) {
throw CommonUtils.castToIOException(Preconditions.checkNotNull(mPacketReaderException));
throw new RuntimeException(Preconditions.checkNotNull(mPacketReaderException));
}
if (buf == EOF_OR_CANCELLED) {
mDone = true;
Expand Down Expand Up @@ -203,7 +204,7 @@ public void close() {

try {
readAndDiscardAll();
} catch (IOException e) {
} catch (UnavailableException e) {
LOG.warn("Failed to close the NettyBlockReader (block: {}, address: {}) with exception {}.",
mId, mAddress, e.getMessage());
mChannel.close();
Expand All @@ -223,10 +224,8 @@ public void close() {

/**
* Reads and discards everything read from the channel until it reaches end of the stream.
*
* @throws IOException if any I/O related errors occur
*/
private void readAndDiscardAll() throws IOException {
private void readAndDiscardAll() {
DataBuffer buf;
do {
buf = readPacket();
Expand Down Expand Up @@ -371,7 +370,7 @@ public Factory(FileSystemContext context, InetSocketAddress address, long id, lo
}

@Override
public PacketReader create(long offset, long len) throws IOException {
public PacketReader create(long offset, long len) {
return new NettyPacketReader(mContext, mAddress, mId, offset, len, mLockId, mSessionId,
mNoCache, mRequestType);
}
Expand Down
Expand Up @@ -14,6 +14,8 @@
import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.Status;
import alluxio.network.protocol.databuffer.DataBuffer;
Expand Down Expand Up @@ -182,7 +184,7 @@ public void writePacket(final ByteBuf buf) throws IOException {
}

@Override
public void cancel() throws IOException {
public void cancel() {
if (mClosed) {
return;
}
Expand Down Expand Up @@ -212,7 +214,7 @@ public void flush() throws IOException {
}

@Override
public void close() throws IOException {
public void close() {
if (mClosed) {
return;
}
Expand All @@ -227,11 +229,11 @@ public void close() throws IOException {
try {
if (mPacketWriteException != null) {
mChannel.close().sync();
throw new IOException(mPacketWriteException);
throw new UnavailableException(mPacketWriteException);
}
if (!mDoneOrFailed.await(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
mChannel.close().sync();
throw new IOException(String.format(
throw new DeadlineExceededException(String.format(
"Timeout closing PacketWriter to %s for request %s.", mAddress, mPartialRequest));
}
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -139,7 +139,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
public int positionedRead(long pos, byte[] b, int off, int len) {
if (len == 0) {
return 0;
}
Expand Down Expand Up @@ -182,7 +182,7 @@ public long remaining() {
}

@Override
public void seek(long pos) throws IOException {
public void seek(long pos) {
checkIfClosed();
Preconditions.checkArgument(pos >= 0, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), pos);
Preconditions
Expand Down Expand Up @@ -248,7 +248,7 @@ private void readPacket() throws IOException {
/**
* Close the current packet reader.
*/
private void closePacketReader() throws IOException {
private void closePacketReader() {
if (mCurrentPacket != null) {
mCurrentPacket.release();
mCurrentPacket = null;
Expand Down

0 comments on commit 1545533

Please sign in to comment.