Skip to content

Commit

Permalink
change
Browse files Browse the repository at this point in the history
  • Loading branch information
Lichongjie committed Mar 20, 2017
1 parent cdc6204 commit 9c1eb16
Show file tree
Hide file tree
Showing 137 changed files with 6,051 additions and 1,364 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -5,7 +5,7 @@ The master branch is in version 1.5.0-SNAPSHOT:


- [Alluxio Open Source Website](http://www.alluxio.org/) | [Alluxio Latest Release Document](http://www.alluxio.org/documentation/) | [Master Branch Document](http://alluxio.org/documentation/master/) | [Alluxio Inc.](http://www.alluxio.com/) - [Alluxio Open Source Website](http://www.alluxio.org/) | [Alluxio Latest Release Document](http://www.alluxio.org/documentation/) | [Master Branch Document](http://alluxio.org/documentation/master/) | [Alluxio Inc.](http://www.alluxio.com/)
- [Contribute to Alluxio](http://alluxio.org/documentation/master/en/Contributing-to-Alluxio.html) and - [Contribute to Alluxio](http://alluxio.org/documentation/master/en/Contributing-to-Alluxio.html) and
[New Contributor Tasks](https://alluxio.atlassian.net/issues/?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20NewContributor%20AND%20status%20%3D%20Open) [New Contributor Tasks](https://alluxio.atlassian.net/browse/ALLUXIO-2506?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20NewContributor%20AND%20status%20%3D%20Open%20AND%20Assignee%20%3D%20null)
- Please limit 2 tasks per new contributor. Afterwards, try some [beginner tasks](https://alluxio.atlassian.net/issues/?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20Beginner%20AND%20status%20%3D%20Open) or [intermediate tasks](https://alluxio.atlassian.net/issues/?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20Intermediate%20AND%20status%20%3D%20Open), - Please limit 2 tasks per new contributor. Afterwards, try some [beginner tasks](https://alluxio.atlassian.net/issues/?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20Beginner%20AND%20status%20%3D%20Open) or [intermediate tasks](https://alluxio.atlassian.net/issues/?jql=project%20%3D%20ALLUXIO%20AND%20labels%20%3D%20Intermediate%20AND%20status%20%3D%20Open),
or ask in the [Developer Mailing List](https://groups.google.com/forum/#!forum/alluxio-dev). or ask in the [Developer Mailing List](https://groups.google.com/forum/#!forum/alluxio-dev).
- [Releases](http://alluxio.org/releases/) - [Releases](http://alluxio.org/releases/)
Expand Down
Expand Up @@ -53,6 +53,7 @@ public static RemoteBlockReader create(FileSystemContext context) {
* @return a byte buffer containing the remote data block * @return a byte buffer containing the remote data block
* @throws IOException if the remote server is not reachable or responds with failures * @throws IOException if the remote server is not reachable or responds with failures
*/ */
// TODO(peis): Use options idiom (ALLUXIO-2579).
ByteBuffer readRemoteBlock(InetSocketAddress address, long blockId, long offset, ByteBuffer readRemoteBlock(InetSocketAddress address, long blockId, long offset,
long length, long lockId, long sessionId) throws IOException; long length, long lockId, long sessionId) throws IOException;
} }
@@ -0,0 +1,61 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client;

import alluxio.client.file.FileSystemContext;
import alluxio.client.netty.NettyUnderFileSystemBlockReader;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

/**
* The interface to read an under file system file through a worker's data server.
*/
public interface UnderFileSystemBlockReader extends Closeable {

/**
* The factory for the {@link UnderFileSystemBlockReader}.
*/
class Factory {

private Factory() {} // prevent instantiation

/**
* Factory for {@link UnderFileSystemBlockReader}.
*
* @param context the file system context
* @return a new instance of {@link UnderFileSystemBlockReader}
*/
public static UnderFileSystemBlockReader create(FileSystemContext context) {
return new NettyUnderFileSystemBlockReader(context);
}
}

/**
* Reads a block from UFS with an offset and length.
*
* @param address the {@link InetSocketAddress} of the data server
* @param blockId the id of the block trying to read
* @param offset the offset within the block (not file)
* @param length the length the client wants to read
* @param sessionId the session id of the client
* @param noCache do not cache the data read from UFS in the Alluxio worker if set
* @return a byte buffer containing the remote data block
* @throws IOException if the remote server is not reachable or responds with failures
*/
// TODO(peis): Use options idiom (ALLUXIO-2579).
ByteBuffer read(InetSocketAddress address, long blockId, long offset, long length,
long sessionId, boolean noCache) throws IOException;
}

This file was deleted.

Expand Up @@ -153,7 +153,7 @@ public InputStream getInStream(long blockId, InStreamOptions options)
.createLocalBlockInStream(mContext, blockId, blockInfo.getLength(), workerNetAddress, .createLocalBlockInStream(mContext, blockId, blockInfo.getLength(), workerNetAddress,
options); options);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to open local stream for block " + blockId + ". " + e.getMessage()); LOG.warn("Failed to open local stream for block {}: {}", blockId, e.getMessage());
// Getting a local stream failed, do not try again // Getting a local stream failed, do not try again
break; break;
} }
Expand Down
Expand Up @@ -11,9 +11,10 @@


package alluxio.client.block; package alluxio.client.block;


import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.resource.LockBlockResource;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.retry.RetryPolicy; import alluxio.retry.RetryPolicy;
import alluxio.wire.LockBlockResult;
import alluxio.wire.WorkerNetAddress; import alluxio.wire.WorkerNetAddress;


import java.io.Closeable; import java.io.Closeable;
Expand Down Expand Up @@ -96,11 +97,27 @@ public static BlockWorkerClient create(BlockWorkerThriftClientPool clientPool,
* unlocked. * unlocked.
* *
* @param blockId the ID of the block * @param blockId the ID of the block
* @return the path of the block file locked * @param options the lock block options
* @return the lock block result
* @throws IOException if a non-Alluxio exception occurs * @throws IOException if a non-Alluxio exception occurs
* @throws AlluxioException if an Alluxio error occurs * @throws AlluxioException if an Alluxio error occurs
*/ */
LockBlockResult lockBlock(final long blockId) throws IOException, AlluxioException; LockBlockResource lockBlock(final long blockId, final LockBlockOptions options)
throws IOException, AlluxioException;

/**
* A wrapper over {@link BlockWorkerClient#lockBlock(long, LockBlockOptions)} to lock a block
* that is not in Alluxio but in UFS. It retries if it fails to lock because of contention for
* the block on the worker.
*
* @param blockId the block ID
* @param options the lock block options
* @return the lock block result
* @throws IOException if a non-Alluxio exception occurs
* @throws AlluxioException if an Alluxio error occurs
*/
LockBlockResource lockUfsBlock(final long blockId, final LockBlockOptions options)
throws IOException, AlluxioException;


/** /**
* Promotes block back to the top StorageTier. * Promotes block back to the top StorageTier.
Expand Down
Expand Up @@ -11,10 +11,12 @@


package alluxio.client.block; package alluxio.client.block;


import alluxio.client.block.options.LockBlockOptions;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.exception.AlluxioException; import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
import alluxio.util.CommonUtils;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;
import alluxio.wire.LockBlockResult; import alluxio.wire.LockBlockResult;
import alluxio.wire.WorkerNetAddress; import alluxio.wire.WorkerNetAddress;
Expand All @@ -39,8 +41,6 @@ public final class LocalBlockInStream extends BufferedBlockInStream {
private final Closer mCloser; private final Closer mCloser;
/** Client to communicate with the local worker. */ /** Client to communicate with the local worker. */
private final BlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
/** The block store context which provides block worker clients. */
private final FileSystemContext mContext;
/** The file reader to read a local block. */ /** The file reader to read a local block. */
private final LocalFileBlockReader mReader; private final LocalFileBlockReader mReader;


Expand All @@ -52,27 +52,63 @@ public final class LocalBlockInStream extends BufferedBlockInStream {
* @param workerNetAddress the address of the local worker * @param workerNetAddress the address of the local worker
* @param context the file system context * @param context the file system context
* @param options the instream options * @param options the instream options
* @return the {@link LocalBlockInStream} instance
* @throws IOException if I/O error occurs * @throws IOException if I/O error occurs
*/ */
public LocalBlockInStream(long blockId, long blockSize, WorkerNetAddress workerNetAddress, // TODO(peis): Use options idiom (ALLUXIO-2579).
FileSystemContext context, InStreamOptions options) throws IOException { public static LocalBlockInStream create(long blockId, long blockSize,
super(blockId, blockSize); WorkerNetAddress workerNetAddress, FileSystemContext context, InStreamOptions options)
mContext = context; throws IOException {

Closer closer = Closer.create();
mCloser = Closer.create();
try { try {
mBlockWorkerClient = mCloser.register(mContext.createBlockWorkerClient(workerNetAddress)); BlockWorkerClient client = closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockResult result = mBlockWorkerClient.lockBlock(blockId); LockBlockResult result =
mReader = mCloser.register(new LocalFileBlockReader(result.getBlockPath())); closer.register(client.lockBlock(blockId, LockBlockOptions.defaults())).getResult();
} catch (AlluxioException e) { LocalFileBlockReader reader =
mCloser.close(); closer.register(new LocalFileBlockReader(result.getBlockPath()));
throw new IOException(e); return new LocalBlockInStream(client, blockId, blockSize, reader, closer, options);
} catch (IOException e) { } catch (AlluxioException | IOException e) {
mCloser.close(); CommonUtils.closeQuitely(closer);
throw e; throw CommonUtils.castToIOException(e);
} }
} }


/**
* Creates a local block input stream. It requires the block to be locked before calling this.
*
* @param client the block worker client
* @param blockId the block id
* @param blockSize the size of the block
* @param reader the local block file reader, the {@link LocalBlockInStream} created should
* close this reader
* @param closer the closer registered with closable resources open so far
* @param options the instream options
* @return the {@link LocalBlockInStream} instance
*/
// TODO(peis): Use options idiom (ALLUXIO-2579).
public static LocalBlockInStream createWithLockedBlock(BlockWorkerClient client, long blockId,
long blockSize, LocalFileBlockReader reader, Closer closer, InStreamOptions options) {
return new LocalBlockInStream(client, blockId, blockSize, reader, closer, options);
}

/**
* Creates a local block input stream. It requires the block to be locked before calling this.
*
* @param client the block worker client
* @param blockId the block id
* @param blockSize the size of the block
* @param reader the local block file reader which should be closed by this class
* @param closer the closer registered with closable resources open so far
* @param options the instream options
*/
private LocalBlockInStream(BlockWorkerClient client, long blockId,
long blockSize, LocalFileBlockReader reader, Closer closer, InStreamOptions options) {
super(blockId, blockSize);
mBlockWorkerClient = client;
mReader = reader;
mCloser = closer;
}

@Override @Override
public void seek(long pos) throws IOException { public void seek(long pos) throws IOException {
super.seek(pos); super.seek(pos);
Expand All @@ -89,15 +125,15 @@ public void close() throws IOException {
mBlockWorkerClient.accessBlock(mBlockId); mBlockWorkerClient.accessBlock(mBlockId);
Metrics.BLOCKS_READ_LOCAL.inc(); Metrics.BLOCKS_READ_LOCAL.inc();
} }
mBlockWorkerClient.unlockBlock(mBlockId); // Note that the block is unlocked by closing mCloser.
} catch (Throwable e) { // must catch Throwable } catch (Throwable e) { // must catch Throwable
throw mCloser.rethrow(e); // IOException will be thrown as-is throw mCloser.rethrow(e); // IOException will be thrown as-is
} finally { } finally {
mClosed = true; mClosed = true;
mCloser.close();
if (mBuffer != null && mBuffer.isDirect()) { if (mBuffer != null && mBuffer.isDirect()) {
BufferUtils.cleanDirectBuffer(mBuffer); BufferUtils.cleanDirectBuffer(mBuffer);
} }
mCloser.close();
} }
} }


Expand Down

0 comments on commit 9c1eb16

Please sign in to comment.