Skip to content

Commit

Permalink
Address Bin's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Dec 22, 2016
1 parent 6ab58a5 commit b55285b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
Expand Up @@ -210,8 +210,9 @@ public void cancel() throws IOException {
throw Throwables.propagate(e);
} finally {
mLock.unlock();
mClosed = true;
}
// NOTE: PacketWriter#cancel doesn't imply PacketWriter#close. close must be called for every
// PacketWriter instance.
}

@Override
Expand Down
Expand Up @@ -20,6 +20,7 @@
import alluxio.util.network.NetworkAddressUtils;

import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

Expand All @@ -37,6 +38,7 @@
*/
@NotThreadSafe
public final class PacketOutStream extends OutputStream implements BoundedStream, Cancelable {
private final Closer mCloser;
/** Length of the stream. If unknown, set to Long.MAX_VALUE. */
private final long mLength;
private ByteBuf mCurrentPacket = null;
Expand Down Expand Up @@ -114,9 +116,11 @@ public static PacketOutStream createReplicatedPacketOutStream(FileSystemContext
* @param length the length of the stream
*/
private PacketOutStream(PacketWriter packetWriter, long length) {
mCloser = Closer.create();
mLength = length;
mPacketWriters = new ArrayList<>(1);
mPacketWriters.add(packetWriter);
mCloser.register(packetWriter);
mClosed = false;
}

Expand All @@ -127,8 +131,12 @@ private PacketOutStream(PacketWriter packetWriter, long length) {
* @param length the length of the stream
*/
private PacketOutStream(List<PacketWriter> packetWriters, long length) {
mCloser = Closer.create();
mLength = length;
mPacketWriters = packetWriters;
for (PacketWriter packetWriter : packetWriters) {
mCloser.register(packetWriter);
}
mClosed = false;
}

Expand Down Expand Up @@ -196,9 +204,21 @@ public void cancel() throws IOException {
return;
}
releaseCurrentPacket();

IOException exception = null;
for (PacketWriter packetWriter : mPacketWriters) {
packetWriter.cancel();
try {
packetWriter.cancel();
} catch (IOException e) {
exception = e;
}
}
if (exception != null) {
throw exception;
}

// NOTE: PacketOutStream#cancel doesn't imply PacketOutStream#close. PacketOutStream#close
// must be closed for every PacketOutStream instance.
}

@Override
Expand All @@ -207,18 +227,7 @@ public void close() throws IOException {
updateCurrentPacket(true);
} finally {
mClosed = true;

IOException e = null;
for (PacketWriter packetWriter : mPacketWriters) {
try {
packetWriter.close();
} catch (IOException ee) {
e = ee;
}
}
if (e != null) {
throw e;
}
mCloser.close();
}
}

Expand Down

0 comments on commit b55285b

Please sign in to comment.