Skip to content

Commit

Permalink
improve the throttle function (apache#2991)
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

improve the throttle function : old pr: apache#2778
1. duplicate definition for replicationRateByBytes
2.make sure this update safety when different callback run averageEntrySize updating

1.clean code for duplicate definition for replicationRateByBytes
2.add a lock to make sure data safety for updateAverageEntrySize

(cherry picked from commit 7f31748)
  • Loading branch information
StevenLuMT authored and hangc0276 committed Nov 7, 2022
1 parent 7267058 commit a13dfce
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,7 @@ public void readComplete(int rc, LedgerHandle lh,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
if (replicationThrottle != null) {
int toSendSize = toSend.readableBytes();
averageEntrySize = (int) (averageEntrySize * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize);
updateAverageEntrySize(toSend.readableBytes());
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
Expand All @@ -420,6 +418,12 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}

/* make sure this update safety when different callback run this updating */
private synchronized void updateAverageEntrySize(int toSendSize) {
averageEntrySize = (int) (averageEntrySize * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize);
}

/**
* Callback for recovery of a single ledger fragment. Once the fragment has
* had all entries replicated, update the ensemble in zookeeper. Once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";

protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
Expand Down Expand Up @@ -1162,6 +1164,28 @@ public T setLimitStatsLogging(boolean limitStatsLogging) {
return getThis();
}

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
*
* @return bytes rate of re-replication.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Set the bytes rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ClientConfiguration
*/
public T setReplicationRateByBytes(int rate) {
this.setProperty(REPLICATION_RATE_BY_BYTES, rate);
return getThis();
}

/**
* Trickery to allow inheritance with fluent style.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,30 +198,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";

protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
*
* @return bytes rate of re-replication.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Set the bytes rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ClientConfiguration
*/
public ClientConfiguration setReplicationRateByBytes(int rate) {
this.setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
}

/**
* Construct a default client-side configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
"auditorMaxNumberOfConcurrentOpenLedgerOperations";
protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
"auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";
protected static final String IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER = "inFlightReadEntryNumInLedgerChecker";

// Worker Thread parameters.
protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
Expand Down Expand Up @@ -3599,24 +3599,12 @@ public ServerConfiguration setAuthorizedRoles(String roles) {
}

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
* Get in flight read entry number when ledger checker.
* Default value is -1 which it is unlimited when ledger checker.
*
* @return bytes rate of re-replication.
* @return read entry number of in flight.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Set the rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ServerConfiguration
*/
public ServerConfiguration setReplicationRateByBytes(int rate) {
setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
public int getInFlightReadEntryNumInLedgerChecker(){
return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
}
}

0 comments on commit a13dfce

Please sign in to comment.