Skip to content

Commit

Permalink
Ledger replicate supports throttle (apache#2778)
Browse files Browse the repository at this point in the history
### Motivation

Ledger replication puts a heavy load on the cluster.
Currently, ledger replication only supports splitting fragments into smaller pieces;
throttling is not supported.

### Changes

Add a configuration option, `replicationRateByBytes`,

to support throttling the replication read rate in bytes.

The BookKeeper shell `recover` command also supports throttling.

(cherry picked from commit a2d7341)
  • Loading branch information
gaozhangmin authored and hangc0276 committed Nov 5, 2022
1 parent 6cbe655 commit 6992dfc
Show file tree
Hide file tree
Showing 14 changed files with 119 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ public RecoverCmd() {
opts.addOption("sk", "skipOpenLedgers", false, "Skip recovering open ledgers");
opts.addOption("d", "deleteCookie", false, "Delete cookie node for the bookie.");
opts.addOption("sku", "skipUnrecoverableLedgers", false, "Skip unrecoverable ledgers.");
// Unlike the boolean flags above, the replication rate carries a value, so the option must be
// declared with hasArg=true; otherwise the parser never binds the number to the option and
// runCmd's getOptionIntValue(cmdLine, "replicationRate", -1) has nothing to read.
opts.addOption("rate", "replicationRate", true, "Replication rate by bytes");
}

@Override
Expand Down Expand Up @@ -513,6 +514,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
boolean skipUnrecoverableLedgers = cmdLine.hasOption("sku");

Long ledgerId = getOptionLedgerIdValue(cmdLine, "ledger", -1);
int replicationRate = getOptionIntValue(cmdLine, "replicationRate", -1);

RecoverCommand cmd = new RecoverCommand();
RecoverCommand.RecoverFlags flags = new RecoverCommand.RecoverFlags();
Expand All @@ -521,6 +523,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
flags.dryRun(dryrun);
flags.force(force);
flags.ledger(ledgerId);
flags.replicateRate(replicationRate);
flags.skipOpenLedgers(skipOpenLedgers);
flags.query(query);
flags.skipUnrecoverableLedgers(skipUnrecoverableLedgers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
Expand Down Expand Up @@ -163,7 +164,7 @@ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, Interrupted
// Create the BookKeeper client instance
bkc = new BookKeeper(conf);
ownsBK = true;
this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE, conf);
this.mFactory = bkc.ledgerManagerFactory;
}

Expand All @@ -176,15 +177,22 @@ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, Interrupted
* @param statsLogger
* - stats logger
*/
public BookKeeperAdmin(final BookKeeper bkc, StatsLogger statsLogger) {
public BookKeeperAdmin(final BookKeeper bkc, StatsLogger statsLogger, ClientConfiguration conf) {
Objects.requireNonNull(conf, "Client configuration cannot be null");
this.bkc = bkc;
ownsBK = false;
this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
this.lfr = new LedgerFragmentReplicator(bkc, statsLogger, conf);
this.mFactory = bkc.ledgerManagerFactory;
}

/**
 * Constructs an admin client on top of an existing {@link BookKeeper} client.
 *
 * <p>Delegates to the three-argument constructor, passing {@code NullStatsLogger.INSTANCE}
 * as the stats logger.
 *
 * @param bkc
 *            - bookkeeper client instance to reuse
 * @param conf
 *            - client configuration handed through to the delegated constructor
 */
public BookKeeperAdmin(final BookKeeper bkc, ClientConfiguration conf) {
this(bkc, NullStatsLogger.INSTANCE, conf);
}

public BookKeeperAdmin(final BookKeeper bkc) {
this(bkc, NullStatsLogger.INSTANCE);
this.bkc = bkc;
ownsBK = false;
this.mFactory = bkc.ledgerManagerFactory;
}

public ClientConfiguration getConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;;
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.Unpooled;
import java.util.Enumeration;
import java.util.HashSet;
Expand All @@ -42,6 +43,7 @@
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieProtocol;
Expand Down Expand Up @@ -102,8 +104,14 @@ public class LedgerFragmentReplicator {
)
private final OpStatsLogger writeDataLatency;

protected Throttler replicationThrottle = null;

public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
private int averageEntrySize;

private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8;

public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger, ClientConfiguration conf) {
this.bkc = bkc;
this.statsLogger = statsLogger;
numEntriesRead = this.statsLogger.getCounter(NUM_ENTRIES_READ);
Expand All @@ -112,10 +120,14 @@ public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN);
readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY);
writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY);
if (conf.getReplicationRateByBytes() > 0) {
this.replicationThrottle = new Throttler(conf.getReplicationRateByBytes());
}
averageEntrySize = INITIAL_AVERAGE_ENTRY_SIZE;
}

