Skip to content

Commit

Permalink
Address Andrew's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Mar 10, 2017
1 parent c4889ac commit 51da1ee
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
Expand Up @@ -18,7 +18,7 @@


/** /**
* <p> * <p>
* Interface for determine the Alluxio worker location to serve a block write or UFS block read. * Interface for determining the Alluxio worker location to serve a block write or UFS block read.
* </p> * </p>
* *
* <p> * <p>
Expand Down Expand Up @@ -71,7 +71,7 @@ public static BlockLocationPolicy create(String policyClassNameWithShard) {
} }


/** /**
* Gets the worker's host name for serve operations requested for the block. * Gets the worker's host name for serving operations requested for the block.
* *
* @param workerInfoList the info of the active workers * @param workerInfoList the info of the active workers
* @param blockId the block ID * @param blockId the block ID
Expand Down
Expand Up @@ -12,7 +12,6 @@
package alluxio.client.file.policy; package alluxio.client.file.policy;


import alluxio.client.block.BlockWorkerInfo; import alluxio.client.block.BlockWorkerInfo;
import alluxio.master.block.BlockId;
import alluxio.wire.WorkerNetAddress; import alluxio.wire.WorkerNetAddress;


import com.google.common.base.Objects; import com.google.common.base.Objects;
Expand All @@ -30,20 +29,23 @@
/** /**
* This policy maps blockId to several deterministic Alluxio workers. The number of workers a block * This policy maps blockId to several deterministic Alluxio workers. The number of workers a block
* can be mapped to can be passed through the constructor. The default is 1. It skips the workers * can be mapped to can be passed through the constructor. The default is 1. It skips the workers
* that doesn not have enough capacity to hold the block. * that do not have enough capacity to hold the block.
*/ */
@NotThreadSafe @NotThreadSafe
public final class DeterministicHashPolicy implements BlockLocationPolicy { public final class DeterministicHashPolicy implements BlockLocationPolicy {
/** The default number of shards to serve a block. */
private static final int DEFAULT_NUM_SHARDS = 1;
private final int mShards;
private final Random mRandom = new Random(); private final Random mRandom = new Random();
private List<BlockWorkerInfo> mWorkerInfoList; private List<BlockWorkerInfo> mWorkerInfoList;
private int mOffset;
private boolean mInitialized = false; private boolean mInitialized = false;
private int mShards = 1;


/** /**
* Constructs a new {@link DeterministicHashPolicy}. * Constructs a new {@link DeterministicHashPolicy}.
*/ */
public DeterministicHashPolicy() {} public DeterministicHashPolicy() {
this(DEFAULT_NUM_SHARDS);
}


/** /**
* Constructs a new {@link DeterministicHashPolicy}. * Constructs a new {@link DeterministicHashPolicy}.
Expand All @@ -66,15 +68,13 @@ public int compare(BlockWorkerInfo o1, BlockWorkerInfo o2) {
return o1.getNetAddress().toString().compareToIgnoreCase(o2.getNetAddress().toString()); return o1.getNetAddress().toString().compareToIgnoreCase(o2.getNetAddress().toString());
} }
}); });
mOffset = (int) (BlockId.getContainerId(blockId) % (long) mWorkerInfoList.size());
mInitialized = true; mInitialized = true;
} }


List<WorkerNetAddress> workers = new ArrayList<>(); List<WorkerNetAddress> workers = new ArrayList<>();
// Try the next one if the worker mapped from the blockId doesn't work until all the workers // Try the next one if the worker mapped from the blockId doesn't work until all the workers
// are examined. // are examined.
int index = int index = (int) (blockId % (long) mWorkerInfoList.size());
(int) ((mOffset + BlockId.getSequenceNumber(blockId)) % (long) mWorkerInfoList.size());
for (int i = 0; i < mWorkerInfoList.size(); i++) { for (int i = 0; i < mWorkerInfoList.size(); i++) {
WorkerNetAddress candidate = mWorkerInfoList.get(index).getNetAddress(); WorkerNetAddress candidate = mWorkerInfoList.get(index).getNetAddress();
BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate); BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate);
Expand Down Expand Up @@ -114,21 +114,19 @@ public boolean equals(Object o) {
} }
DeterministicHashPolicy that = (DeterministicHashPolicy) o; DeterministicHashPolicy that = (DeterministicHashPolicy) o;
return Objects.equal(mWorkerInfoList, that.mWorkerInfoList) return Objects.equal(mWorkerInfoList, that.mWorkerInfoList)
&& Objects.equal(mOffset, that.mOffset)
&& Objects.equal(mInitialized, that.mInitialized) && Objects.equal(mInitialized, that.mInitialized)
&& Objects.equal(mShards, that.mShards); && Objects.equal(mShards, that.mShards);
} }


@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hashCode(mWorkerInfoList, mOffset, mInitialized, mShards); return Objects.hashCode(mWorkerInfoList, mInitialized, mShards);
} }


@Override @Override
public String toString() { public String toString() {
return Objects.toStringHelper(this) return Objects.toStringHelper(this)
.add("workerInfoList", mWorkerInfoList) .add("workerInfoList", mWorkerInfoList)
.add("offset", mOffset)
.add("initialized", mInitialized) .add("initialized", mInitialized)
.add("shards", mShards) .add("shards", mShards)
.toString(); .toString();
Expand Down
Expand Up @@ -16,6 +16,7 @@
import alluxio.wire.WorkerNetAddress; import alluxio.wire.WorkerNetAddress;


import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;


import java.util.ArrayList; import java.util.ArrayList;
Expand All @@ -29,37 +30,61 @@
public final class DeterministicHashPolicyTest { public final class DeterministicHashPolicyTest {
private static final int PORT = 1; private static final int PORT = 1;


/** private List<BlockWorkerInfo> mWorkerInfos;
* Tests that the correct workers are chosen when round robin is used.
*/ @Before
@Test public void before() {
public void getWorker() { mWorkerInfos = new ArrayList<>();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>(); mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0)); .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2") mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker2")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 2 * (long) Constants.GB, 0)); .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 2 * (long) Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3") mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker3")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0)); .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4") mWorkerInfos.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker4")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0)); .setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), 3 * (long) Constants.GB, 0));
}


