Skip to content

Commit

Permalink
Optimize concurrent collection's shrink logic (#3417)
Browse files Browse the repository at this point in the history
### Motivation

Optimize concurrent collection's shrink and clear logic

### Changes
1. Reduce the repeated `Arrays.fill` in the clear process
2. When `capacity` is already equal to `initCapacity`, `rehash` should not be executed
3. Reduce the `rehash` logic in the `clear` process
4. Shrinking now goes no lower than `initCapacity`, so as to avoid repeated shrink/expand cycles around `initCapacity`; the extra arrays allocated by such churn would consume more memory and add GC pressure.

If this PR is accepted, I will apply the same optimization to the `concurrent collection shrink and clear logic` defined in Pulsar.

Related to #3061 and #3074
  • Loading branch information
wenbingshen committed Jul 26, 2022
1 parent de70f4f commit a580547
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,11 @@ private V remove(long key, Object value, int keyHash) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -556,7 +560,11 @@ int removeIf(LongObjectPredicate<V> filter) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand All @@ -575,12 +583,13 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(keys, 0);
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(keys, 0);
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -658,6 +667,21 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing {@code keys}/{@code values} arrays with fresh ones of
 * {@code initCapacity} and resets size, usedBuckets and the resize thresholds.
 * Invoked from clear() while the write lock is held, when the map has grown
 * beyond its initial capacity.
 */