public LedgerFragmentReplicator(BookKeeper bkc) {
this(bkc, NullStatsLogger.INSTANCE);
public LedgerFragmentReplicator(BookKeeper bkc, ClientConfiguration conf) {
this(bkc, NullStatsLogger.INSTANCE, conf);
}

private static final Logger LOG = LoggerFactory
Expand Down Expand Up @@ -324,6 +336,11 @@ private void recoverLedgerFragmentEntry(final Long entryId,
final long ledgerId = lh.getId();
final AtomicInteger numCompleted = new AtomicInteger(0);
final AtomicBoolean completed = new AtomicBoolean(false);

if (replicationThrottle != null) {
replicationThrottle.acquire(averageEntrySize);
}

final WriteCallback multiWriteCallback = new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
Expand Down Expand Up @@ -379,10 +396,16 @@ public void readComplete(int rc, LedgerHandle lh,
final long dataLength = data.length;
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);

ByteBufList toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
if (replicationThrottle != null) {
int toSendSize = toSend.readableBytes();
averageEntrySize = (int) (averageEntrySize * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize);
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
Expand Down Expand Up @@ -473,4 +496,17 @@ private static void updateEnsembleInfo(
}
});
}

/**
 * Byte-rate throttle for fragment replication, backed by a Guava {@link RateLimiter}.
 */
static class Throttler {
    private final RateLimiter rateLimiter;

    /**
     * Creates a throttle with the given rate.
     *
     * @param throttleBytes rate handed to {@code RateLimiter.create}, i.e. byte permits
     *                      per second; must be positive
     */
    Throttler(int throttleBytes) {
        this.rateLimiter = RateLimiter.create(throttleBytes);
    }

    /**
     * Blocks until the requested number of byte permits is available.
     *
     * @param permits estimated size in bytes of the entry about to be replicated
     */
    void acquire(int permits) {
        // RateLimiter.acquire rejects permits <= 0 with IllegalArgumentException. The caller
        // passes a truncated running average, so clamp to at least one permit rather than
        // letting a degenerate estimate abort the replication.
        rateLimiter.acquire(Math.max(1, permits));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,30 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";

protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

/**
 * Get the byte rate limit applied to re-replication.
 *
 * <p>The default value is -1, which means entries are replicated without any throttling.
 *
 * @return the re-replication rate limit in bytes, or -1 if none is configured.
 */
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
 * Set the byte rate limit applied to re-replication.
 *
 * @param rate re-replication rate limit in bytes.
 *
 * @return this ClientConfiguration, so calls can be chained.
 */
public ClientConfiguration setReplicationRateByBytes(int rate) {
this.setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
}

/**
* Construct a default client-side configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
"auditorMaxNumberOfConcurrentOpenLedgerOperations";
protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
"auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

// Worker Thread parameters.
protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
Expand Down Expand Up @@ -3596,4 +3597,26 @@ public ServerConfiguration setAuthorizedRoles(String roles) {
this.setProperty(AUTHORIZED_ROLES, roles);
return this;
}

/**
 * Get the byte rate limit applied to re-replication.
 *
 * <p>The default value is -1, which means entries are replicated without any throttling.
 *
 * @return the re-replication rate limit in bytes, or -1 if none is configured.
 */
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
 * Set the byte rate limit applied to re-replication.
 *
 * @param rate re-replication rate limit in bytes.
 *
 * @return this ServerConfiguration, so calls can be chained.
 */
public ServerConfiguration setReplicationRateByBytes(int rate) {
setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public Auditor(final String bookieIdentifier,
conf,
bkc,
ownBkc,
new BookKeeperAdmin(bkc, statsLogger),
new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf)),
true,
statsLogger);
}
Expand Down Expand Up @@ -1233,7 +1233,7 @@ BookKeeper getBookKeeper(ServerConfiguration conf) throws IOException, Interrupt
* @return
*/
BookKeeperAdmin getBookKeeperAdmin(final BookKeeper bookKeeper) {
return new BookKeeperAdmin(bookKeeper, statsLogger);
return new BookKeeperAdmin(bookKeeper, statsLogger, new ClientConfiguration(conf));
}

