diff --git a/core/client/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java b/core/client/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java index e38d3fc90066..67284f4b25fc 100644 --- a/core/client/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java +++ b/core/client/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java @@ -210,8 +210,9 @@ public void cancel() throws IOException { throw Throwables.propagate(e); } finally { mLock.unlock(); - mClosed = true; } + // NOTE: PacketWriter#cancel doesn't imply PacketWriter#close. close must be called for every + // PacketWriter instance. } @Override diff --git a/core/client/src/main/java/alluxio/client/block/stream/PacketOutStream.java b/core/client/src/main/java/alluxio/client/block/stream/PacketOutStream.java index 9e9d4128363f..bc61094e1ffa 100644 --- a/core/client/src/main/java/alluxio/client/block/stream/PacketOutStream.java +++ b/core/client/src/main/java/alluxio/client/block/stream/PacketOutStream.java @@ -20,6 +20,7 @@ import alluxio.util.network.NetworkAddressUtils; import com.google.common.base.Preconditions; +import com.google.common.io.Closer; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -37,6 +38,7 @@ */ @NotThreadSafe public final class PacketOutStream extends OutputStream implements BoundedStream, Cancelable { + private final Closer mCloser; /** Length of the stream. If unknown, set to Long.MAX_VALUE. */ private final long mLength; private ByteBuf mCurrentPacket = null; @@ -114,9 +116,11 @@ public static PacketOutStream createReplicatedPacketOutStream(FileSystemContext * @param length the length of the stream */ private PacketOutStream(PacketWriter packetWriter, long length) { + mCloser = Closer.create(); mLength = length; mPacketWriters = new ArrayList<>(1); mPacketWriters.add(packetWriter); + mCloser.register(packetWriter); mClosed = false; } @@ -127,8 +131,12 @@ private PacketOutStream(PacketWriter packetWriter, long length) { * @param length the length of the stream */ private PacketOutStream(List packetWriters, long length) { + mCloser = Closer.create(); mLength = length; mPacketWriters = packetWriters; + for (PacketWriter packetWriter : packetWriters) { + mCloser.register(packetWriter); + } mClosed = false; } @@ -196,9 +204,21 @@ public void cancel() throws IOException { return; } releaseCurrentPacket(); + + IOException exception = null; for (PacketWriter packetWriter : mPacketWriters) { - packetWriter.cancel(); + try { + packetWriter.cancel(); + } catch (IOException e) { + exception = e; + } } + if (exception != null) { + throw exception; + } + + // NOTE: PacketOutStream#cancel doesn't imply PacketOutStream#close. PacketOutStream#close + // must be closed for every PacketOutStream instance. } @Override @@ -207,18 +227,7 @@ public void close() throws IOException { updateCurrentPacket(true); } finally { mClosed = true; - - IOException e = null; - for (PacketWriter packetWriter : mPacketWriters) { - try { - packetWriter.close(); - } catch (IOException ee) { - e = ee; - } - } - if (e != null) { - throw e; - } + mCloser.close(); } }