Skip to content

Commit

Permalink
Revert "Revert "[TACHYON-108] Remote write support""
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyuan committed Jul 8, 2015
1 parent 9033a1e commit aa3f042
Show file tree
Hide file tree
Showing 29 changed files with 1,431 additions and 316 deletions.
245 changes: 37 additions & 208 deletions clients/unshaded/src/main/java/tachyon/client/BlockOutStream.java
Expand Up @@ -16,239 +16,68 @@
package tachyon.client; package tachyon.client;


import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import com.google.common.primitives.Ints;
import com.google.common.io.Closer;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import tachyon.Constants; import tachyon.Constants;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.util.CommonUtils;


/** /**
* <code>BlockOutStream</code> implementation of TachyonFile. This class is not client facing. * <code>BlockOutStream</code> interface for writing data to a block. This class is not client
* facing.
*/ */
public class BlockOutStream extends OutStream { public abstract class BlockOutStream extends OutStream {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


private final int mBlockIndex;
private final long mBlockCapacityByte;
private final long mBlockId;
private final long mBlockOffset;
private final Closer mCloser = Closer.create();
private final String mLocalFilePath;
private final RandomAccessFile mLocalFile;
private final FileChannel mLocalFileChannel;
private final ByteBuffer mBuffer;

private long mAvailableBytes = 0;
private long mInFileBytes = 0;
private long mWrittenBytes = 0;

private boolean mCanWrite = false;
private boolean mClosed = false;

/**
* @param file the file the block belongs to
* @param opType the OutStream's write type
* @param blockIndex the index of the block in the file
* @param tachyonConf the TachyonConf instance for this file output stream.
* @throws IOException
*/
BlockOutStream(TachyonFile file, WriteType opType, int blockIndex, TachyonConf tachyonConf)
throws IOException {
this(file, opType, blockIndex, tachyonConf.getBytes(Constants.USER_QUOTA_UNIT_BYTES,
8 * Constants.MB), tachyonConf);
}

/** /**
* @param file the file the block belongs to * Get a new BlockOutStream with a default initial size allocated to the block.
* @param opType the OutStream's write type *
* @param blockIndex the index of the block in the file * @param tachyonFile The file this block belongs to.
* @param initialBytes the initial size bytes that will be allocated to the block * @param opType The type of write.
* @param tachyonConf the TachyonConf instance for this file output stream. * @param blockIndex The index of the block in the tachyonFile.
* @param tachyonConf The TachyonConf instance.
* @return A new {@link LocalBlockOutStream} or {@link RemoteBlockOutStream}.
* @throws IOException * @throws IOException
*/ */
BlockOutStream(TachyonFile file, WriteType opType, int blockIndex, long initialBytes, public static BlockOutStream get(TachyonFile tachyonFile, WriteType opType, int blockIndex,
TachyonConf tachyonConf) throws IOException { TachyonConf tachyonConf) throws IOException {
super(file, opType, tachyonConf); return get(tachyonFile, opType, blockIndex,

tachyonConf.getBytes(Constants.USER_QUOTA_UNIT_BYTES, 8 * Constants.MB), tachyonConf);
if (!opType.isCache()) {
throw new IOException("BlockOutStream only support WriteType.CACHE");
}

mBlockIndex = blockIndex;
mBlockCapacityByte = mFile.getBlockSizeByte();
mBlockId = mFile.getBlockId(mBlockIndex);
mBlockOffset = mBlockCapacityByte * blockIndex;

mCanWrite = true;

if (!mTachyonFS.hasLocalWorker()) {
mCanWrite = false;
String msg = "The machine does not have any local worker.";
throw new IOException(msg);
}
mLocalFilePath = mTachyonFS.getLocalBlockTemporaryPath(mBlockId, initialBytes);
mLocalFile = mCloser.register(new RandomAccessFile(mLocalFilePath, "rw"));
mLocalFileChannel = mCloser.register(mLocalFile.getChannel());
// change the permission of the temporary file in order that the worker can move it.
CommonUtils.changeLocalFileToFullPermission(mLocalFilePath);
// use the sticky bit, only the client and the worker can write to the block
CommonUtils.setLocalFileStickyBit(mLocalFilePath);
LOG.info(mLocalFilePath + " was created!");
mAvailableBytes += initialBytes;

long allocateBytes = mTachyonConf.getBytes(Constants.USER_FILE_BUFFER_BYTES, Constants.MB) + 4L;
mBuffer = ByteBuffer.allocate(Ints.checkedCast(allocateBytes));
}

private synchronized void appendCurrentBuffer(byte[] buf, int offset, int length)
throws IOException {
if (mAvailableBytes < length) {
long bytesRequested = mTachyonFS.requestSpace(mBlockId, length - mAvailableBytes);
if (bytesRequested + mAvailableBytes >= length) {
mAvailableBytes += bytesRequested;
} else {
mCanWrite = false;
throw new IOException(String.format("No enough space on local worker: fileId(%d)"
+ " blockId(%d) requestSize(%d)", mFile.mFileId, mBlockId, length - mAvailableBytes));
}
}

MappedByteBuffer out = mLocalFileChannel.map(MapMode.READ_WRITE, mInFileBytes, length);
out.put(buf, offset, length);
CommonUtils.cleanDirectBuffer(out);
mInFileBytes += length;
mAvailableBytes -= length;
}

@Override
public void cancel() throws IOException {
if (!mClosed) {
mCloser.close();
mClosed = true;
mTachyonFS.cancelBlock(mBlockId);
LOG.info(String.format("Canceled output of block. blockId(%d) path(%s)", mBlockId,
mLocalFilePath));
}
} }


/** /**
* @return true if the stream can write and is not closed, otherwise false *
* @param tachyonFile The file this block belongs to.
* @param opType The type of write.
* @param blockIndex The index of the block in the tachyonFile.
* @param initialBytes The initial size (in bytes) that will be allocated to the block.
* @param tachyonConf The TachyonConf instance.
* @return A new {@link LocalBlockOutStream} or {@link RemoteBlockOutStream}.
* @throws IOException
*/ */
public boolean canWrite() { public static BlockOutStream get(TachyonFile tachyonFile, WriteType opType, int blockIndex,
return !mClosed && mCanWrite; long initialBytes, TachyonConf tachyonConf) throws IOException {
}

if (tachyonFile.mTachyonFS.hasLocalWorker()
@Override && tachyonConf.getBoolean(Constants.USER_ENABLE_LOCAL_WRITE,
public void close() throws IOException { Constants.DEFAULT_USER_ENABLE_LOCAL_WRITE)) {
if (!mClosed) { LOG.info("Writing with local stream. tachyonFile: " + tachyonFile + ", blockIndex: "
if (mBuffer.position() > 0) { + blockIndex + ", opType: " + opType);
appendCurrentBuffer(mBuffer.array(), 0, mBuffer.position()); return new LocalBlockOutStream(tachyonFile, opType, blockIndex, initialBytes, tachyonConf);
}
mCloser.close();
mTachyonFS.cacheBlock(mBlockId);
mClosed = true;
} }
}


@Override LOG.info("Writing with remote stream. tachyonFile: " + tachyonFile + ", blockIndex: "
public void flush() throws IOException { + blockIndex + ", opType: " + opType);
// Since this only writes to memory, this flush is not outside visible. return new RemoteBlockOutStream(tachyonFile, opType, blockIndex, initialBytes, tachyonConf);
} }


/** protected BlockOutStream(TachyonFile tachyonFile, WriteType opType, TachyonConf tachyonConf) {
* @return the block id of the block super(tachyonFile, opType, tachyonConf);
*/
public long getBlockId() {
return mBlockId;
} }


/** /**
* @return the block offset in the file. * @return the remaining space of the block, in bytes.
*/ */
public long getBlockOffset() { public abstract long getRemainingSpaceBytes();
return mBlockOffset;
}

/**
* @return the remaining space of the block, in bytes
*/
public long getRemainingSpaceByte() {
return mBlockCapacityByte - mWrittenBytes;
}

@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|| ((off + len) < 0)) {
throw new IndexOutOfBoundsException(String.format("Buffer length (%d), offset(%d), len(%d)",
b.length, off, len));
}

