Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dbw9580 committed Jul 12, 2023
1 parent 327c138 commit b12269b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,33 @@ static class ConsistentHashProvider {
private static final HashFunction HASH_FUNCTION = murmur3_32_fixed();
private final int mMaxAttempts;
private final long mWorkerInfoUpdateIntervalNs;

/**
* Timestamp of the last update to {@link #mActiveNodesByConsistentHashing}.
* Must use System.nanoTime to ensure monotonic increment. Otherwise, updates to the worker
* list may be missed as the expiry based on TTL cannot be reliably determined.
*/
private final AtomicLong mLastUpdatedTimestamp = new AtomicLong(System.nanoTime());
/**
* Counter for how many times the map has been updated.
*/
private final AtomicLong mUpdateCount = new AtomicLong(0);
/**
* The worker list which the {@link #mActiveNodesByConsistentHashing} was built from.
* Must kept in sync with {@link #mActiveNodesByConsistentHashing}.
* Used to compare with incoming worker list to skip the heavy build process if the worker
* list has not changed.
*/
private volatile List<BlockWorkerInfo> mLastWorkerInfos = ImmutableList.of();
// Lazily initialized, can only be null before the first call to refresh
/**
* A map of virtual node indices to the actual workers.
* This is lazily initialized, can only be null before the first call to refresh.
*/
@Nullable
private volatile NavigableMap<Integer, BlockWorkerInfo> mActiveNodesByConsistentHashing;
// Must use System.nanoTime to ensure monotonic increment
private final AtomicLong mUpdateCount = new AtomicLong(0);
/**
* Lock to protect the lazy initialization of {@link #mActiveNodesByConsistentHashing}.
*/
private final Object mInitLock = new Object();

public ConsistentHashProvider(int maxAttempts, long workerListTtlMs) {
Expand All @@ -90,6 +110,11 @@ public ConsistentHashProvider(int maxAttempts, long workerListTtlMs) {
/**
* Initializes or refreshes the worker list using the given list of workers and number of
* virtual nodes.
* <br>
* Thread safety:
* If called concurrently by two or more threads, only one of the callers will actually
* update the state of the hash provider using the worker list provided by that thread, and all
* others will not change the internal state of the hash provider.
*/
public void refresh(List<BlockWorkerInfo> workerInfos, int numVirtualNodes) {
Preconditions.checkArgument(!workerInfos.isEmpty(),
Expand All @@ -116,9 +141,11 @@ public void refresh(List<BlockWorkerInfo> workerInfos, int numVirtualNodes) {
}
}

// Lazy initialization of the hash provider:
// When the active nodes map does not exist, the hash provider is not initialized yet.
// let one caller initialize the map while blocking all others.
/**
* Lazily initializes the hash ring.
* Only one caller gets to initialize the map while all others are blocked.
* After the initialization, the map must not be null.
*/
private void maybeInitialize(List<BlockWorkerInfo> workerInfos, int numVirtualNodes) {
if (mActiveNodesByConsistentHashing == null) {
synchronized (mInitLock) {
Expand Down Expand Up @@ -161,10 +188,8 @@ public BlockWorkerInfo get(String key, int index) {

@VisibleForTesting
static BlockWorkerInfo get(NavigableMap<Integer, BlockWorkerInfo> map, String key, int index) {
Preconditions.checkNotNull(map, "Hash provider is not properly initialized");
int hashKey = HASH_FUNCTION.hashString(format("%s%d", key, index), UTF_8).asInt();
if (map == null) {
throw new IllegalStateException("Hash provider is not properly initialized");
}
Map.Entry<Integer, BlockWorkerInfo> entry = map.ceilingEntry(hashKey);
if (entry != null) {
return entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -24,7 +25,6 @@

import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
Expand All @@ -47,13 +47,13 @@ public class WorkerLocationPolicyTest {
@Test
public void uninitializedThrowsException() {
ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS);
Assert.assertThrows(IllegalStateException.class, () -> provider.get(OBJECT_KEY, 0));
assertThrows(IllegalStateException.class, () -> provider.get(OBJECT_KEY, 0));
}

@Test
public void concurrentInitialization() {
ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS);
final int numThreads = Runtime.getRuntime().availableProcessors();
final int numThreads = 16;
CountDownLatch startSignal = new CountDownLatch(numThreads);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
List<List<BlockWorkerInfo>> lists = IntStream.range(0, numThreads)
Expand Down Expand Up @@ -101,7 +101,7 @@ public void concurrentRefresh() throws Exception {
long initialCount = provider.getUpdateCount();
Thread.sleep(WORKER_LIST_TTL_MS);

final int numThreads = Runtime.getRuntime().availableProcessors();
final int numThreads = 16;
CountDownLatch startSignal = new CountDownLatch(numThreads);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

Expand Down Expand Up @@ -134,6 +134,12 @@ public void concurrentRefresh() throws Exception {
}
// only one thread actually updated the map
assertEquals(1, provider.getUpdateCount() - initialCount);
// check if the worker list is one of the lists provided by the threads
List<BlockWorkerInfo> workerInfoListUsedByPolicy = provider.getLastWorkerInfos();
assertTrue(listsPerThread.contains(workerInfoListUsedByPolicy));
assertEquals(
ConsistentHashProvider.build(workerInfoListUsedByPolicy, NUM_VIRTUAL_NODES),
provider.getActiveNodesMap());
}

@Test
Expand Down

0 comments on commit b12269b

Please sign in to comment.