@Test
public void getWorkerDeterministically() {
DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy"); .create("alluxio.client.file.policy.DeterministicHashPolicy");
Assert.assertEquals( String host = policy.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost();
policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost(), for (int i = 0; i < 10; i++) {
policy.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); DeterministicHashPolicy p = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy");
// For the same block, always return the same worker.
Assert.assertEquals(host,
p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
Assert.assertEquals(host,
p.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
}
}


@Test
public void getWorkerEnoughCapacity() {
DeterministicHashPolicy policy = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy");
for (long blockId = 0; blockId < 100; blockId++) {
// worker1 does not have enough capacity. It should never be picked.
Assert.assertNotEquals("worker1",
policy.getWorkerForBlock(mWorkerInfos, blockId, 2 * (long) Constants.GB).getHost());
}
}

@Test
public void getWorkerMultipleShards() {
DeterministicHashPolicy policy2 = (DeterministicHashPolicy) BlockLocationPolicy.Factory DeterministicHashPolicy policy2 = (DeterministicHashPolicy) BlockLocationPolicy.Factory
.create("alluxio.client.file.policy.DeterministicHashPolicy@2"); .create("alluxio.client.file.policy.DeterministicHashPolicy@2");
Set<String> addresses1 = new HashSet<>(); Set<String> addresses1 = new HashSet<>();
Set<String> addresses2 = new HashSet<>(); Set<String> addresses2 = new HashSet<>();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 100; i++) {
addresses1 addresses1
.add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
addresses2 addresses2
.add(policy2.getWorkerForBlock(workerInfoList, 1, 2 * (long) Constants.GB).getHost()); .add(policy2.getWorkerForBlock(mWorkerInfos, 1, 2 * (long) Constants.GB).getHost());
} }
// With sufficient traffic, 2 (= #shards) workers should be picked to serve the block.
Assert.assertEquals(2, addresses1.size()); Assert.assertEquals(2, addresses1.size());
Assert.assertEquals(2, addresses2.size()); Assert.assertEquals(2, addresses2.size());
Assert.assertEquals(addresses1, addresses2); Assert.assertEquals(addresses1, addresses2);
Expand Down

0 comments on commit 51da1ee

Please sign in to comment.