/**
Expand All @@ -1243,7 +1243,6 @@ BookKeeperAdmin getBookKeeperAdmin(final BookKeeper bookKeeper) {
void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
final BookKeeper localClient = getBookKeeper(conf);
final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);

try {
final LedgerChecker checker = new LedgerChecker(localClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
Expand Down Expand Up @@ -173,7 +174,7 @@ public ReplicationWorker(final ServerConfiguration conf,
this.ownBkc = ownBkc;

this.underreplicationManager = bkc.getLedgerManagerFactory().newLedgerUnderreplicationManager();
this.admin = new BookKeeperAdmin(bkc, statsLogger);
this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
this.ledgerChecker = new LedgerChecker(bkc);
this.workerThread = new BookieThread(this, "ReplicationWorker");
this.openLedgerRereplicationGracePeriod = conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private boolean updateLedger(ServerConfiguration conf, UpdateBookieInLedgerFlags
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.addConfiguration(conf);
final BookKeeper bk = new BookKeeper(clientConfiguration);
final BookKeeperAdmin admin = new BookKeeperAdmin(bk);
final BookKeeperAdmin admin = new BookKeeperAdmin(bk, clientConfiguration);
if (admin.getAvailableBookies().contains(srcBookieAddress)
|| admin.getReadOnlyBookies().contains(srcBookieAddress)) {
bk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public static class RecoverFlags extends CliFlags{

@Parameter(names = {"-sku", "--skipunrecoverableledgers"}, description = "Skip unrecoverable ledgers")
private boolean skipUnrecoverableLedgers;

@Parameter(names = { "-rate", "--replicationrate" }, description = "Replication rate in bytes")
private int replicateRate;
}

@Override
Expand All @@ -124,6 +127,7 @@ private boolean recover(ServerConfiguration conf, RecoverFlags flags)
boolean skipUnrecoverableLedgers = flags.skipUnrecoverableLedgers;

Long ledgerId = flags.ledger;
int replicateRate = flags.replicateRate;

// Get bookies list
final String[] bookieStrs = flags.bookieAddress.split(",");
Expand All @@ -147,6 +151,7 @@ private boolean recover(ServerConfiguration conf, RecoverFlags flags)
}

LOG.info("Constructing admin");
conf.setReplicationRateByBytes(replicateRate);
ClientConfiguration adminConf = new ClientConfiguration(conf);
BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
LOG.info("Construct admin : {}", admin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ private void serializeLocalBookieConfig(ServerConfiguration localBookieConfig, S
for (int i = 1; i < values.length; i++) {
concatenatedValue.append(",").append(values[i]);
}
writer.println(key + "=" + concatenatedValue.toString());
writer.println(key + "=" + concatenatedValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ Set<LedgerFragment> getResult(int time, TimeUnit unit) throws Exception {
@Test
public void testBookKeeperAdmin() throws Exception {
BookKeeper bk = new BookKeeper(baseClientConf, zkc);
try (BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
try (BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk, baseClientConf)) {

LOG.info("Create ledger and add entries to it");
LedgerHandle lh1 = createLedgerWithEntries(bk, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testManyLedgersWithShortHostname() throws Exception {

public void testManyLedgers(boolean useShortHostName) throws Exception {
try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk, baseClientConf)) {

LOG.info("Create ledger and add entries to it");
List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testManyLedgers(boolean useShortHostName) throws Exception {
@Test
public void testLimitLessThanTotalLedgers() throws Exception {
try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk, baseClientConf)) {

LOG.info("Create ledger and add entries to it");
List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
Expand Down Expand Up @@ -191,7 +191,7 @@ public void testChangeEnsembleAfterRenamingToShortHostname() throws Exception {
public void testChangeEnsembleAfterRenaming(boolean useShortHostName) throws Exception {

try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk, baseClientConf)) {

LOG.info("Create ledger and add entries to it");
LedgerHandle lh = createLedgerWithEntries(bk, 100);
Expand Down Expand Up @@ -252,7 +252,7 @@ public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) {
@Test
public void testRenameWhenAddEntryInProgress() throws Exception {
try (final BookKeeper bk = new BookKeeper(baseClientConf, zkc);
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk)) {
BookKeeperAdmin bkadmin = new BookKeeperAdmin(bk, baseClientConf)) {

LOG.info("Create ledger and add entries to it");
final int numOfEntries = 5000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private class TestBookKeeperAdmin extends BookKeeperAdmin {
public TestBookKeeperAdmin(BookKeeper bkc, StatsLogger statsLogger,
MultiKeyMap<String, AvailabilityOfEntriesOfLedger> returnAvailabilityOfEntriesOfLedger,
MultiKeyMap<String, Integer> errorReturnValueForGetAvailabilityOfEntriesOfLedger) {
super(bkc, statsLogger);
super(bkc, statsLogger, baseClientConf);
this.returnAvailabilityOfEntriesOfLedger = returnAvailabilityOfEntriesOfLedger;
this.errorReturnValueForGetAvailabilityOfEntriesOfLedger =
errorReturnValueForGetAvailabilityOfEntriesOfLedger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private LedgerMetadata testClient(BookKeeper client, int clusterSize) throws Exc
LedgerEntry e = entries.nextElement();
assertTrue("Entry contents incorrect", Arrays.equals(e.getEntry(), testEntry));
}
BookKeeperAdmin admin = new BookKeeperAdmin(client);
BookKeeperAdmin admin = new BookKeeperAdmin(client, baseClientConf);
return admin.getLedgerMetadata(lh);
}
}
Expand Down

0 comments on commit 6992dfc

Please sign in to comment.