Skip to content

Commit

Permalink
Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
Browse files Browse the repository at this point in the history
* Make BookieId work with PulsarRegistrationDriver (#17762)

* Make BookieId work with PulsarRegistrationDriver

* Switch to MetadataCache

* checkstyle

* Do not execute lookup on MetadataCache in the getBookieServiceInfo caller thread

(cherry picked from commit 09f5eeb)
  • Loading branch information
eolivelli authored and liangyepianzhou committed Dec 13, 2022
1 parent cfb1722 commit e3eb026
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;
Expand Down Expand Up @@ -63,7 +64,57 @@ public byte[] serialize(String path, BookieServiceInfo bookieServiceInfo) throws
}

@Override
public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
return null;
public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
// see https://github.com/apache/bookkeeper/blob/
// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
String bookieId = extractBookiedIdFromPath(path);
if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
}

BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
BookieServiceInfo bsi = new BookieServiceInfo();
List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
.map(e -> {
BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
endpoint.setId(e.getId());
endpoint.setPort(e.getPort());
endpoint.setHost(e.getHost());
endpoint.setProtocol(e.getProtocol());
endpoint.setAuth(e.getAuthList());
endpoint.setExtensions(e.getExtensionsList());
return endpoint;
})
.collect(Collectors.toList());

bsi.setEndpoints(endpoints);
bsi.setProperties(builder.getPropertiesMap());

return bsi;

}

/**
* Extract the BookieId
* The path should look like /ledgers/available/bookieId
* or /ledgers/available/readonly/bookieId.
* But the prefix depends on the configuration.
* @param path
* @return the bookieId
*/
private static String extractBookiedIdFromPath(String path) throws IOException {
// https://github.com/apache/bookkeeper/blob/
// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
if (path == null) {
path = "";
}
int last = path.lastIndexOf("/");
if (last >= 0) {
return path.substring(last + 1);
} else {
throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,33 @@
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;

@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {

private final MetadataStore store;
Expand All @@ -47,14 +58,18 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;

private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
new ConcurrentHashMap();
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;

public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);

// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
Expand Down Expand Up @@ -99,7 +114,25 @@ public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {

private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
return store.getChildren(path)
.thenApply(PulsarRegistrationClient::convertToBookieAddresses)
.thenComposeAsync(children -> {
Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
Set<BookieId> bookies = convertToBookieAddresses(children);
List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated =
new ArrayList<>(bookies.size());
for (BookieId id : bookies) {
// update the cache for new bookies
if (!bookieServiceInfoCache.containsKey(id)) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
}
}
if (bookieInfoUpdated.isEmpty()) {
return CompletableFuture.completedFuture(bookieIds);
} else {
return FutureUtil
.waitForAll(bookieInfoUpdated)
.thenApply(___ -> bookieIds);
}
})
.thenApply(s -> new Versioned<>(s, Version.NEW));
}

Expand Down Expand Up @@ -129,10 +162,20 @@ public void unwatchReadOnlyBookies(RegistrationListener registrationListener) {

private void updatedBookies(Notification n) {
if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {

if (n.getType() == NotificationType.Deleted) {
BookieId bookieId = stripBookieIdFromPath(n.getPath());
log.info("Bookie {} disappeared", bookieId);
if (bookieId != null) {
bookieServiceInfoCache.remove(bookieId);
}
}

if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
});
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
Expand All @@ -141,6 +184,22 @@ private void updatedBookies(Notification n) {
}
}

private static BookieId stripBookieIdFromPath(String path) {
if (path == null) {
return null;
}
final int slash = path.lastIndexOf('/');
if (slash >= 0) {
try {
return BookieId.parse(path.substring(slash + 1));
} catch (IllegalArgumentException e) {
log.warn("Cannot decode bookieId from {}", path, e);
}
}
return null;
}


private static Set<BookieId> convertToBookieAddresses(List<String> children) {
// Read the bookie addresses into a set for efficient lookup
HashSet<BookieId> newBookieAddrs = new HashSet<>();
Expand All @@ -153,4 +212,56 @@ private static Set<BookieId> convertToBookieAddresses(List<String> children) {
}
return newBookieAddrs;
}

@Override
public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
// this method cannot perform blocking calls to the MetadataStore
// or return a CompletableFuture that is completed on the MetadataStore main thread
// this is because there are a few cases in which some operations on the main thread
// wait for the result. This is due to the fact that resolving the address of a bookie
// is needed in many code paths.
Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
if (log.isDebugEnabled()) {
log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
}
if (resultFromCache != null) {
return CompletableFuture.completedFuture(resultFromCache);
} else {
return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
}
}

public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asWritable)
.thenCompose((Optional<BookieServiceInfo> getResult) -> {
if (getResult.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResult.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
bookieServiceInfoCache.put(bookieId, res);
return CompletableFuture.completedFuture(res);
} else {
return readBookieInfoAsReadonlyBookie(bookieId);
}
}
);
}

final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asReadonly)
.thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
if (getResultAsReadOnly.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
getResultAsReadOnly.get());
bookieServiceInfoCache.put(bookieId, res);
return res;
} else {
throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -114,6 +116,66 @@ public void testGetReadonlyBookies(String provider, Supplier<String> urlSupplier
assertEquals(addresses.size(), result.getValue().size());
}

@Test(dataProvider = "impl")
public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();

@Cleanup
RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));

@Cleanup
RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);

List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
int port = 223;
for (BookieId address : addresses) {
BookieServiceInfo info = new BookieServiceInfo();
BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
endpoint.setAuth(Collections.emptyList());
endpoint.setExtensions(Collections.emptyList());
endpoint.setId("id");
endpoint.setHost("localhost");
endpoint.setPort(port++);
endpoint.setProtocol("bookie-rpc");
info.setEndpoints(Arrays.asList(endpoint));
bookieServiceInfos.add(info);
// some readonly, some writable
boolean readOnly = port % 2 == 0;
rm.registerBookie(address, readOnly, info);
}

// trigger loading the BookieServiceInfo in the local cache
rc.getAllBookies().join();

int i = 0;
for (BookieId address : addresses) {
BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
}

}

private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
assertEquals(a.getProperties(), b.getProperties());
assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
for (int i = 0; i < a.getEndpoints().size(); i++) {
BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
assertEquals(e1.getHost(), e2.getHost());
assertEquals(e1.getPort(), e2.getPort());
assertEquals(e1.getId(), e2.getId());
assertEquals(e1.getProtocol(), e2.getProtocol());
assertEquals(e1.getExtensions(), e2.getExtensions());
assertEquals(e1.getAuth(), e2.getAuth());
}

}

@Test(dataProvider = "impl")
public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
Expand Down

0 comments on commit e3eb026

Please sign in to comment.