@SuppressWarnings("unchecked") // values array only ever stores V instances
private void shrinkToInitCapacity() {
long[] newKeys = new long[initCapacity];
V[] newValues = (V[]) new Object[initCapacity];
// NOTE(review): unlike clear(), the fresh values array is not filled with
// EmptyValue — this assumes EmptyValue is null so a new Object[] is already
// "empty"; confirm against EmptyValue's definition

keys = newKeys;
values = newValues;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the arrays, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
int bucket = (int) hash(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,11 @@ private boolean remove(long item, int hash) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -444,11 +448,12 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -516,6 +521,20 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing table with a fresh one of {@code initCapacity} and resets
 * size, usedBuckets and the resize thresholds. Invoked from clear() while the
 * write lock is held, when the set has grown beyond its initial capacity.
 */
private void shrinkToInitCapacity() {
long[] newTable = new long[initCapacity];
// Mark every bucket as empty before publishing the new table
Arrays.fill(newTable, EmptyItem);

table = newTable;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the table, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
int bucket = signSafeMod(hash(item), capacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,11 @@ private long remove(long key, long value, int keyHash) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -695,7 +699,7 @@ int removeIf(LongPredicate filter) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -734,7 +738,7 @@ int removeIf(LongLongPredicate filter) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -775,11 +779,12 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -850,6 +855,20 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing table with a fresh one sized for {@code initCapacity}
 * buckets and resets size, usedBuckets and the resize thresholds. Invoked from
 * clear() while the write lock is held, when the map has grown beyond its
 * initial capacity.
 */
private void shrinkToInitCapacity() {
// The table stores key and value interleaved (see insertKeyValueNoLock),
// so the array holds 2 slots per logical bucket while capacity stays initCapacity
long[] newTable = new long[2 * initCapacity];
// Mark every slot as empty before publishing the new table
Arrays.fill(newTable, EmptyKey);

table = newTable;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the table, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static void insertKeyValueNoLock(long[] table, int capacity, long key, long value) {
int bucket = signSafeMod(hash(key), capacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,11 @@ private boolean remove(long key1, long key2, long value1, long value2, int keyHa
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -531,11 +535,12 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -611,6 +616,20 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing table with a fresh one sized for {@code initCapacity}
 * buckets and resets size, usedBuckets and the resize thresholds. Invoked from
 * clear() while the write lock is held, when the map has grown beyond its
 * initial capacity.
 */
private void shrinkToInitCapacity() {
// Each logical bucket holds a (key1, key2, value1, value2) quadruple
// (see insertKeyValueNoLock), hence 4 array slots per bucket
long[] newTable = new long[4 * initCapacity];
// Mark every slot as empty before publishing the new table
Arrays.fill(newTable, EmptyKey);

table = newTable;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the table, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,
long value2) {
int bucket = signSafeMod(hash(key1, key2), capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ private V remove(K key, Object value, int keyHash) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand All @@ -465,11 +469,12 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -541,7 +546,11 @@ int removeIf(BiPredicate<K, V> filter) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand Down Expand Up @@ -601,6 +610,19 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing table with a fresh one sized for {@code initCapacity}
 * buckets and resets size, usedBuckets and the resize thresholds. Invoked from
 * clear() while the write lock is held, when the map has grown beyond its
 * initial capacity.
 */
private void shrinkToInitCapacity() {
// Keys and values are stored interleaved (see insertKeyValueNoLock),
// hence 2 array slots per logical bucket
Object[] newTable = new Object[2 * initCapacity];
// NOTE(review): no Arrays.fill here, unlike the long[]-based variants and
// clear() which fills with EmptyKey — this assumes EmptyKey is null so a
// freshly allocated Object[] is already "empty"; confirm against EmptyKey's definition

table = newTable;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the table, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
int bucket = signSafeMod(hash(key), capacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,11 @@ private boolean remove(V value, int keyHash) {
} finally {
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
// Shrink to no less than initCapacity, so as to avoid repeated
// shrink/expand cycles around initCapacity; the extra arrays allocated
// by such churn would consume more memory and add GC pressure
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
Expand All @@ -434,11 +438,12 @@ void clear() {
long stamp = writeLock();

try {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
if (autoShrink && capacity > initCapacity) {
shrinkToInitCapacity();
} else {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
}
} finally {
unlockWrite(stamp);
Expand Down Expand Up @@ -509,6 +514,19 @@ private void rehash(int newCapacity) {
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

/**
 * Replaces the backing values array with a fresh one of {@code initCapacity} and
 * resets size, usedBuckets and the resize thresholds. Invoked from clear() while
 * the write lock is held, when the set has grown beyond its initial capacity.
 */
@SuppressWarnings("unchecked") // values array only ever stores V instances
private void shrinkToInitCapacity() {
V[] newValues = (V[]) new Object[initCapacity];
// NOTE(review): unlike clear(), the fresh array is not filled with EmptyValue —
// this assumes EmptyValue is null so a new Object[] is already "empty";
// confirm against EmptyValue's definition

values = newValues;
size = 0;
usedBuckets = 0;
// Capacity needs to be updated after the values, so that we won't see
// a capacity value bigger than the actual array size
capacity = initCapacity;
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <V> void insertValueNoLock(V[] values, V value) {
int bucket = (int) hash(value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,39 @@ public void testExpandAndShrink() {
assertTrue(map.capacity() == 8);
}

@Test
public void testExpandShrinkAndClear() {
// Map starts with capacity 4 (expectedItems=2), shrinks when size < capacity * 0.25
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(2)
.concurrencyLevel(1)
.autoShrink(true)
.mapIdleFactor(0.25f)
.build();
final long initCapacity = map.capacity();
assertTrue(map.capacity() == 4);
assertNull(map.put(1, "v1"));
assertNull(map.put(2, "v2"));
assertNull(map.put(3, "v3"));

// expand hashmap
assertTrue(map.capacity() == 8);

assertTrue(map.remove(1, "v1"));
// no shrink yet: shrink requires size < resizeThresholdBelow (8 * 0.25 = 2)
assertTrue(map.capacity() == 8);
assertTrue(map.remove(2, "v2"));
// shrink hashmap
assertTrue(map.capacity() == 4);

assertTrue(map.remove(3, "v3"));
// Will not shrink the hashmap again because the shrunk capacity would be
// smaller than initCapacity; current capacity equals the initial capacity
assertTrue(map.capacity() == initCapacity);
map.clear();
// after clear(): capacity already equals initCapacity, so shrinkToInitCapacity is not invoked
assertTrue(map.capacity() == initCapacity);
}

@Test
public void simpleInsertions() {
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
Expand Down

0 comments on commit a580547

Please sign in to comment.