From 51da1ee71c644a881d128b56fd3241094c47212e Mon Sep 17 00:00:00 2001 From: Pei Sun Date: Mon, 27 Feb 2017 21:38:32 -0800 Subject: [PATCH] Address Andrew's comment --- .../file/policy/BlockLocationPolicy.java | 4 +- .../file/policy/DeterministicHashPolicy.java | 20 +++---- .../policy/DeterministicHashPolicyTest.java | 57 +++++++++++++------ 3 files changed, 52 insertions(+), 29 deletions(-) diff --git a/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java b/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java index adabf1e69f60..2c4f99751a32 100644 --- a/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java +++ b/core/client/src/main/java/alluxio/client/file/policy/BlockLocationPolicy.java @@ -18,7 +18,7 @@ /** *

- * Interface for determine the Alluxio worker location to serve a block write or UFS block read. + * Interface for determining the Alluxio worker location to serve a block write or UFS block read. *

* *

@@ -71,7 +71,7 @@ public static BlockLocationPolicy create(String policyClassNameWithShard) { } /** - * Gets the worker's host name for serve operations requested for the block. + * Gets the worker's host name for serving operations requested for the block. * * @param workerInfoList the info of the active workers * @param blockId the block ID diff --git a/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java b/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java index 5b4d17f46470..bd2884647789 100644 --- a/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java +++ b/core/client/src/main/java/alluxio/client/file/policy/DeterministicHashPolicy.java @@ -12,7 +12,6 @@ package alluxio.client.file.policy; import alluxio.client.block.BlockWorkerInfo; -import alluxio.master.block.BlockId; import alluxio.wire.WorkerNetAddress; import com.google.common.base.Objects; @@ -30,20 +29,23 @@ /** * This policy maps blockId to several deterministic Alluxio workers. The number of workers a block * can be mapped to can be passed through the constructor. The default is 1. It skips the workers - * that doesn not have enough capacity to hold the block. + * that do not have enough capacity to hold the block. */ @NotThreadSafe public final class DeterministicHashPolicy implements BlockLocationPolicy { + /** The default number of shards to serve a block. */ + private static final int DEFAULT_NUM_SHARDS = 1; + private final int mShards; private final Random mRandom = new Random(); private List mWorkerInfoList; - private int mOffset; private boolean mInitialized = false; - private int mShards = 1; /** * Constructs a new {@link DeterministicHashPolicy}. */ - public DeterministicHashPolicy() {} + public DeterministicHashPolicy() { + this(DEFAULT_NUM_SHARDS); + } /** * Constructs a new {@link DeterministicHashPolicy}. @@ -66,15 +68,13 @@ public int compare(BlockWorkerInfo o1, BlockWorkerInfo o2) { return o1.getNetAddress().toString().compareToIgnoreCase(o2.getNetAddress().toString()); } }); - mOffset = (int) (BlockId.getContainerId(blockId) % (long) mWorkerInfoList.size()); mInitialized = true; } List workers = new ArrayList<>(); // Try the next one if the worker mapped from the blockId doesn't work until all the workers // are examined. - int index = - (int) ((mOffset + BlockId.getSequenceNumber(blockId)) % (long) mWorkerInfoList.size()); + int index = (int) (blockId % (long) mWorkerInfoList.size()); for (int i = 0; i < mWorkerInfoList.size(); i++) { WorkerNetAddress candidate = mWorkerInfoList.get(index).getNetAddress(); BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate); @@ -114,21 +114,19 @@ public boolean equals(Object o) { } DeterministicHashPolicy that = (DeterministicHashPolicy) o; return Objects.equal(mWorkerInfoList, that.mWorkerInfoList) - && Objects.equal(mOffset, that.mOffset) && Objects.equal(mInitialized, that.mInitialized) && Objects.equal(mShards, that.mShards); } @Override public int hashCode() { - return Objects.hashCode(mWorkerInfoList, mOffset, mInitialized, mShards); + return Objects.hashCode(mWorkerInfoList, mInitialized, mShards); } @Override public String toString() { return Objects.toStringHelper(this) .add("workerInfoList", mWorkerInfoList) - .add("offset", mOffset) .add("initialized", mInitialized) .add("shards", mShards) .toString(); diff --git a/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java b/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java index 91534dbe538c..f21d011664ca 100644 --- a/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java +++ b/core/client/src/test/java/alluxio/client/file/policy/DeterministicHashPolicyTest.java @@ -16,6 +16,7 @@ import alluxio.wire.WorkerNetAddress; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -29,37 +30,61 @@ public final class DeterministicHashPolicyTest { private static final int PORT = 1; - /** - * Tests that the correct workers are chosen when round robin is used. - */ - @Test - public void getWorker() { - List workerInfoList = new ArrayList<>(); - workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1") + private List mWorkerInfos; + + @Before + public void before() { + mWorkerInfos = new ArrayList<>(); + mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1") .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0)); - workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2") + mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2") .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 2 * (long) Constants.GB, 0)); - workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3") + mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3") .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0)); - workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4") + mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4") .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0)); + } + @Test + public void getWorkerDeterministically() { DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory .create("alluxio.client.file.policy.DeterministicHashPolicy"); - Assert.assertEquals( - policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost(), - policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); + String host = policy.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost(); + for (int i = 0; i < 10; i++) { + DeterministicHashPolicy p = (DeterministicHashPolicy) BlockLocationPolicy.Factory + .create("alluxio.client.file.policy.DeterministicHashPolicy"); + // For the same block, always return the same worker. + Assert.assertEquals(host, + p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost()); + Assert.assertEquals(host, + p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost()); + } + } + @Test + public void getWorkerEnoughCapacity() { + DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory + .create("alluxio.client.file.policy.DeterministicHashPolicy"); + for (long blockId = 0; blockId < 100; blockId++) { + // worker1 does not have enough capacity. It should never be picked. + Assert.assertNotEquals("worker1", + policy.getWorkerForBlock(mWorkerInfos, blockId, 2 * (long) Constants.GB).getHost()); + } + } + + @Test + public void getWorkerMultipleShards() { DeterministicHashPolicy policy2 = (DeterministicHashPolicy) BlockLocationPolicy.Factory .create("alluxio.client.file.policy.DeterministicHashPolicy@2"); Set addresses1 = new HashSet<>(); Set addresses2 = new HashSet<>(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 100; i++) { addresses1 - .add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); + .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost()); addresses2 - .add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); + .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost()); } + // With sufficient traffic, 2 (= #shards) workers should be picked to serve the block. Assert.assertEquals(2, addresses1.size()); Assert.assertEquals(2, addresses2.size()); Assert.assertEquals(addresses1, addresses2);