Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2427] Prep chain downloader for branch by abstraction #1194

Merged
merged 12 commits into from Apr 2, 2019
Merged
Expand Up @@ -12,250 +12,11 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ChainDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
private final EthContext ethContext;
private final SyncState syncState;
private final SyncTargetManager<C> syncTargetManager;
private final CheckpointHeaderManager<C> checkpointHeaderManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final MetricsSystem metricsSystem;
private final CompletableFuture<Void> downloadFuture = new CompletableFuture<>();

private int chainSegmentTimeouts = 0;

private final AtomicBoolean started = new AtomicBoolean(false);
private CompletableFuture<?> currentTask;

public ChainDownloader(
final SynchronizerConfiguration config,
final EthContext ethContext,
final SyncState syncState,
final SyncTargetManager<C> syncTargetManager,
final CheckpointHeaderManager<C> checkpointHeaderManager,
final BlockImportTaskFactory blockImportTaskFactory,
final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
this.config = config;
this.ethContext = ethContext;

this.syncState = syncState;
this.syncTargetManager = syncTargetManager;
this.checkpointHeaderManager = checkpointHeaderManager;
this.blockImportTaskFactory = blockImportTaskFactory;
}

public CompletableFuture<Void> start() {
if (started.compareAndSet(false, true)) {
executeDownload();
return downloadFuture;
} else {
throw new IllegalStateException(
"Attempt to start an already started " + this.getClass().getSimpleName() + ".");
}
}

@VisibleForTesting
public CompletableFuture<?> getCurrentTask() {
return currentTask;
}

private void executeDownload() {
if (downloadFuture.isDone()) {
return;
}
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> syncTargetManager.findSyncTarget(syncState.syncTarget()))
.thenApply(this::updateSyncState)
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(this::importBlocks)
.thenCompose(r -> checkSyncTarget())
.whenComplete(
(r, t) -> {
if (t != null) {
final Throwable rootCause = ExceptionUtils.rootCause(t);
if (rootCause instanceof CancellationException) {
LOG.trace("Download cancelled", t);
} else if (rootCause instanceof InvalidBlockException) {
LOG.debug("Invalid block downloaded", t);
} else if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while downloading chain", rootCause);
} else {
LOG.error("Error encountered while downloading", t);
}
// On error, wait a bit before retrying
ethContext
.getScheduler()
.scheduleFutureTask(this::executeDownload, Duration.ofSeconds(2));
} else if (syncTargetManager.shouldContinueDownloading()) {
executeDownload();
} else {
LOG.info("Chain download complete");
downloadFuture.complete(null);
}
});
}

private SyncTarget updateSyncState(final SyncTarget newTarget) {
if (isSameAsCurrentTarget(newTarget)) {
return syncState.syncTarget().get();
}
return syncState.setSyncTarget(newTarget.peer(), newTarget.commonAncestor());
}

private Boolean isSameAsCurrentTarget(final SyncTarget newTarget) {
return syncState
.syncTarget()
.map(currentTarget -> currentTarget.equals(newTarget))
.orElse(false);
}

private CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarget syncTarget) {
return syncTarget.peer().isDisconnected()
? CompletableFuture.completedFuture(emptyList())
: checkpointHeaderManager.pullCheckpointHeaders(syncTarget);
}

private CompletableFuture<?> waitForPeers() {
return WaitForPeersTask.create(ethContext, 1, metricsSystem).run();
}

private CompletableFuture<Void> checkSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (!maybeSyncTarget.isPresent()) {
// No sync target, so nothing to check.
return CompletableFuture.completedFuture(null);
}

final SyncTarget syncTarget = maybeSyncTarget.get();
if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) {
LOG.info("Better sync target found, clear current sync target: {}.", syncTarget);
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
}
if (finishedSyncingToCurrentTarget(syncTarget)) {
LOG.info("Finished syncing to target: {}.", syncTarget);
clearSyncTarget(syncTarget);
// Wait a bit before checking for a new sync target
final CompletableFuture<Void> future = new CompletableFuture<>();
ethContext
.getScheduler()
.scheduleFutureTask(() -> future.complete(null), Duration.ofSeconds(10));
return future;
}
return CompletableFuture.completedFuture(null);
}

private boolean finishedSyncingToCurrentTarget(final SyncTarget syncTarget) {
return !syncTargetManager.syncTargetCanProvideMoreBlocks(syncTarget)
|| checkpointHeaderManager.checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}

