Skip to content

Commit

Permalink
add retry logic on sync pipeline for rocksdb issue (#6004)
Browse files Browse the repository at this point in the history
* add retry logic for sync pipeline with rocksdb issue

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt committed Oct 24, 2023
1 parent 9d9fe8c commit c839a3b
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 60 deletions.
1 change: 1 addition & 0 deletions ethereum/eth/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies {
implementation 'io.tmio:tuweni-bytes'
implementation 'io.tmio:tuweni-units'
implementation 'io.tmio:tuweni-rlp'
implementation 'org.rocksdb:rocksdbjni'

annotationProcessor "org.immutables:value"
implementation "org.immutables:value-annotations"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;

import org.hyperledger.besu.plugin.services.exception.StorageException;

import java.util.EnumSet;
import java.util.Optional;

import org.rocksdb.RocksDBException;
import org.rocksdb.Status;

public final class StorageExceptionManager {

private static final EnumSet<Status.Code> RETRYABLE_STATUS_CODES =
EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy);

private static final long ERROR_THRESHOLD = 1000;

private static long retryableErrorCounter;
/**
* Determines if an operation can be retried based on the error received. This method checks if
* the cause of the StorageException is a RocksDBException. If it is, it retrieves the status code
* of the RocksDBException and checks if it is contained in the list of retryable {@link
* StorageExceptionManager.RETRYABLE_STATUS_CODES} status codes.
*
* @param e the StorageException to check
* @return true if the operation can be retried, false otherwise
*/
public static boolean canRetryOnError(final StorageException e) {
return Optional.of(e.getCause())
.filter(z -> z instanceof RocksDBException)
.map(RocksDBException.class::cast)
.map(RocksDBException::getStatus)
.map(Status::getCode)
.map(RETRYABLE_STATUS_CODES::contains)
.map(
result -> {
retryableErrorCounter++;
return result;
})
.orElse(false);
}

public static long getRetryableErrorCounter() {
return retryableErrorCounter;
}

public static boolean errorCountAtThreshold() {
return retryableErrorCounter % ERROR_THRESHOLD == 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,26 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter;

import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.services.tasks.Task;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistDataStep {

private static final Logger LOG = LoggerFactory.getLogger(PersistDataStep.class);

private final WorldStateStorage worldStateStorage;

public PersistDataStep(final WorldStateStorage worldStateStorage) {
Expand All @@ -33,24 +44,40 @@ public List<Task<NodeDataRequest>> persist(
final List<Task<NodeDataRequest>> tasks,
final BlockHeader blockHeader,
final WorldDownloadState<NodeDataRequest> downloadState) {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(
task -> {
enqueueChildren(task, downloadState);
return task;
})
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
request -> {
if (isRootState(blockHeader, request)) {
downloadState.setRootNodeData(request.getData());
} else {
request.persist(updater);
}
});
updater.commit();
try {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(
task -> {
enqueueChildren(task, downloadState);
return task;
})
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
request -> {
if (isRootState(blockHeader, request)) {
downloadState.setRootNodeData(request.getData());
} else {
request.persist(updater);
}
});
updater.commit();
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the task by setting it to null. This way, it is considered as failed by the
// pipeline, and it will attempt to execute it again later.
if (errorCountAtThreshold()) {
LOG.info(
"Encountered {} retryable RocksDB errors, latest error message {}",
getRetryableErrorCounter(),
storageException.getMessage());
}
tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null));
} else {
throw storageException;
}
}
return tasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter;

import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.services.pipeline.Pipe;
import org.hyperledger.besu.services.tasks.Task;
Expand All @@ -27,9 +32,12 @@
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadLocalDataStep {

private static final Logger LOG = LoggerFactory.getLogger(LoadLocalDataStep.class);
private final WorldStateStorage worldStateStorage;
private final SnapWorldDownloadState downloadState;
private final SnapSyncProcessState snapSyncState;
Expand Down Expand Up @@ -58,19 +66,35 @@ public Stream<Task<SnapDataRequest>> loadLocalDataTrieNode(
final Task<SnapDataRequest> task, final Pipe<Task<SnapDataRequest>> completedTasks) {
final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData();
// check if node is already stored in the worldstate
if (snapSyncState.hasPivotBlockHeader()) {
Optional<Bytes> existingData = request.getExistingData(downloadState, worldStateStorage);
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(
worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration);
updater.commit();
downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage));
completedTasks.put(task);
return Stream.empty();
try {
if (snapSyncState.hasPivotBlockHeader()) {
Optional<Bytes> existingData = request.getExistingData(downloadState, worldStateStorage);
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(
worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration);
updater.commit();
downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage));
completedTasks.put(task);
return Stream.empty();
}
}
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the task by setting it to null. This way, it is considered as failed by the
// pipeline, and it will attempt to execute it again later.
if (errorCountAtThreshold()) {
LOG.info(
"Encountered {} retryable RocksDB errors, latest error message {}",
getRetryableErrorCounter(),
storageException.getMessage());
}
task.getData().clear();
} else {
throw storageException;
}
}
return Stream.of(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,25 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold;
import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter;

import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.services.tasks.Task;

import java.util.List;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistDataStep {
private static final Logger LOG = LoggerFactory.getLogger(PersistDataStep.class);

private final SnapSyncProcessState snapSyncState;
private final WorldStateStorage worldStateStorage;
Expand All @@ -43,41 +52,58 @@ public PersistDataStep(
}

public List<Task<SnapDataRequest>> persist(final List<Task<SnapDataRequest>> tasks) {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
for (Task<SnapDataRequest> task : tasks) {
if (task.getData().isResponseReceived()) {
// enqueue child requests
final Stream<SnapDataRequest> childRequests =
task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState);
if (!(task.getData() instanceof TrieNodeHealingRequest)) {
enqueueChildren(childRequests);
} else {
if (!task.getData().isExpired(snapSyncState)) {
try {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
for (Task<SnapDataRequest> task : tasks) {
if (task.getData().isResponseReceived()) {
// enqueue child requests
final Stream<SnapDataRequest> childRequests =
task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState);
if (!(task.getData() instanceof TrieNodeHealingRequest)) {
enqueueChildren(childRequests);
} else {
continue;
if (!task.getData().isExpired(snapSyncState)) {
enqueueChildren(childRequests);
} else {
continue;
}
}
}

// persist nodes
final int persistedNodes =
task.getData()
.persist(
worldStateStorage,
updater,
downloadState,
snapSyncState,
snapSyncConfiguration);
if (persistedNodes > 0) {
if (task.getData() instanceof TrieNodeHealingRequest) {
downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes);
} else {
downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes);
// persist nodes
final int persistedNodes =
task.getData()
.persist(
worldStateStorage,
updater,
downloadState,
snapSyncState,
snapSyncConfiguration);
if (persistedNodes > 0) {
if (task.getData() instanceof TrieNodeHealingRequest) {
downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes);
} else {
downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes);
}
}
}
}
updater.commit();
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the task by setting it to null. This way, it is considered as failed by the
// pipeline, and it will attempt to execute it again later. not display all the retryable
// issues
if (errorCountAtThreshold()) {
LOG.info(
"Encountered {} retryable RocksDB errors, latest error message {}",
getRetryableErrorCounter(),
storageException.getMessage());
}
tasks.forEach(task -> task.getData().clear());
} else {
throw storageException;
}
}
updater.commit();
return tasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ public SnapWorldStateDownloadProcess build() {
"step",
"action");

/*
The logic and intercommunication of different pipelines can be summarized as follows:
1. Account Data Pipeline (fetchAccountDataPipeline): This process starts with downloading the leaves of the account tree in ranges, with multiple ranges being processed simultaneously.
If the downloaded accounts are smart contracts, tasks are created in the storage pipeline to download the storage tree of the smart contract, and in the code download pipeline for the smart contract.
2. Storage Data Pipeline (fetchStorageDataPipeline): Running parallel to the account data pipeline, this pipeline downloads the storage of smart contracts.
If all slots cannot be downloaded at once, tasks are created in the fetchLargeStorageDataPipeline to download the storage by range, allowing parallelization of large account downloads.
3. Code Data Pipeline (fetchCodePipeline): This pipeline, running concurrently with the account and storage data pipelines, is responsible for downloading the code of the smart contracts.
4. Large Storage Data Pipeline (fetchLargeStorageDataPipeline): This pipeline is used when the storage data for a smart contract is too large to be downloaded at once.
It enables the storage data to be downloaded in ranges, similar to the account data.
5. Healing Phase: Initiated after all other pipelines have completed their tasks, this phase ensures the integrity and completeness of the downloaded data.
*/
final Pipeline<Task<SnapDataRequest>> completionPipeline =
PipelineBuilder.<Task<SnapDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request")
Expand Down

0 comments on commit c839a3b

Please sign in to comment.