diff --git a/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java b/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java
index adabf1e69f60..2c4f99751a32 100644
--- a/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java
+++ b/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java
@@ -18,7 +18,7 @@
/**
*
- * Interface for determine the Alluxio worker location to serve a block write or UFS block read.
+ * Interface for determining the Alluxio worker location to serve a block write or UFS block read.
*
*
*
@@ -71,7 +71,7 @@ public static BlockLocationPolicy create(String policyClassNameWithShard) {
}
/**
- * Gets the worker's host name for serve operations requested for the block.
+ * Gets the worker's host name for serving operations requested for the block.
*
* @param workerInfoList the info of the active workers
* @param blockId the block ID
diff --git a/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java b/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java
index 5b4d17f46470..bd2884647789 100644
--- a/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java
+++ b/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java
@@ -12,7 +12,6 @@
package alluxio.client.file.policy;
import alluxio.client.block.BlockWorkerInfo;
-import alluxio.master.block.BlockId;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Objects;
@@ -30,20 +29,23 @@
/**
* This policy maps blockId to several deterministic Alluxio workers. The number of workers a block
* can be mapped to can be passed through the constructor. The default is 1. It skips the workers
- * that doesn not have enough capacity to hold the block.
+ * that do not have enough capacity to hold the block.
*/
@NotThreadSafe
public final class DeterministicHashPolicy implements BlockLocationPolicy {
+ /** The default number of shards to serve a block. */
+ private static final int DEFAULT_NUM_SHARDS = 1;
+ private final int mShards;
private final Random mRandom = new Random();
private List mWorkerInfoList;
- private int mOffset;
private boolean mInitialized = false;
- private int mShards = 1;
/**
* Constructs a new {@link DeterministicHashPolicy}.
*/
- public DeterministicHashPolicy() {}
+ public DeterministicHashPolicy() {
+ this(DEFAULT_NUM_SHARDS);
+ }
/**
* Constructs a new {@link DeterministicHashPolicy}.
@@ -66,15 +68,13 @@ public int compare(BlockWorkerInfo o1, BlockWorkerInfo o2) {
return o1.getNetAddress().toString().compareToIgnoreCase(o2.getNetAddress().toString());
}
});
- mOffset = (int) (BlockId.getContainerId(blockId) % (long) mWorkerInfoList.size());
mInitialized = true;
}
List workers = new ArrayList<>();
// Try the next one if the worker mapped from the blockId doesn't work until all the workers
// are examined.
- int index =
- (int) ((mOffset + BlockId.getSequenceNumber(blockId)) % (long) mWorkerInfoList.size());
+ int index = (int) (blockId % (long) mWorkerInfoList.size());
for (int i = 0; i < mWorkerInfoList.size(); i++) {
WorkerNetAddress candidate = mWorkerInfoList.get(index).getNetAddress();
BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate);
@@ -114,21 +114,19 @@ public boolean equals(Object o) {
}
DeterministicHashPolicy that = (DeterministicHashPolicy) o;
return Objects.equal(mWorkerInfoList, that.mWorkerInfoList)
- && Objects.equal(mOffset, that.mOffset)
&& Objects.equal(mInitialized, that.mInitialized)
&& Objects.equal(mShards, that.mShards);
}
@Override
public int hashCode() {
- return Objects.hashCode(mWorkerInfoList, mOffset, mInitialized, mShards);
+ return Objects.hashCode(mWorkerInfoList, mInitialized, mShards);
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("workerInfoList", mWorkerInfoList)
- .add("offset", mOffset)
.add("initialized", mInitialized)
.add("shards", mShards)
.toString();
diff --git a/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java b/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java
index 91534dbe538c..f21d011664ca 100644
--- a/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java
+++ b/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java
@@ -16,6 +16,7 @@
import alluxio.wire.WorkerNetAddress;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@@ -29,37 +30,61 @@
public final class DeterministicHashPolicyTest {
private static final int PORT = 1;
- /**
- * Tests that the correct workers are chosen when round robin is used.
- */
- @Test
- public void getWorker() {
- List workerInfoList = new ArrayList<>();
- workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
+ private List mWorkerInfos;
+
+ @Before
+ public void before() {
+ mWorkerInfos = new ArrayList<>();
+ mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
- workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2")
+ mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 2 * (long) Constants.GB, 0));
- workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3")
+ mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0));
- workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4")
+ mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0));
+ }
+ @Test
+ public void getWorkerDeterministically() {
DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy");
- Assert.assertEquals(
- policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost(),
- policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost());
+ String host = policy.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost();
+ for (int i = 0; i < 10; i++) {
+ DeterministicHashPolicy p = (DeterministicHashPolicy) BlockLocationPolicy.Factory
+ .create("alluxio.client.file.policy.DeterministicHashPolicy");
+ // For the same block, always return the same worker.
+ Assert.assertEquals(host,
+ p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
+ Assert.assertEquals(host,
+ p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
+ }
+ }
+ @Test
+ public void getWorkerEnoughCapacity() {
+ DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory
+ .create("alluxio.client.file.policy.DeterministicHashPolicy");
+ for (long blockId = 0; blockId < 100; blockId++) {
+ // worker1 does not have enough capacity. It should never be picked.
+ Assert.assertNotEquals("worker1",
+ policy.getWorkerForBlock(mWorkerInfos, blockId, 2 * (long) Constants.GB).getHost());
+ }
+ }
+
+ @Test
+ public void getWorkerMultipleShards() {
DeterministicHashPolicy policy2 = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy@2");
Set addresses1 = new HashSet<>();
Set addresses2 = new HashSet<>();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 100; i++) {
addresses1
- .add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost());
+ .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
addresses2
- .add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost());
+ .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
}
+ // With sufficient traffic, 2 (= #shards) workers should be picked to serve the block.
Assert.assertEquals(2, addresses1.size());
Assert.assertEquals(2, addresses2.size());
Assert.assertEquals(addresses1, addresses2);