Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backward sync log UX improvements #4655

Merged
merged 10 commits into from
Nov 15, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Upgrade RocksDB version from 7.6.0 to 7.7.3
- Added new RPC endpoints `debug_setHead` & `debug_replayBlock` [#4580](https://github.com/hyperledger/besu/pull/4580)
- Upgrade OpenTelemetry to version 1.19.0 [#3675](https://github.com/hyperledger/besu/pull/3675)
- Backward sync log UX improvements [#4655](https://github.com/hyperledger/besu/pull/4655)

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,11 @@ public Optional<BlockHeader> getOrSyncHeaderByHash(
}

private Void logSyncException(final Hash blockHash, final Throwable exception) {
LOG.warn("Sync to block hash " + blockHash.toHexString() + " failed", exception.getMessage());
debugLambda(
LOG,
"Sync to block hash {} failed, reason {}",
blockHash::toHexString,
exception::getMessage);
return null;
}

Expand Down Expand Up @@ -439,7 +443,7 @@ public ForkchoiceResult updateForkChoice(

if (newHead.getNumber() < blockchain.getChainHeadBlockNumber()
&& isDescendantOf(newHead, blockchain.getChainHeadHeader())) {
LOG.info("Ignoring update to old head");
debugLambda(LOG, "Ignoring update to old head {}", newHead::toLogString);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Ignoring head update {}", newHead

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can be specific here: we know that the FcU is trying to set the head to an older block on the same chain as the current head, so we ignore it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering if it could be confusing that it says "old head" but what is inserted is the newHead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about renaming newHead to something like tentativeNewHead or possibleNewHead to state that it is not yet confirmed?

return ForkchoiceResult.withIgnoreUpdateToOldHead(newHead);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext)

final var block =
new Block(newBlockHeader, new BlockBody(transactions, Collections.emptyList()));
final String warningMessage = "Sync to block " + block.toLogString() + " failed";

if (mergeContext.get().isSyncing() || parentHeader.isEmpty()) {
LOG.debug(
Expand All @@ -192,7 +191,11 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext)
.appendNewPayloadToSync(block)
.exceptionally(
exception -> {
LOG.warn(warningMessage, exception.getMessage());
debugLambda(
LOG,
"Sync to block {} failed, reason {}",
block::toLogString,
exception::getMessage);
return null;
});
return respondWith(reqId, blockParam, null, SYNCING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ protected void handleTaskError(final Throwable error) {
this::getAssignedPeer,
this::getRetryCount);
} else {
LOG.warn(
"Failed to get block {} after {} retries", logBlockNumberMaybeHash(), getRetryCount());
debugLambda(
LOG,
"Failed to get block {} after {} retries",
this::logBlockNumberMaybeHash,
this::getRetryCount);
}
super.handleTaskError(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ public class BackwardSyncContext {
private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncContext.class);
public static final int BATCH_SIZE = 200;
private static final int DEFAULT_MAX_RETRIES = 20;

private static final long MILLIS_DELAY_BETWEEN_PROGRESS_LOG = 10_000L;
private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 5000;

protected final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SyncState syncState;

private final AtomicReference<CompletableFuture<Void>> currentBackwardSyncFuture =
new AtomicReference<>();
private final AtomicReference<Status> currentBackwardSyncStatus = new AtomicReference<>();
private final BackwardChain backwardChain;
private int batchSize = BATCH_SIZE;
private Optional<Hash> maybeFinalized = Optional.empty();
Expand Down Expand Up @@ -105,8 +103,8 @@ public BackwardSyncContext(
}

public synchronized boolean isSyncing() {
return Optional.ofNullable(currentBackwardSyncFuture.get())
.map(CompletableFuture::isDone)
return Optional.ofNullable(currentBackwardSyncStatus.get())
.map(status -> status.currentFuture.isDone())
.orElse(Boolean.FALSE);
}

Expand All @@ -124,33 +122,51 @@ public synchronized void updateHeads(final Hash head, final Hash finalizedBlockH
}

public synchronized CompletableFuture<Void> syncBackwardsUntil(final Hash newBlockHash) {
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
Optional<Status> maybeCurrentStatus = Optional.ofNullable(this.currentBackwardSyncStatus.get());
if (isTrusted(newBlockHash)) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
return maybeCurrentStatus
.map(
status -> {
backwardChain
.getBlock(newBlockHash)
.ifPresent(block -> status.updateTargetHeight(block.getHeader().getNumber()));
return status.currentFuture;
})
.orElseGet(() -> CompletableFuture.completedFuture(null));
}
backwardChain.addNewHash(newBlockHash);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(
() -> {
LOG.info("Starting a new backward sync session");
Status status = new Status(prepareBackwardSyncFutureWithRetry());
this.currentBackwardSyncStatus.set(status);
return status.currentFuture;
});
}

public synchronized CompletableFuture<Void> syncBackwardsUntil(final Block newPivot) {
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
Optional<Status> maybeCurrentStatus = Optional.ofNullable(this.currentBackwardSyncStatus.get());
if (isTrusted(newPivot.getHash())) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(() -> CompletableFuture.completedFuture(null));
}
backwardChain.appendTrustedBlock(newPivot);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(
() -> {
LOG.info("Starting a new backward sync session");
LOG.info("Backward sync target block is {}", newPivot.toLogString());
Status status = new Status(prepareBackwardSyncFutureWithRetry());
status.setSyncRange(
getProtocolContext().getBlockchain().getChainHeadBlockNumber(),
newPivot.getHeader().getNumber());
this.currentBackwardSyncStatus.set(status);
return status.currentFuture;
});
}

private boolean isTrusted(final Hash hash) {
Expand All @@ -168,8 +184,9 @@ private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry() {
return prepareBackwardSyncFutureWithRetry(maxRetries)
.handle(
(unused, throwable) -> {
this.currentBackwardSyncFuture.set(null);
this.currentBackwardSyncStatus.set(null);
if (throwable != null) {
LOG.info("Current backward sync session failed, it will be restarted");
throw extractBackwardSyncException(throwable)
.orElse(new BackwardSyncException(throwable));
}
Expand Down Expand Up @@ -201,8 +218,8 @@ protected void processException(final Throwable throwable) {
.ifPresentOrElse(
backwardSyncException -> {
if (backwardSyncException.shouldRestart()) {
LOG.info(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds...",
LOG.debug(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds",
throwable.getMessage(),
ethContext.getEthPeers().peerCount(),
millisBetweenRetries);
Expand All @@ -213,8 +230,8 @@ protected void processException(final Throwable throwable) {
}
},
() -> {
LOG.warn(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds...",
LOG.debug(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds",
throwable.getMessage(),
ethContext.getEthPeers().peerCount(),
millisBetweenRetries);
Expand Down Expand Up @@ -278,10 +295,6 @@ public boolean isReady() {
&& syncState.isInitialSyncPhaseDone();
}

public CompletableFuture<Void> stop() {
return currentBackwardSyncFuture.get();
}

public void subscribeBadChainListener(final BadChainListener badChainListener) {
badChainListeners.subscribe(badChainListener);
}
Expand Down Expand Up @@ -316,6 +329,7 @@ protected Void saveBlock(final Block block) {
.getBlockchain()
.appendBlock(block, optResult.getYield().get().getReceipts());
possiblyMoveHead(block);
logBlockImportProgress(block.getHeader().getNumber());
} else {
emitBadChainEvent(block);
throw new BackwardSyncException(
Expand Down Expand Up @@ -365,6 +379,10 @@ public Optional<Hash> findMaybeFinalized() {
.findFirst();
}

/**
 * Returns the status object held in {@code currentBackwardSyncStatus}.
 *
 * @return the current {@link Status}, or {@code null} when none is set: the reference is created
 *     empty and is reset to {@code null} when the backward sync future completes, so callers
 *     must handle a {@code null} result
 */
public Status getStatus() {
return currentBackwardSyncStatus.get();
}

private void emitBadChainEvent(final Block badBlock) {
final List<Block> badBlockDescendants = new ArrayList<>();
final List<BlockHeader> badBlockHeaderDescendants = new ArrayList<>();
Expand All @@ -385,4 +403,75 @@ private void emitBadChainEvent(final Block badBlock) {
badChainListeners.forEach(
listener -> listener.onBadChain(badBlock, badBlockDescendants, badBlockHeaderDescendants));
}

/**
 * Logs the progress of the block import phase (phase 2 of 2) of the current backward sync
 * session.
 *
 * <p>While the import is below 100% the message is rate limited through {@link
 * Status#progressLogDue()}; once the computed percentage reaches 100% a completion message is
 * logged.
 *
 * <p>Nothing is logged when there is no current status or when the sync range is not known,
 * because no meaningful percentage can be computed in those cases.
 *
 * @param currImportedHeight the number of the block that has just been imported
 */
private void logBlockImportProgress(final long currImportedHeight) {
  // Read the status once. The reference is published only after the sync future has been
  // created and is reset to null when that future completes, so it may be absent here.
  final Status currentStatus = getStatus();
  if (currentStatus == null) {
    return;
  }

  final long targetHeight = currentStatus.getTargetChainHeight();
  final long initialHeight = currentStatus.getInitialChainHeight();
  final long estimatedTotal = targetHeight - initialHeight;

  // The range stays at 0 when setSyncRange was never called for this session. Dividing by it
  // would produce NaN/Infinity and a bogus progress or "completed" message for every block.
  if (estimatedTotal <= 0) {
    LOG.debug(
        "Backward sync range not available, skipping import progress log for block {}",
        currImportedHeight);
    return;
  }

  final long imported = currImportedHeight - initialHeight;
  final float completedPercentage = 100.0f * imported / estimatedTotal;

  if (completedPercentage < 100.0f) {
    if (currentStatus.progressLogDue()) {
      LOG.info(
          String.format(
              "Backward sync phase 2 of 2, %.2f%% completed, imported %d blocks of at least %d (current head %d, target head %d). Peers: %d",
              completedPercentage,
              imported,
              estimatedTotal,
              currImportedHeight,
              targetHeight,
              getEthContext().getEthPeers().peerCount()));
    }
  } else {
    LOG.info(
        String.format(
            "Backward sync phase 2 of 2 completed, imported a total of %d blocks. Peers: %d",
            imported, getEthContext().getEthPeers().peerCount()));
  }
}

/**
 * State of one backward sync session: the future driving the session plus the chain height range
 * used to compute progress percentages.
 *
 * <p>The heights are written by the enclosing context while it starts or retargets a session and
 * are read by the progress loggers, which may run on other threads (NOTE(review): confirm the
 * threading of the sync steps). They are therefore declared {@code volatile} so readers always
 * see the latest values.
 */
static class Status {
  private final CompletableFuture<Void> currentFuture;
  private volatile long targetChainHeight;
  private volatile long initialChainHeight;

  // Timestamp of the last progress log. It is static, hence shared by every Status instance, so
  // the rate limit spans sessions. Guarded by the Status.class monitor.
  private static long lastLogAt = 0;

  /**
   * Creates the status of a new session; the sync range is initially 0..0 until {@link
   * #setSyncRange(long, long)} is called.
   *
   * @param currentFuture the future that completes when the session terminates
   */
  public Status(final CompletableFuture<Void> currentFuture) {
    this.currentFuture = currentFuture;
  }

  /**
   * Sets both ends of the range used to estimate the amount of work of the session.
   *
   * @param initialHeight chain height at the start of the session
   * @param targetHeight height of the block the session is syncing to
   */
  public void setSyncRange(final long initialHeight, final long targetHeight) {
    initialChainHeight = initialHeight;
    targetChainHeight = targetHeight;
  }

  /**
   * Replaces the target height, leaving the initial height untouched.
   *
   * @param newTargetHeight the new target height
   */
  public void updateTargetHeight(final long newTargetHeight) {
    targetChainHeight = newTargetHeight;
  }

  /**
   * Tells whether enough time has elapsed since the last progress log and, if so, records the
   * current time as the new last log time.
   *
   * <p>The check and the update are performed atomically, so concurrent callers cannot both be
   * told that a log is due for the same interval.
   *
   * @return {@code true} if more than {@code MILLIS_DELAY_BETWEEN_PROGRESS_LOG} milliseconds have
   *     elapsed since the last time this method returned {@code true}
   */
  public boolean progressLogDue() {
    final long now = System.currentTimeMillis();
    synchronized (Status.class) {
      if (now - lastLogAt > MILLIS_DELAY_BETWEEN_PROGRESS_LOG) {
        lastLogAt = now;
        return true;
      }
      return false;
    }
  }

  /**
   * @return the future that completes when the session terminates
   */
  public CompletableFuture<Void> getCurrentFuture() {
    return currentFuture;
  }

  /**
   * @return the height of the block the session is syncing to, 0 if never set
   */
  public long getTargetChainHeight() {
    return targetChainHeight;
  }

  /**
   * @return the chain height recorded at the start of the session, 0 if never set
   */
  public long getInitialChainHeight() {
    return initialChainHeight;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down Expand Up @@ -61,7 +60,7 @@ protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) {
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
final int batchSize = context.getBatchSize();
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
LOG.debug("Requesting headers for hash {}, with batch size {}", hash, batchSize);

final RetryingGetHeadersEndingAtFromPeerByHashTask
retryingGetHeadersEndingAtFromPeerByHashTask =
Expand Down Expand Up @@ -101,12 +100,35 @@ protected Void saveHeaders(final List<BlockHeader> blockHeaders) {
for (BlockHeader blockHeader : blockHeaders) {
saveHeader(blockHeader);
}
infoLambda(
LOG,
"Saved headers {} -> {} (head: {})",
() -> blockHeaders.get(0).getNumber(),
() -> blockHeaders.get(blockHeaders.size() - 1).getNumber(),
() -> context.getProtocolContext().getBlockchain().getChainHead().getHeight());

logProgress(blockHeaders.get(blockHeaders.size() - 1).getNumber());

return null;
}

/**
 * Logs the progress of the header download phase (phase 1 of 2) of the current backward sync
 * session.
 *
 * <p>Headers are downloaded from the target backwards, so the amount downloaded is the distance
 * between the target height and the lowest header obtained so far. While below 100% the message
 * is rate limited through the status object; once the computed percentage reaches 100% a
 * completion message is logged.
 *
 * <p>Nothing is logged when the context has no current status or when the sync range is not
 * known, because no meaningful percentage can be computed in those cases.
 *
 * @param currLowestDownloadedHeight the number of the lowest header downloaded so far
 */
private void logProgress(final long currLowestDownloadedHeight) {
  // Take a single snapshot: the context may swap or clear its status between calls, and
  // getStatus() returns null when no session status is set.
  final var status = context.getStatus();
  if (status == null) {
    return;
  }

  final long targetHeight = status.getTargetChainHeight();
  final long initialHeight = status.getInitialChainHeight();
  final long estimatedTotal = targetHeight - initialHeight;

  // A non-positive range means the heights were never initialised for this session. Dividing by
  // it would yield NaN/Infinity and a misleading progress or "completed" message.
  if (estimatedTotal <= 0) {
    LOG.debug(
        "Backward sync range not available, skipping header progress log at height {}",
        currLowestDownloadedHeight);
    return;
  }

  final long downloaded = targetHeight - currLowestDownloadedHeight;
  final float completedPercentage = 100.0f * downloaded / estimatedTotal;

  if (completedPercentage < 100.0f) {
    if (status.progressLogDue()) {
      LOG.info(
          String.format(
              "Backward sync phase 1 of 2, %.2f%% completed, downloaded %d headers of at least %d. Peers: %d",
              completedPercentage,
              downloaded,
              estimatedTotal,
              context.getEthContext().getEthPeers().peerCount()));
    }
  } else {
    LOG.info(
        String.format(
            "Backward sync phase 1 of 2 completed, downloaded a total of %d headers. Peers: %d",
            downloaded, context.getEthContext().getEthPeers().peerCount()));
  }
}
}
Loading