Skip to content

Commit

Permalink
Improve readability and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
cc committed Jan 26, 2016
1 parent 8ca4864 commit f3e76c3
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 80 deletions.
3 changes: 3 additions & 0 deletions common/src/main/java/tachyon/util/io/BufferUtils.java
Expand Up @@ -371,6 +371,9 @@ public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position, int le
* @return the sliced {@link ByteBuffer} * @return the sliced {@link ByteBuffer}
*/ */
public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position) { public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position) {
// The following is an optimization comparing to directly calling
// sliceByteBuffer(ByteBuffer, int, int) needs to compute the length of the sliced buffer and
// set the limit, but those operations should have been taken care of by the slice() method.
return ((ByteBuffer) buffer.duplicate().position(position)).slice(); return ((ByteBuffer) buffer.duplicate().position(position)).slice();
} }


Expand Down
24 changes: 3 additions & 21 deletions common/src/test/java/tachyon/util/FormatUtilsTest.java
Expand Up @@ -24,8 +24,6 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;


import com.google.common.collect.Lists;

import tachyon.Constants; import tachyon.Constants;


/** /**
Expand Down Expand Up @@ -101,25 +99,9 @@ public TestCase(String expected, ByteBuffer input) {
*/ */
@Test @Test
public void byteArrayToHexStringTest() { public void byteArrayToHexStringTest() {
class TestCase { Assert.assertEquals("", FormatUtils.byteArrayToHexString(new byte[0]));
String mExpected; Assert.assertEquals("0x01", FormatUtils.byteArrayToHexString(new byte[]{1}));
byte[] mInput; Assert.assertEquals("0x01 0xAC", FormatUtils.byteArrayToHexString(new byte[]{1, (byte) 0xAC}));

public TestCase(String expected, byte[] input) {
mExpected = expected;
mInput = input;
}
}

List<TestCase> testCases = Lists.newArrayList();
testCases.add(new TestCase("", new byte[0]));
testCases.add(new TestCase("0x01", new byte[]{1}));
testCases.add(new TestCase("0x01 0x02", new byte[]{1, 2}));
testCases.add(new TestCase("0x01 0x02 0xAC 0xFF", new byte[]{1, 2, (byte) 0xAC, (byte) 0xFF}));

for (TestCase testCase : testCases) {
Assert.assertEquals(testCase.mExpected, FormatUtils.byteArrayToHexString(testCase.mInput));
}
} }