if (!canWrite()) {
throw new IOException("Can not write cache.");
}
if (mWrittenBytes + len > mBlockCapacityByte) {
throw new IOException("Out of capacity.");
}

long userFileBufferBytes =
mTachyonConf.getBytes(Constants.USER_FILE_BUFFER_BYTES, Constants.MB);
if (mBuffer.position() > 0 && mBuffer.position() + len > userFileBufferBytes) {
// Write the non-empty buffer if the new write will overflow it.
appendCurrentBuffer(mBuffer.array(), 0, mBuffer.position());
mBuffer.clear();
}

if (len > userFileBufferBytes / 2) {
// This write is "large", so do not write it to the buffer, but write it out directly to the
// mapped file.
if (mBuffer.position() > 0) {
// Make sure all bytes in the buffer are written out first, to prevent out-of-order writes.
appendCurrentBuffer(mBuffer.array(), 0, mBuffer.position());
mBuffer.clear();
}
appendCurrentBuffer(b, off, len);
} else if (len > 0) {
// Write the data to the buffer, and not directly to the mapped file.
mBuffer.put(b, off, len);
}

mWrittenBytes += len;
}

@Override
public void write(int b) throws IOException {
if (!canWrite()) {
throw new IOException("Can not write cache.");
}
if (mWrittenBytes + 1 > mBlockCapacityByte) {
throw new IOException("Out of capacity.");
}

if (mBuffer.position()
>= mTachyonConf.getBytes(Constants.USER_FILE_BUFFER_BYTES, Constants.MB)) {
appendCurrentBuffer(mBuffer.array(), 0, mBuffer.position());
mBuffer.clear();
}

CommonUtils.putIntByteBuffer(mBuffer, b);
mWrittenBytes ++;
}
} }
Expand Up @@ -22,4 +22,6 @@ public class ClientConstants {


public static final Class<? extends RemoteBlockReader> USER_REMOTE_BLOCK_READER_CLASS = public static final Class<? extends RemoteBlockReader> USER_REMOTE_BLOCK_READER_CLASS =
tachyon.client.tcp.TCPRemoteBlockReader.class; tachyon.client.tcp.TCPRemoteBlockReader.class;
public static final Class<? extends RemoteBlockWriter> USER_REMOTE_BLOCK_WRITER_CLASS =
tachyon.client.netty.NettyRemoteBlockWriter.class;
} }
11 changes: 6 additions & 5 deletions clients/unshaded/src/main/java/tachyon/client/FileOutStream.java
Expand Up @@ -149,7 +149,7 @@ public void flush() throws IOException {


private void getNextBlock() throws IOException { private void getNextBlock() throws IOException {
if (mCurrentBlockOutStream != null) { if (mCurrentBlockOutStream != null) {
if (mCurrentBlockOutStream.getRemainingSpaceByte() > 0) { if (mCurrentBlockOutStream.getRemainingSpaceBytes() > 0) {
throw new IOException("The current block still has space left, no need to get new block"); throw new IOException("The current block still has space left, no need to get new block");
} }
mPreviousBlockOutStreams.add(mCurrentBlockOutStream); mPreviousBlockOutStreams.add(mCurrentBlockOutStream);
Expand All @@ -158,7 +158,7 @@ private void getNextBlock() throws IOException {


if (mWriteType.isCache()) { if (mWriteType.isCache()) {
int offset = (int) (mCachedBytes / mBlockCapacityByte); int offset = (int) (mCachedBytes / mBlockCapacityByte);
mCurrentBlockOutStream = new BlockOutStream(mFile, mWriteType, offset, mTachyonConf); mCurrentBlockOutStream = BlockOutStream.get(mFile, mWriteType, offset, mTachyonConf);
} }
} }


Expand All @@ -182,10 +182,10 @@ public void write(byte[] b, int off, int len) throws IOException {
int tOff = off; int tOff = off;
while (tLen > 0) { while (tLen > 0) {
if (mCurrentBlockOutStream == null if (mCurrentBlockOutStream == null
|| mCurrentBlockOutStream.getRemainingSpaceByte() == 0) { || mCurrentBlockOutStream.getRemainingSpaceBytes() == 0) {
getNextBlock(); getNextBlock();
} }
long currentBlockLeftBytes = mCurrentBlockOutStream.getRemainingSpaceByte(); long currentBlockLeftBytes = mCurrentBlockOutStream.getRemainingSpaceBytes();
if (currentBlockLeftBytes >= tLen) { if (currentBlockLeftBytes >= tLen) {
mCurrentBlockOutStream.write(b, tOff, tLen); mCurrentBlockOutStream.write(b, tOff, tLen);
mCachedBytes += tLen; mCachedBytes += tLen;
Expand Down Expand Up @@ -219,7 +219,8 @@ public void write(byte[] b, int off, int len) throws IOException {
public void write(int b) throws IOException { public void write(int b) throws IOException {
if (mWriteType.isCache()) { if (mWriteType.isCache()) {
try { try {
if (mCurrentBlockOutStream == null || mCurrentBlockOutStream.getRemainingSpaceByte() == 0) { if (mCurrentBlockOutStream == null
|| mCurrentBlockOutStream.getRemainingSpaceBytes() == 0) {
getNextBlock(); getNextBlock();
} }
// TODO Cache the exception here. // TODO Cache the exception here.
Expand Down

0 comments on commit aa3f042

Please sign in to comment.