Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace guava multimap in PCBC with custom impl (Cherry-pick) #1618

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.google.common.base.Joiner;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
Expand Down Expand Up @@ -70,8 +68,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -125,6 +123,7 @@
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -163,8 +162,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {

// Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
LinkedListMultimap.create();
private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
new SynchronizedHashMultiMap<>();

private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
Expand Down Expand Up @@ -760,9 +759,7 @@ private void readEntryInternal(final long ledgerId,
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}

writeAndFlush(channel, completionKey, request);
Expand Down Expand Up @@ -799,16 +796,7 @@ public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object
public void checkTimeoutOnPendingOperations() {
int timedOutOperations = completionObjects.removeIf(timeoutCheck);

synchronized (this) {
Iterator<CompletionValue> iterator = completionObjectsV2Conflicts.values().iterator();
while (iterator.hasNext()) {
CompletionValue value = iterator.next();
if (value.maybeTimeout()) {
++timedOutOperations;
iterator.remove();
}
}
}
timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);

if (timedOutOperations > 0) {
LOG.info("Timed-out {} operations to channel {} for {}",
Expand Down Expand Up @@ -932,6 +920,9 @@ void errorOut(final CompletionKey key) {
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut();
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
}
}

Expand All @@ -944,14 +935,7 @@ void errorOut(final CompletionKey key, final int rc) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
synchronized (completionObjectsV2Conflicts) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completion = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completion);

completion.errorOut(rc);
}
}
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
}
}

Expand Down Expand Up @@ -980,16 +964,10 @@ void errorOutPendingOps(int rc) {
*/

void errorOutOutstandingEntries(int rc) {
// DO NOT rewrite these using Map.Entry iterations. We want to iterate
// on keys and see if we are successfully able to remove the key from
// the map. Because the add and the read methods also do the same thing
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
while (completionObjectsV2Conflicts.get(key).size() > 0) {
errorOut(key, rc);
}
Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
while (multikey.isPresent()) {
multikey.ifPresent(k -> errorOut(k, rc));
multikey = completionObjectsV2Conflicts.getAnyKey();
}
for (CompletionKey key : completionObjects.keys()) {
errorOut(key, rc);
Expand Down Expand Up @@ -1110,12 +1088,7 @@ private void readV2Response(final BookieProtocol.Response response) {
key.release();
if (completionValue == null) {
// If there's no completion object here, try in the multimap
synchronized (this) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completionValue = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completionValue);
}
}
completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
}

if (null == completionValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.util.collections;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiPredicate;
import org.apache.commons.lang3.tuple.Pair;

/**
* Simple multimap implementation that only stores key reference once.
*
* <p>Implementation is aimed at storing PerChannelBookieClient completions when there
* are duplicates. If the key is a pooled object, it must not exist once the value
* has been removed from the map, which can happen with guava multimap implemenations.
*
* <p>With this map is implemented with pretty heavy locking, but this shouldn't be an
* issue as the multimap only needs to be used in rare cases, i.e. when a user tries
* to read or the same entry twice at the same time. This class should *NOT* be used
* in critical path code.
*
* <p>A unique key-value pair will only be stored once.
*/
public class SynchronizedHashMultiMap<K, V> {

HashMap<Integer, Set<Pair<K, V>>> map = new HashMap<>();

public synchronized void put(K k, V v) {
map.computeIfAbsent(k.hashCode(), (ignore) -> new HashSet<>()).add(Pair.of(k, v));
}

public synchronized Optional<K> getAnyKey() {
return map.values().stream().findAny().flatMap(pairs -> pairs.stream().findAny().map(p -> p.getLeft()));
}

public synchronized Optional<V> removeAny(K k) {
Set<Pair<K, V>> set = map.getOrDefault(k.hashCode(), Collections.emptySet());
Optional<Pair<K, V>> pair = set.stream().filter(p -> p.getLeft().equals(k)).findAny();
pair.ifPresent(p -> set.remove(p));
return pair.map(p -> p.getRight());
}

public synchronized int removeIf(BiPredicate<K, V> predicate) {
int removedSum = map.values().stream().mapToInt(
pairs -> {
int removed = 0;
// Can't use removeIf because we need the count
Iterator<Pair<K, V>> iter = pairs.iterator();
while (iter.hasNext()) {
Pair<K, V> kv = iter.next();
if (predicate.test(kv.getLeft(), kv.getRight())) {
iter.remove();
removed++;
}
}
return removed;
}).sum();
map.values().removeIf((s) -> s.isEmpty());
return removedSum;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.util.collections;

import java.util.Optional;

import org.junit.Assert;
import org.junit.Test;

/**
* Test for SynchronizedHashMultiMap.
*/
public class SynchronizedHashMultiMapTest {
@Test
public void testGetAnyKey() {
SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
Assert.assertFalse(map.getAnyKey().isPresent());

map.put(1, 2);
Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(1));

map.put(1, 3);
Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(1));

map.put(2, 4);
int res = map.getAnyKey().get();
Assert.assertTrue(res == 1 || res == 2);

map.removeIf((k, v) -> k == 1);
Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(2));
}

@Test
public void testRemoveAny() {
SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
Assert.assertFalse(map.removeAny(1).isPresent());

map.put(1, 2);
map.put(1, 3);
map.put(2, 4);
map.put(2, 4);

Optional<Integer> v = map.removeAny(1);
int firstVal = v.get();
Assert.assertTrue(firstVal == 2 || firstVal == 3);

v = map.removeAny(1);
int secondVal = v.get();
Assert.assertTrue(secondVal == 2 || secondVal == 3);
Assert.assertNotEquals(secondVal, firstVal);

v = map.removeAny(2);
Assert.assertTrue(v.isPresent());
Assert.assertEquals(v.get(), Integer.valueOf(4));

Assert.assertFalse(map.removeAny(1).isPresent());
Assert.assertFalse(map.removeAny(2).isPresent());
Assert.assertFalse(map.removeAny(3).isPresent());
}

@Test
public void testRemoveIf() {
SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
Assert.assertEquals(map.removeIf((k, v) -> true), 0);

map.put(1, 2);
map.put(1, 3);
map.put(2, 4);
map.put(2, 4);

Assert.assertEquals(map.removeIf((k, v) -> v == 4), 1);
Assert.assertEquals(map.removeIf((k, v) -> k == 1), 2);

map.put(1, 2);
map.put(1, 3);
map.put(2, 4);

Assert.assertEquals(map.removeIf((k, v) -> false), 0);
Assert.assertEquals(map.removeIf((k, v) -> true), 3);
}
}