private boolean chainSegmentsHaveTimedOut() {
return chainSegmentTimeouts >= config.downloaderChainSegmentTimeoutsPermitted();
}

private void clearSyncTarget() {
syncState.syncTarget().ifPresent(this::clearSyncTarget);
}

private void clearSyncTarget(final SyncTarget syncTarget) {
chainSegmentTimeouts = 0;
checkpointHeaderManager.clearSyncTarget();
syncState.clearSyncTarget();
}

private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.isEmpty()) {
// No checkpoints to download
return CompletableFuture.completedFuture(emptyList());
}

final CompletableFuture<List<Block>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);

return importedBlocks.whenComplete(
(r, t) -> {
t = ExceptionUtils.rootCause(t);
if (t instanceof InvalidBlockException) {
// Blocks were invalid, meaning our checkpoints are wrong
// Reset sync target
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
maybeSyncTarget.ifPresent(
target -> target.peer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL));
final String peerDescriptor =
maybeSyncTarget
.map(SyncTarget::peer)
.map(EthPeer::toString)
.orElse("(unknown - already disconnected)");
LOG.warn(
"Invalid block discovered while downloading from peer {}. Disconnect.",
peerDescriptor);
clearSyncTarget();
} else if (t != null || r.isEmpty()) {
if (t != null) {
final Throwable rootCause = ExceptionUtils.rootCause(t);
if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while importing blocks", rootCause);
} else {
LOG.error("Encountered error importing blocks", t);
}
}
if (checkpointHeaderManager.clearImportedCheckpointHeaders()) {
chainSegmentTimeouts = 0;
}
if (t instanceof TimeoutException || r != null) {
// Download timed out, or returned no new blocks
chainSegmentTimeouts++;
}
} else {
chainSegmentTimeouts = 0;
public interface ChainDownloader {

final BlockHeader lastImportedCheckpoint =
checkpointHeaderManager.allCheckpointsImported();
syncState.setCommonAncestor(lastImportedCheckpoint);
}
});
}
CompletableFuture<Void> start();

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders);
}
void cancel();
}
Expand Up @@ -18,9 +18,10 @@
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastDownloaderFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullDownloaderFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -43,10 +44,10 @@ public class DefaultSynchronizer<C> implements Synchronizer {

private final SyncState syncState;
private final AtomicBoolean started = new AtomicBoolean(false);
private final BlockPropagationManager<C> blockPropagationManager;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSynchronizer<C>> fastSynchronizer;
private final Optional<FastDownloaderFactory<C>> fastSynchronizer;
private final Subscribers<SyncStatusListener> syncStatusListeners = new Subscribers<>();
private final BlockPropagationManager<C> blockPropagationManager;
private final FullDownloaderFactory<C> fullDownloaderFactory;

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
Expand All @@ -60,6 +61,13 @@ public DefaultSynchronizer(
final MetricsSystem metricsSystem) {
this.syncState = syncState;

ChainHeadTracker.trackChainHeadForPeers(
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);

this.blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
Expand All @@ -71,19 +79,11 @@ public DefaultSynchronizer(
metricsSystem,
new BlockBroadcaster(ethContext));

ChainHeadTracker.trackChainHeadForPeers(
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);

this.fullSyncDownloader =
new FullSyncDownloader<>(
this.fullDownloaderFactory =
new FullDownloaderFactory<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);

fastSynchronizer =
FastSynchronizer.create(
this.fastSynchronizer =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastSynchronizer -> fastDownloaderFactory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

FastDownloaderFactory.create(
syncConfig,
dataDirectory,
protocolSchedule,
Expand All @@ -97,8 +97,8 @@ public DefaultSynchronizer(

private TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSynchronizer
.flatMap(FastSynchronizer::calculateTrailingPeerRequirements)
.orElseGet(fullSyncDownloader::calculateTrailingPeerRequirements);
.flatMap(FastDownloaderFactory::calculateTrailingPeerRequirements)
.orElseGet(fullDownloaderFactory::calculateTrailingPeerRequirements);
}

@Override
Expand Down Expand Up @@ -128,15 +128,15 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
fastSynchronizer.ifPresent(FastSynchronizer::deleteFastSyncState);
fastSynchronizer.ifPresent(FastDownloaderFactory::deleteFastSyncState);

startFullSync();
}

private void startFullSync() {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
fullSyncDownloader.start();
fullDownloaderFactory.start();
}

@Override
Expand Down