/** /**
Expand Down
16 changes: 9 additions & 7 deletions common/src/test/java/tachyon/util/io/BufferUtilsTest.java
Expand Up @@ -367,7 +367,8 @@ public void cleanDirectBufferTest() {
} }


/** /**
* Tests the {@link BufferUtils#sliceByteBuffer(ByteBuffer, int, int)} method. * Tests the {@link BufferUtils#sliceByteBuffer(ByteBuffer, int, int)} and the
* {@link BufferUtils#sliceByteBuffer(ByteBuffer, int)} methods.
*/ */
@Test @Test
public void sliceByteBufferTest() { public void sliceByteBufferTest() {
Expand All @@ -382,16 +383,17 @@ public void sliceByteBufferTest() {


// Slice a ByteBuffer from the target position to the end // Slice a ByteBuffer from the target position to the end
int slicedBufferLength = size - slicePosition; int slicedBufferLength = size - slicePosition;
slicedBuffer = BufferUtils.sliceByteBuffer(buf, slicePosition, slicedBufferLength); ByteBuffer slicedBuffer1 = BufferUtils.sliceByteBuffer(buf, slicePosition,
ByteBuffer slicedBuffer1 = BufferUtils.sliceByteBuffer(buf, slicePosition); slicedBufferLength);
Assert.assertEquals(0, slicedBuffer.position()); ByteBuffer slicedBuffer2 = BufferUtils.sliceByteBuffer(buf, slicePosition);
Assert.assertEquals(0, slicedBuffer1.position()); Assert.assertEquals(0, slicedBuffer1.position());
Assert.assertEquals(slicedBufferLength, slicedBuffer.limit()); Assert.assertEquals(0, slicedBuffer2.position());
Assert.assertEquals(slicedBufferLength, slicedBuffer1.limit()); Assert.assertEquals(slicedBufferLength, slicedBuffer1.limit());
Assert.assertTrue(BufferUtils.equalIncreasingByteBuffer(slicePosition, slicedBufferLength, Assert.assertEquals(slicedBufferLength, slicedBuffer2.limit());
slicedBuffer));
Assert.assertTrue(BufferUtils.equalIncreasingByteBuffer(slicePosition, slicedBufferLength, Assert.assertTrue(BufferUtils.equalIncreasingByteBuffer(slicePosition, slicedBufferLength,
slicedBuffer1)); slicedBuffer1));
Assert.assertTrue(BufferUtils.equalIncreasingByteBuffer(slicePosition, slicedBufferLength,
slicedBuffer2));
} }
} }
} }
Expand Up @@ -73,7 +73,7 @@ public final class LinearProbingIndex implements Index {
* {@link #mBuf}. * {@link #mBuf}.
*/ */
private int mKeyCount; private int mKeyCount;
/** A duplicate of mBuf corresponding to the hash table part */ /** A slice of mBuf corresponding to the hash table part */
private ByteBuffer mHashTableBuf; private ByteBuffer mHashTableBuf;
private int mNumBuckets; private int mNumBuckets;


Expand Down Expand Up @@ -101,7 +101,6 @@ public static LinearProbingIndex loadFromByteArray(ByteBuffer buffer) {


private LinearProbingIndex(ByteBuffer buf, int numBuckets, int keyCount) { private LinearProbingIndex(ByteBuffer buf, int numBuckets, int keyCount) {
mBuf = buf; mBuf = buf;
mHashTableBuf = ((ByteBuffer) mBuf.duplicate().position(Constants.BYTES_IN_INTEGER)).slice();
mHashTableBuf = BufferUtils.sliceByteBuffer(mBuf, Constants.BYTES_IN_INTEGER); mHashTableBuf = BufferUtils.sliceByteBuffer(mBuf, Constants.BYTES_IN_INTEGER);
mNumBuckets = numBuckets; mNumBuckets = numBuckets;
mKeyCount = keyCount; mKeyCount = keyCount;
Expand Down
Expand Up @@ -67,8 +67,8 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
List<InputSplit> splits = Lists.newArrayList(); List<InputSplit> splits = Lists.newArrayList();
try { try {
for (Path path : paths) { for (Path path : paths) {
List<PartitionInfo> partitionInfos = mKeyValueMasterClient.getPartitionInfo(new TachyonURI( List<PartitionInfo> partitionInfos =
path.toString())); mKeyValueMasterClient.getPartitionInfo(new TachyonURI(path.toString()));
for (PartitionInfo partitionInfo : partitionInfos) { for (PartitionInfo partitionInfo : partitionInfos) {
splits.add(new KeyValueInputSplit(partitionInfo)); splits.add(new KeyValueInputSplit(partitionInfo));
} }
Expand Down
Expand Up @@ -44,7 +44,8 @@ final class KeyValueInputSplit implements InputSplit {
private long mBlockId; private long mBlockId;


/** /**
* Default constructor, to be used in de-serialization of {@link KeyValueInputSplit}. * Default constructor, to be used together with {@link #readFields(DataInput)} when
* de-serializing {@link KeyValueInputSplit}.
*/ */
public KeyValueInputSplit() { public KeyValueInputSplit() {
mBlockStore = TachyonBlockStore.get(); mBlockStore = TachyonBlockStore.get();
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskAttemptContext;
Expand All @@ -34,7 +33,6 @@
import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.annotation.PublicApi; import tachyon.annotation.PublicApi;
import tachyon.client.keyvalue.KeyValueStores; import tachyon.client.keyvalue.KeyValueStores;
import tachyon.exception.FileDoesNotExistException;
import tachyon.exception.TachyonException; import tachyon.exception.TachyonException;


/** /**
Expand All @@ -51,14 +49,14 @@ public final class KeyValueOutputCommitter extends FileOutputCommitter {


private List<TachyonURI> getTaskTemporaryStores(JobConf conf) throws IOException { private List<TachyonURI> getTaskTemporaryStores(JobConf conf) throws IOException {
TachyonURI taskOutputURI = KeyValueOutputFormat.getTaskOutputURI(conf); TachyonURI taskOutputURI = KeyValueOutputFormat.getTaskOutputURI(conf);
Path outputPath = FileOutputFormat.getOutputPath(conf); Path taskOutputPath = new Path(taskOutputURI.toString());
FileSystem fs = outputPath.getFileSystem(conf); FileSystem fs = taskOutputPath.getFileSystem(conf);
FileStatus[] subDirs = fs.listStatus(new Path(taskOutputURI.toString())); FileStatus[] subDirs = fs.listStatus(taskOutputPath);
List<TachyonURI> ret = Lists.newArrayListWithExpectedSize(subDirs.length); List<TachyonURI> temporaryStores = Lists.newArrayListWithExpectedSize(subDirs.length);
for (FileStatus subDir : subDirs) { for (FileStatus subDir : subDirs) {
ret.add(taskOutputURI.join(subDir.getPath().getName())); temporaryStores.add(taskOutputURI.join(subDir.getPath().getName()));
} }
return ret; return temporaryStores;
} }


/** /**
Expand Down Expand Up @@ -93,9 +91,6 @@ public void abortTask(TaskAttemptContext context) throws IOException {
for (TachyonURI tempStoreUri : getTaskTemporaryStores(context.getJobConf())) { for (TachyonURI tempStoreUri : getTaskTemporaryStores(context.getJobConf())) {
try { try {
KEY_VALUE_STORES.delete(tempStoreUri); KEY_VALUE_STORES.delete(tempStoreUri);
} catch (FileDoesNotExistException e) {
// The goal of deleting the store is to cleanup directories before aborting the task, since
// the store directory does not exist, it meets the goal, nothing needs to be done.
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }
Expand Down
Expand Up @@ -73,8 +73,8 @@ public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(FileSystem ign
/** /**
* {@inheritDoc} * {@inheritDoc}
* <p> * <p>
* {@link KeyValueOutputCommitter} is forced to be used. An empty key-value store is created at * {@link KeyValueOutputCommitter} is forced to be used. If the output path exists, an exception
* the output path if the path does not exist yet. * is thrown, otherwise, an empty key-value store is created at the output path.
* <p> * <p>
* NOTE: This method is called immediately when job is submitted, so that modifications to the * NOTE: This method is called immediately when job is submitted, so that modifications to the
* {@link JobConf} are reflected in the whole job. * {@link JobConf} are reflected in the whole job.
Expand Down
Expand Up @@ -16,6 +16,7 @@
package tachyon.client.keyvalue.hadoop; package tachyon.client.keyvalue.hadoop;


import java.io.IOException; import java.io.IOException;
import java.util.Arrays;


import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.RecordWriter;
Expand All @@ -29,12 +30,10 @@
import tachyon.exception.TachyonException; import tachyon.exception.TachyonException;


/** /**
* It writes key-value pairs into a temporary key-value store. * A {@link RecordWriter} to write key-value pairs into a temporary key-value store.
*/ */
@ThreadSafe @ThreadSafe
class KeyValueRecordWriter implements RecordWriter<BytesWritable, BytesWritable> { class KeyValueRecordWriter implements RecordWriter<BytesWritable, BytesWritable> {
private static final KeyValueStores KEY_VALUE_STORES = KeyValueStores.Factory.create();

private final KeyValueStoreWriter mWriter; private final KeyValueStoreWriter mWriter;
private final Progressable mProgress; private final Progressable mProgress;


Expand All @@ -47,31 +46,20 @@ class KeyValueRecordWriter implements RecordWriter<BytesWritable, BytesWritable>
*/ */
public KeyValueRecordWriter(TachyonURI storeUri, Progressable progress) throws IOException { public KeyValueRecordWriter(TachyonURI storeUri, Progressable progress) throws IOException {
try { try {
mWriter = KEY_VALUE_STORES.create(storeUri); mWriter = KeyValueStores.Factory.create().create(storeUri);
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }
mProgress = progress; mProgress = progress;
} }


/**
* Copies a byte array of the specified length from the specified array, starting at offset 0.
*
* @param src the source array
* @param length the length of bytes to be copied
* @return the copied array
*/
private byte[] copyBytes(byte[] src, int length) {
byte[] result = new byte[length];
System.arraycopy(src, 0, result, 0, length);
return result;
}

@Override @Override
public synchronized void write(BytesWritable key, BytesWritable value) throws IOException { public synchronized void write(BytesWritable key, BytesWritable value) throws IOException {
try { try {
mWriter.put(copyBytes(key.getBytes(), key.getLength()), copyBytes(value.getBytes(), // NOTE: BytesWritable.getBytes() returns the internal byte array, whose length might not be
value.getLength())); // the same as BytesWritable.getLength().
mWriter.put(Arrays.copyOf(key.getBytes(), key.getLength()),
Arrays.copyOf(value.getBytes(), value.getLength()));
// Sends a progress to the job manager to inform it that the task is still running. // Sends a progress to the job manager to inform it that the task is still running.
mProgress.progress(); mProgress.progress();
} catch (TachyonException e) { } catch (TachyonException e) {
Expand Down
2 changes: 0 additions & 2 deletions keyvalue/examples/pom.xml
Expand Up @@ -23,13 +23,11 @@
<artifactId>tachyon-keyvalue-client</artifactId> <artifactId>tachyon-keyvalue-client</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- for tachyon.examples.Utils -->
<dependency> <dependency>
<groupId>org.tachyonproject</groupId> <groupId>org.tachyonproject</groupId>
<artifactId>tachyon-examples</artifactId> <artifactId>tachyon-examples</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- for FormatUtils#byteArrayToHexString -->
<dependency> <dependency>
<groupId>org.tachyonproject</groupId> <groupId>org.tachyonproject</groupId>
<artifactId>tachyon-common</artifactId> <artifactId>tachyon-common</artifactId>
Expand Down
Expand Up @@ -41,12 +41,12 @@
* This example illustrates how to create a key-value store, put key-value pairs into the store, and * This example illustrates how to create a key-value store, put key-value pairs into the store, and
* read the store afterwards. * read the store afterwards.
*/ */
public class KeyValueStoreOperations implements Callable<Boolean> { public final class KeyValueStoreOperations implements Callable<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


private final int mPartitionLength = Constants.MB; private final int mPartitionLength = Constants.MB;
private final int mValueLength = mPartitionLength / 2; private final int mValueLength = mPartitionLength / 2;
private final int mKeyValuePairsNumber = 10; private final int mNumKeyValuePairs = 10;


private TachyonURI mStoreUri; private TachyonURI mStoreUri;
private Map<ByteBuffer, ByteBuffer> mKeyValuePairs = Maps.newHashMap(); private Map<ByteBuffer, ByteBuffer> mKeyValuePairs = Maps.newHashMap();
Expand Down Expand Up @@ -82,7 +82,7 @@ public Boolean call() throws Exception {
private void putKeyValuePairs(KeyValueStoreWriter writer) throws Exception { private void putKeyValuePairs(KeyValueStoreWriter writer) throws Exception {
LOG.info("Putting key-value pairs..."); LOG.info("Putting key-value pairs...");
// API: KeyValueStoreWriter#put // API: KeyValueStoreWriter#put
for (int i = 0; i < mKeyValuePairsNumber; i ++) { for (int i = 0; i < mNumKeyValuePairs; i ++) {
// Keys are 0, 1, 2, etc. // Keys are 0, 1, 2, etc.
byte[] key = ByteBuffer.allocate(4).putInt(i).array(); byte[] key = ByteBuffer.allocate(4).putInt(i).array();
// Values are byte arrays of length {@link #mValueLength}. // Values are byte arrays of length {@link #mValueLength}.
Expand All @@ -95,7 +95,7 @@ private void putKeyValuePairs(KeyValueStoreWriter writer) throws Exception {
private boolean getKeyValuePairs(KeyValueStoreReader reader) throws Exception { private boolean getKeyValuePairs(KeyValueStoreReader reader) throws Exception {
LOG.info("Getting key-value pairs..."); LOG.info("Getting key-value pairs...");
// API: KeyValueStoreReader#size // API: KeyValueStoreReader#size
if (reader.size() != mKeyValuePairsNumber) { if (reader.size() != mNumKeyValuePairs) {
LOG.error("The key-value store has the wrong numbers of key-value pairs"); LOG.error("The key-value store has the wrong numbers of key-value pairs");
return false; return false;
} }
Expand Down Expand Up @@ -133,7 +133,7 @@ private boolean getKeyValuePairs(KeyValueStoreReader reader) throws Exception {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length != 1) { if (args.length != 1) {
System.out.println("java -cp " + Version.TACHYON_JAR + " " System.out.println("Usage: java -cp " + Version.TACHYON_JAR + " "
+ KeyValueStoreOperations.class.getName() + " <key-value store URI>"); + KeyValueStoreOperations.class.getName() + " <key-value store URI>");
System.exit(-1); System.exit(-1);
} }
Expand Down
Expand Up @@ -24,7 +24,7 @@
/** /**
* A quick start tutorial for creating a key-value store, putting and getting some key-value pairs. * A quick start tutorial for creating a key-value store, putting and getting some key-value pairs.
*/ */
public class KeyValueStoreQuickStart { public final class KeyValueStoreQuickStart {
/** /**
* The main program. * The main program.
* *
Expand All @@ -33,7 +33,7 @@ public class KeyValueStoreQuickStart {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length != 1) { if (args.length != 1) {
System.out.println("java -cp " + Version.TACHYON_JAR + " " System.out.println("Usage: java -cp " + Version.TACHYON_JAR + " "
+ KeyValueStoreQuickStart.class.getName() + " <key-value store URI>"); + KeyValueStoreQuickStart.class.getName() + " <key-value store URI>");
System.exit(-1); System.exit(-1);
} }
Expand Down
Expand Up @@ -35,7 +35,7 @@
/** /**
* Tests whether two key-value stores contain the same set of key-value pairs. * Tests whether two key-value stores contain the same set of key-value pairs.
*/ */
public class SameKeyValueStoresTest implements Callable<Boolean> { public final class SameKeyValueStoresTest implements Callable<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


private final TachyonURI mStoreUri1; private final TachyonURI mStoreUri1;
Expand Down Expand Up @@ -102,7 +102,7 @@ private boolean areTheSameStores(KeyValueStoreReader reader1, KeyValueStoreReade
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length != 2) { if (args.length != 2) {
System.out.println("java -cp " + Version.TACHYON_JAR + " " System.out.println("Usage: java -cp " + Version.TACHYON_JAR + " "
+ SameKeyValueStoresTest.class.getName() + " <key-value store URI 1>" + SameKeyValueStoresTest.class.getName() + " <key-value store URI 1>"
+ " <key-value store URI 2>"); + " <key-value store URI 2>");
System.exit(-1); System.exit(-1);
Expand Down
Expand Up @@ -27,7 +27,7 @@
/** /**
* Prints out (key, value) pairs, or only keys, or only values in a key-value store. * Prints out (key, value) pairs, or only keys, or only values in a key-value store.
*/ */
public class ShowKeyValueStore { public final class ShowKeyValueStore {
private static void show(KeyValuePair pair, String scope) { private static void show(KeyValuePair pair, String scope) {
String key = FormatUtils.byteArrayToHexString(BufferUtils.newByteArrayFromByteBuffer( String key = FormatUtils.byteArrayToHexString(BufferUtils.newByteArrayFromByteBuffer(
pair.getKey())); pair.getKey()));
Expand All @@ -53,7 +53,7 @@ private static void show(KeyValuePair pair, String scope) {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length != 2) { if (args.length != 2) {
System.out.println("java -cp " + Version.TACHYON_JAR + " " System.out.println("Usage: java -cp " + Version.TACHYON_JAR + " "
+ ShowKeyValueStore.class.getName() + " <key-value store URI>" + ShowKeyValueStore.class.getName() + " <key-value store URI>"
+ " <scope, be one of key/value/all>"); + " <scope, be one of key/value/all>");
System.exit(-1); System.exit(-1);
Expand Down
Expand Up @@ -37,7 +37,7 @@
/** /**
* This boring MapReduce job clones a key-value store to a different URI. * This boring MapReduce job clones a key-value store to a different URI.
*/ */
public class CloneKeyValueStore { public final class CloneKeyValueStoreMapReduce {
/** /**
* The mapper emits all key-value pairs it receives to reducers. * The mapper emits all key-value pairs it receives to reducers.
*/ */
Expand Down Expand Up @@ -70,7 +70,7 @@ public void reduce(BytesWritable key, Iterator<BytesWritable> values,
* @throws Exception if any exception happens * @throws Exception if any exception happens
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(CloneKeyValueStore.class); JobConf conf = new JobConf(CloneKeyValueStoreMapReduce.class);
conf.setJobName("clone key-value store"); conf.setJobName("clone key-value store");


conf.setOutputKeyClass(BytesWritable.class); conf.setOutputKeyClass(BytesWritable.class);
Expand Down

0 comments on commit f3e76c3

Please sign in to comment.