From 7c0ab37fe6b90efd35ffc24d9cab06b91759e400 Mon Sep 17 00:00:00 2001 From: eolivelli Date: Mon, 31 Oct 2016 09:52:26 +0100 Subject: [PATCH 1/2] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support) --- .../LocalBookieEnsemblePlacementPolicy.java | 5 +- .../bookkeeper/client/BookieWatcher.java | 13 +- .../DefaultEnsemblePlacementPolicy.java | 9 +- .../client/EnsemblePlacementPolicy.java | 30 ++-- .../bookkeeper/client/LedgerCreateOp.java | 3 +- .../bookkeeper/client/LedgerHandle.java | 4 +- .../RackawareEnsemblePlacementPolicy.java | 20 +-- .../RackawareEnsemblePlacementPolicyImpl.java | 8 +- .../RegionAwareEnsemblePlacementPolicy.java | 6 +- .../GenericEnsemblePlacementPolicyTest.java | 148 ++++++++++++++++++ .../TestRackawareEnsemblePlacementPolicy.java | 22 +-- ...estRegionAwareEnsemblePlacementPolicy.java | 58 +++---- 12 files changed, 233 insertions(+), 93 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 508511ba637..231b47de03d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -74,7 +74,7 @@ public Set onClusterChanged(Set writab } @Override - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { throw new BKNotEnoughBookiesException(); } @@ -89,8 +89,7 @@ public List reorderReadLACSequence(ArrayList ensem } @Override - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Set excludeBookies) throws BKNotEnoughBookiesException { + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { if (ensembleSize > 1) { throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie"); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index b8d89519ca8..cec6920e435 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -49,6 +49,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import java.util.Map; /** * This class is responsible for maintaining a consistent view of what bookies @@ -255,18 +256,19 @@ public void processResult(int rc, String path, Object ctx, List children * @return list of bookies for new ensemble. * @throws BKNotEnoughBookiesException */ - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize) + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, + int ackQuorumSize, Map customMetadata) throws BKNotEnoughBookiesException { try { // we try to only get from the healthy bookies first return placementPolicy.newEnsemble(ensembleSize, - writeQuorumSize, ackQuorumSize, new HashSet( + writeQuorumSize, ackQuorumSize, customMetadata, new HashSet( quarantinedBookies.asMap().keySet())); } catch (BKNotEnoughBookiesException e) { if (logger.isDebugEnabled()) { logger.debug("Not enough healthy bookies available, using quarantined bookies"); } - return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, EMPTY_SET); + return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, EMPTY_SET); } } @@ -280,6 +282,7 @@ public ArrayList newEnsemble(int ensembleSize, int writeQuo * @throws BKNotEnoughBookiesException */ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Map customMetadata, List existingBookies, int bookieIdx, Set excludeBookies) throws BKNotEnoughBookiesException { @@ -288,13 +291,13 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, // we exclude the quarantined bookies also first Set existingAndQuarantinedBookies = new HashSet(existingBookies); existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet()); - return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, + return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, existingAndQuarantinedBookies, addr, excludeBookies); } catch (BKNotEnoughBookiesException e) { if (logger.isDebugEnabled()) { logger.debug("Not enough healthy bookies available, using quarantined bookies"); } - return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, + return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet(existingBookies), addr, excludeBookies); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 640bdb7c906..5a2c1f281d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -45,8 +45,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy { private Set knownBookies = new HashSet(); @Override - public ArrayList newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize, - Set excludeBookies) throws BKNotEnoughBookiesException { + public ArrayList newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { ArrayList newBookies = new ArrayList(ensembleSize); if (ensembleSize <= 0) { return newBookies; @@ -70,11 +69,9 @@ public ArrayList newEnsemble(int ensembleSize, int quorumSi } @Override - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection currentEnsemble, - BookieSocketAddress bookieToReplace, - Set excludeBookies) throws BKNotEnoughBookiesException { + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { excludeBookies.addAll(currentEnsemble); - ArrayList addresses = newEnsemble(1, 1, 1, excludeBookies); + ArrayList addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies); return addresses.get(0); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index 2af81081d59..4a0f3076950 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -80,28 +80,32 @@ public Set onClusterChanged(Set writab * Ensemble Size * @param writeQuorumSize * Write Quorum Size - * @param excludeBookies - * Bookies that should not be considered as targets. - * @return list of bookies chosen as targets. + * @param ackQuorumSize + * the value of ackQuorumSize + * @param customMetadata the value of customMetadata + * @param excludeBookies Bookies that should not be considered as targets. * @throws BKNotEnoughBookiesException if not enough bookies available. + * @return the java.util.ArrayList */ - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Set excludeBookies) throws BKNotEnoughBookiesException; + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException; /** * Choose a new bookie to replace bookieToReplace. If no bookie available in the cluster, * {@link BKNotEnoughBookiesException} is thrown. * - * @param bookieToReplace - * bookie to replace - * @param excludeBookies - * bookies that should not be considered as candidate. - * @return the bookie chosen as target. + * @param ensembleSize + * the value of ensembleSize + * @param writeQuorumSize + * the value of writeQuorumSize + * @param ackQuorumSize the value of ackQuorumSize + * @param customMetadata the value of customMetadata + * @param currentEnsemble the value of currentEnsemble + * @param bookieToReplace bookie to replace + * @param excludeBookies bookies that should not be considered as candidate. * @throws BKNotEnoughBookiesException + * @return the org.apache.bookkeeper.net.BookieSocketAddress */ - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Collection currentEnsemble, BookieSocketAddress bookieToReplace, - Set excludeBookies) throws BKNotEnoughBookiesException; + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException; /** * Reorder the read sequence of a given write quorum writeSet. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index e88df3194d3..52a5cb6255d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -107,7 +107,8 @@ public void initiate() { ensemble = bk.bookieWatcher .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize(), - metadata.getAckQuorumSize()); + metadata.getAckQuorumSize(), + metadata.getCustomMetadata()); } catch (BKNotEnoughBookiesException e) { LOG.error("Not enough bookies to create ledger"); createComplete(e.getCode(), null); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 5c3392914e4..11212a7238d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -995,7 +995,9 @@ ArrayList replaceBookieInMetadata(final BookieSocketAddress newEnsemble.addAll(metadata.currentEnsemble); newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(), metadata.getWriteQuorumSize(), - metadata.getAckQuorumSize(), newEnsemble, + metadata.getAckQuorumSize(), + metadata.getCustomMetadata(), + newEnsemble, bookieIndex, new HashSet<>(Arrays.asList(addr))); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index f42e42a747b..c306ca0b56b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -79,39 +79,31 @@ public Set onClusterChanged(Set writab @Override public ArrayList newEnsemble( - int ensembleSize, - int writeQuorumSize, - int ackQuorumSize, - Set excludeBookies) + int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { try { - return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies); + return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies); } catch (BKException.BKNotEnoughBookiesException bnebe) { if (slave == null) { throw bnebe; } else { - return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies); + return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies); } } } @Override public BookieSocketAddress replaceBookie( - int ensembleSize, - int writeQuorumSize, - int ackQuorumSize, - Collection currentEnsemble, - BookieSocketAddress bookieToReplace, - Set excludeBookies) + int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { try { - return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, + return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, currentEnsemble, bookieToReplace, excludeBookies); } catch (BKException.BKNotEnoughBookiesException bnebe) { if (slave == null) { throw bnebe; } else { - return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, + return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,customMetadata, currentEnsemble, bookieToReplace, excludeBookies); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 3c41a7c7a9d..79ff0dad47f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -259,8 +259,7 @@ protected Set convertBookiesToNodes(Set excludeBookie } @Override - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Set excludeBookies) + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null); } @@ -355,10 +354,7 @@ protected ArrayList newEnsembleInternal( } @Override - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Collection currentEnsemble, - BookieSocketAddress bookieToReplace, - Set excludeBookies) + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { rwLock.readLock().lock(); try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index abdcb6117cd..08f8f3d27a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -212,8 +212,7 @@ protected List selectRandomFromRegions(Set availableRegions, @Override - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, - Set excludeBookies) throws BKException.BKNotEnoughBookiesException { + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability; @@ -392,8 +391,7 @@ public ArrayList newEnsemble(int ensembleSize, int writeQuo } @Override - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection currentEnsemble, BookieSocketAddress bookieToReplace, - Set excludeBookies) throws BKException.BKNotEnoughBookiesException { + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKException.BKNotEnoughBookiesException { rwLock.readLock().lock(); try { boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java new file mode 100644 index 00000000000..e28a6916105 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.client; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Test; + +public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCase { + + private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32; + private static final String PASSWORD = "testPasswd"; + private static final String property = "foo"; + private static final byte[] value = "bar".getBytes(StandardCharsets.UTF_8); + private static List> customMetadataOnNewEnsembleStack = new ArrayList<>(); + private static List> customMetadataOnReplaceBookieStack = new ArrayList<>(); + + public GenericEnsemblePlacementPolicyTest() { + super(0); + baseClientConf.setEnsemblePlacementPolicy(CustomEnsemblePlacementPolicy.class); + } + + public static final class CustomEnsemblePlacementPolicy extends DefaultEnsemblePlacementPolicy { + + @Override + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, + int ackQuorumSize, Map customMetadata, Collection currentEnsemble, + BookieSocketAddress bookieToReplace, Set excludeBookies) + throws BKException.BKNotEnoughBookiesException { + new Exception("replaceBookie " + ensembleSize + "," + customMetadata).printStackTrace(); + assertNotNull(customMetadata); + customMetadataOnReplaceBookieStack.add(customMetadata); + return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, + currentEnsemble, bookieToReplace, excludeBookies); + } + + @Override + public ArrayList newEnsemble(int ensembleSize, int quorumSize, + int ackQuorumSize, Map customMetadata, Set excludeBookies) + throws BKException.BKNotEnoughBookiesException { + assertNotNull(customMetadata); + customMetadataOnNewEnsembleStack.add(customMetadata); + return super.newEnsemble(ensembleSize, quorumSize, ackQuorumSize, customMetadata, excludeBookies); + } + + } + + @Before + public void reset() { + customMetadataOnNewEnsembleStack.clear(); + customMetadataOnReplaceBookieStack.clear(); + } + + @Test(timeout = 60000) + public void testNewEnsemble() throws Exception { + numBookies = 1; + startBKCluster(); + try { + Map customMetadata = new HashMap<>(); + customMetadata.put(property, value); + try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) { + bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata); + } + assertEquals(1, customMetadataOnNewEnsembleStack.size()); + assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property)); + } finally { + stopBKCluster(); + } + } + + @Test(timeout = 60000) + public void testNewEnsembleWithNotEnoughtBookies() throws Exception { + numBookies = 0; + try { + startBKCluster(); + Map customMetadata = new HashMap<>(); + customMetadata.put(property, value); + try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) { + bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata); + fail("creation should fail"); + } catch (BKException.BKNotEnoughBookiesException bneb) { + } + assertEquals(2, customMetadataOnNewEnsembleStack.size()); + assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property)); + assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(1).get(property)); + } finally { + stopBKCluster(); + } + } + + @Test(timeout = 60000) + public void testReplaceBookie() throws Exception { + numBookies = 3; + startBKCluster(); + try { + Map customMetadata = new HashMap<>(); + customMetadata.put(property, value); + try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) { + try (LedgerHandle lh = bk.createLedger(2, 2, 2, digestType, PASSWORD.getBytes(), customMetadata);) { + lh.addEntry(value); + long lId = lh.getId(); + ArrayList ensembleAtFirstEntry = lh.getLedgerMetadata().getEnsemble(lId); + assertEquals(2, ensembleAtFirstEntry.size()); + killBookie(ensembleAtFirstEntry.get(0)); + lh.addEntry(value); + } + } + assertEquals(2, customMetadataOnNewEnsembleStack.size()); + assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property)); + // replaceBookie by default calls newEnsemble, so newEnsemble gets called twice + assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property)); + + assertEquals(1, customMetadataOnReplaceBookieStack.size()); + assertArrayEquals(value, customMetadataOnReplaceBookieStack.get(0).get(property)); + + } finally { + stopBKCluster(); + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 4f3690204ff..bef6bc28683 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -240,7 +240,7 @@ public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); // replace node under r2 - BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet(), addr2, new HashSet()); + BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, new HashSet()); assertEquals(addr3, replacedBookie); } @@ -265,7 +265,7 @@ public void testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception // replace node under r2 Set excludedAddrs = new HashSet(); excludedAddrs.add(addr1); - BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet(), addr2, excludedAddrs); + BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, excludedAddrs); assertFalse(addr1.equals(replacedBookie)); assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie)); @@ -295,7 +295,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws Exception { excludedAddrs.add(addr3); excludedAddrs.add(addr4); try { - repp.replaceBookie(1, 1, 1, new HashSet(), addr2, excludedAddrs); + repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, excludedAddrs); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException bnebe) { // should throw not enou @@ -316,9 +316,9 @@ public void testNewEnsembleWithSingleRack() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet()); assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2)); - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2)); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); @@ -344,10 +344,10 @@ public void testNewEnsembleWithMultipleRacks() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet()); int numCovered = getNumCoveredWriteQuorums(ensemble, 2); assertTrue(numCovered >= 1 && numCovered < 3); - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); numCovered = getNumCoveredWriteQuorums(ensemble2, 2); assertTrue(numCovered >= 1 && numCovered < 3); } catch (BKNotEnoughBookiesException bnebe) { @@ -386,9 +386,9 @@ public void testNewEnsembleWithEnoughRacks() throws Exception { addrs.add(addr8); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet()); assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2)); - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2)); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); @@ -489,12 +489,12 @@ public void testPlacementOnStabilizeNetworkTopology() throws Exception { // we will never use addr4 even it is in the stabilized network topology for (int i = 0 ; i < 5; i++) { ArrayList ensemble = - repp.newEnsemble(3, 3, 3, new HashSet()); + repp.newEnsemble(3, 3, 3, null, new HashSet()); assertFalse(ensemble.contains(addr4)); } // we could still use addr4 for urgent allocation if it is just bookie flapping - ArrayList ensemble = repp.newEnsemble(4, 4, 4, new HashSet()); + ArrayList ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet()); assertTrue(ensemble.contains(addr4)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 9a32986eb2b..5c61ae372b4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -269,7 +269,7 @@ public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); // replace node under r2 - BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet(), addr2, new HashSet()); + BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, new HashSet()); assertEquals(addr3, replacedBookie); } @@ -294,7 +294,7 @@ public void testReplaceBookieWithEnoughBookiesInDifferentRegion() throws Excepti // replace node under r2 Set excludedAddrs = new HashSet(); excludedAddrs.add(addr1); - BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet(), addr2, excludedAddrs); + BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, excludedAddrs); assertFalse(addr1.equals(replacedBookie)); assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie)); @@ -319,7 +319,7 @@ public void testNewEnsembleBookieWithNotEnoughBookies() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList list = repp.newEnsemble(5, 5, 3, new HashSet()); + ArrayList list = repp.newEnsemble(5, 5, 3, null, new HashSet()); LOG.info("Ensemble : {}", list); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException bnebe) { @@ -351,7 +351,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws Exception { excludedAddrs.add(addr3); excludedAddrs.add(addr4); try { - repp.replaceBookie(1, 1, 1, new HashSet(), addr2, excludedAddrs); + repp.replaceBookie(1, 1, 1, null, new HashSet(), addr2, excludedAddrs); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException bnebe) { // should throw not enou @@ -380,9 +380,9 @@ public void testNewEnsembleWithSingleRegion() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet()); assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2)); - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2)); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); @@ -411,7 +411,7 @@ public void testNewEnsembleWithMultipleRegions() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet()); int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2); assertTrue(numCovered >= 1); assertTrue(numCovered < 3); @@ -419,7 +419,7 @@ public void testNewEnsembleWithMultipleRegions() throws Exception { fail("Should not get not enough bookies exception even there is only one rack."); } try { - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2); assertTrue(numCovered >= 1 && numCovered < 3); } catch (BKNotEnoughBookiesException bnebe) { @@ -458,9 +458,9 @@ public void testNewEnsembleWithEnoughRegions() throws Exception { addrs.add(addr8); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet()); + ArrayList ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet()); assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2)); - ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet()); + ArrayList ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet()); assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2)); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); @@ -507,22 +507,22 @@ public void testNewEnsembleWithThreeRegions() throws Exception { addrs.add(addr10); repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(6, 6, 4, new HashSet()); + ArrayList ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet()); assert(ensemble.contains(addr4)); assert(ensemble.contains(addr8)); assert(ensemble.size() == 6); assertEquals(3, getNumRegionsInEnsemble(ensemble)); - ensemble = repp.newEnsemble(7, 7, 4, new HashSet()); + ensemble = repp.newEnsemble(7, 7, 4, null, new HashSet()); assert(ensemble.contains(addr4)); assert(ensemble.contains(addr8)); assert(ensemble.size() == 7); assertEquals(3, getNumRegionsInEnsemble(ensemble)); - ensemble = repp.newEnsemble(8, 8, 5, new HashSet()); + ensemble = repp.newEnsemble(8, 8, 5, null, new HashSet()); assert(ensemble.contains(addr4)); assert(ensemble.contains(addr8)); assert(ensemble.size() == 8); assertEquals(3, getNumRegionsInEnsemble(ensemble)); - ensemble = repp.newEnsemble(9, 9, 5, new HashSet()); + ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet()); assert(ensemble.contains(addr4)); assert(ensemble.contains(addr8)); assert(ensemble.size() == 9); @@ -575,7 +575,7 @@ public void testNewEnsembleWithThreeRegionsWithDisable() throws Exception { repp.onClusterChanged(addrs, new HashSet()); try { ((SettableFeature) featureProvider.scope("region1").getFeature("disallowBookies")).set(true); - ArrayList ensemble = repp.newEnsemble(6, 6, 4, new HashSet()); + ArrayList ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet()); assertEquals(2, getNumRegionsInEnsemble(ensemble)); assert(ensemble.contains(addr1)); assert(ensemble.contains(addr3)); @@ -589,14 +589,14 @@ public void testNewEnsembleWithThreeRegionsWithDisable() throws Exception { } try { ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(true); - ArrayList ensemble = repp.newEnsemble(6, 6, 4, new HashSet()); + ArrayList ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet()); fail("Should get not enough bookies exception even there is only one region with insufficient bookies."); } catch (BKNotEnoughBookiesException bnebe) { // Expected } try { ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(false); - ArrayList ensemble = repp.newEnsemble(6, 6, 4, new HashSet()); + ArrayList ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet()); assert(ensemble.contains(addr1)); assert(ensemble.contains(addr3)); assert(ensemble.contains(addr4)); @@ -669,7 +669,7 @@ public void testNewEnsembleWithFiveRegions() throws Exception { repp.onClusterChanged(addrs, new HashSet()); try { - ArrayList ensemble = repp.newEnsemble(10, 10, 10, new HashSet()); + ArrayList ensemble = repp.newEnsemble(10, 10, 10, null, new HashSet()); assert(ensemble.size() == 10); assertEquals(5, getNumRegionsInEnsemble(ensemble)); } catch (BKNotEnoughBookiesException bnebe) { @@ -680,7 +680,7 @@ public void testNewEnsembleWithFiveRegions() throws Exception { try{ Set excludedAddrs = new HashSet(); excludedAddrs.add(addr10); - ArrayList ensemble = repp.newEnsemble(10, 10, 10, excludedAddrs); + ArrayList ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs); assert(ensemble.contains(addr11) && ensemble.contains(addr12)); assert(ensemble.size() == 10); assertEquals(5, getNumRegionsInEnsemble(ensemble)); @@ -771,7 +771,7 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole ArrayList ensemble; try { - ensemble = repp.newEnsemble(6, 6, ackQuorum, new HashSet()); + ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet()); assert(ensemble.size() == 6); assertEquals(3, getNumRegionsInEnsemble(ensemble)); } catch (BKNotEnoughBookiesException bnebe) { @@ -792,7 +792,7 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole Set excludedAddrs = new HashSet(); for(BookieSocketAddress addr: region2Bookies) { if (ensemble.contains(addr)) { - BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, addr, excludedAddrs); + BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, addr, excludedAddrs); ensemble.remove(addr); ensemble.add(replacedBookie); } @@ -816,7 +816,7 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole Set excludedAddrs = new HashSet(); try { - BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs); + BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs); assert (replacedBookie.equals(replacedBookieExpected)); assertEquals(3, getNumRegionsInEnsemble(ensemble)); } catch (BKNotEnoughBookiesException bnebe) { @@ -825,7 +825,7 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole excludedAddrs.add(replacedBookieExpected); try { - BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs); + BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs); if (minDurability > 1 && !disableDurabilityFeature.isAvailable()) { fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } @@ -900,7 +900,7 @@ public void testEnsembleDurabilityDisabledInternal(int minDurability, boolean di ArrayList ensemble; try { - ensemble = repp.newEnsemble(6, 6, 4, new HashSet()); + ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet()); assert(ensemble.size() == 6); } catch (BKNotEnoughBookiesException bnebe) { LOG.error("BKNotEnoughBookiesException", bnebe); @@ -911,7 +911,7 @@ public void testEnsembleDurabilityDisabledInternal(int minDurability, boolean di Set excludedAddrs = new HashSet(); try{ - repp.replaceBookie(6, 6, 4, ensemble, addr4, excludedAddrs); + repp.replaceBookie(6, 6, 4, null, ensemble, addr4, excludedAddrs); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); } @@ -964,7 +964,7 @@ public void testNewEnsembleFailWithFiveRegions() throws Exception { excludedAddrs.add(addr10); excludedAddrs.add(addr9); try { - ArrayList list = repp.newEnsemble(5, 5, 5, excludedAddrs); + ArrayList list = repp.newEnsemble(5, 5, 5, null, excludedAddrs); LOG.info("Ensemble : {}", list); fail("Should throw BKNotEnoughBookiesException when there is not enough bookies"); } catch (BKNotEnoughBookiesException bnebe) { @@ -1025,7 +1025,7 @@ public void testBasicReorderReadLACSequenceWithLocalRegion() throws Exception { private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolean isReadLAC) throws Exception { prepareNetworkTopologyForReorderTests(myRegion); - ArrayList ensemble = repp.newEnsemble(9, 9, 5, new HashSet()); + ArrayList ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet()); assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9)); DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9); @@ -1076,7 +1076,7 @@ public void testBasicReorderReadLACSequenceWithRemoteRegion() throws Exception { private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boolean isReadLAC) throws Exception { prepareNetworkTopologyForReorderTests(myRegion); - ArrayList ensemble = repp.newEnsemble(9, 9, 5, new HashSet()); + ArrayList ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet()); assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9)); DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9); @@ -1139,7 +1139,7 @@ private void reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(boolean isR prepareNetworkTopologyForReorderTests(myRegion); - ArrayList ensemble = repp.newEnsemble(9, 9, 5, new HashSet()); + ArrayList ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet()); assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9)); DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9); From 6c1eaca6487de3e891a7794cc95aaccf90be6ee5 Mon Sep 17 00:00:00 2001 From: eolivelli Date: Wed, 9 Nov 2016 09:01:49 +0100 Subject: [PATCH 2/2] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support) --- .../bookie/LocalBookieEnsemblePlacementPolicy.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 231b47de03d..e7bfe946548 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -74,7 +74,9 @@ public Set onClusterChanged(Set writab } @Override - public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Collection currentEnsemble, BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + java.util.Map customMetadata, Collection currentEnsemble, + BookieSocketAddress bookieToReplace, Set excludeBookies) throws BKNotEnoughBookiesException { throw new BKNotEnoughBookiesException(); } @@ -89,7 +91,8 @@ public List reorderReadLACSequence(ArrayList ensem } @Override - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + java.util.Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { if (ensembleSize > 1) { throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie"); }