Skip to content

Commit

Permalink
improve the throttle function (apache#2991)
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:


### Motivation

improve the throttle function : old pr: apache#2778 
1. duplicate definition for replicationRateByBytes
2.make sure this update safety when different callback run averageEntrySize updating

### Changes

1.clean code for duplicate definition for replicationRateByBytes
2.add a lock to make sure data safety for updateAverageEntrySize
  • Loading branch information
StevenLuMT committed Jan 19, 2022
1 parent a3b60db commit 7f31748
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,7 @@ public void readComplete(int rc, LedgerHandle lh,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
if (replicationThrottle != null) {
int toSendSize = toSend.readableBytes();
averageEntrySize = (int) (averageEntrySize * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize);
updateAverageEntrySize(toSend.readableBytes());
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
Expand All @@ -420,6 +418,12 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}

/* make sure this update safety when different callback run this updating */
private synchronized void updateAverageEntrySize(int toSendSize) {
averageEntrySize = (int) (averageEntrySize * AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize);
}

/**
* Callback for recovery of a single ledger fragment. Once the fragment has
* had all entries replicated, update the ensemble in zookeeper. Once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";

protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
Expand Down Expand Up @@ -1205,6 +1207,28 @@ public T setLimitStatsLogging(boolean limitStatsLogging) {
return getThis();
}

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
*
* @return bytes rate of re-replication.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Set the bytes rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ClientConfiguration
*/
public T setReplicationRateByBytes(int rate) {
this.setProperty(REPLICATION_RATE_BY_BYTES, rate);
return getThis();
}

/**
* Trickery to allow inheritance with fluent style.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,30 +200,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";

protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
*
* @return bytes rate of re-replication.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Set the bytes rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ClientConfiguration
*/
public ClientConfiguration setReplicationRateByBytes(int rate) {
this.setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
}

/**
* Construct a default client-side configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
"auditorMaxNumberOfConcurrentOpenLedgerOperations";
protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
"auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";
protected static final String IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER = "inFlightReadEntryNumInLedgerChecker";


Expand Down Expand Up @@ -3718,16 +3717,6 @@ public ServerConfiguration setAuthorizedRoles(String roles) {
return this;
}

/**
* Get the bytes rate of re-replication.
* Default value is -1 which it means entries will replicated without any throttling activity.
*
* @return bytes rate of re-replication.
*/
public int getReplicationRateByBytes() {
return getInt(REPLICATION_RATE_BY_BYTES, -1);
}

/**
* Get in flight read entry number when ledger checker.
* Default value is -1 which it is unlimited when ledger checker.
Expand All @@ -3738,18 +3727,6 @@ public int getInFlightReadEntryNumInLedgerChecker(){
return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
}

/**
* Set the rate of re-replication.
*
* @param rate bytes rate of re-replication.
*
* @return ServerConfiguration
*/
public ServerConfiguration setReplicationRateByBytes(int rate) {
setProperty(REPLICATION_RATE_BY_BYTES, rate);
return this;
}

/**
* Enabled data integrity checker.
* The data integrity checker checks that the bookie has all the entries which
Expand Down

0 comments on commit 7f31748

Please sign in to comment.