Skip to content

Commit

Permalink
Fix bonsai getMutable method regression (#2934)
Browse files Browse the repository at this point in the history
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
Co-authored-by: Fabio Di Fabio <fabio.difabio@consensys.net>
Co-authored-by: Justin Florentine <justin+github@florentine.us>
  • Loading branch information
3 people committed Oct 28, 2021
1 parent 40a0457 commit ffc8da3
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 32 deletions.
Expand Up @@ -194,34 +194,34 @@ public Optional<MutableWorldState> getMutable(final Hash rootHash, final Hash bl
BlockHeader targetHeader = blockchain.getBlockHeader(blockHash).get();
BlockHeader persistedHeader = maybePersistedHeader.get();
// roll back from persisted to even with target
Hash persistedBlockHash = fromPlugin(persistedHeader.getBlockHash());
Hash persistedBlockHash = persistedHeader.getBlockHash();
while (persistedHeader.getNumber() > targetHeader.getNumber()) {
LOG.debug("Rollback {}", persistedBlockHash);
rollBacks.add(getTrieLogLayer(persistedBlockHash).get());
persistedHeader =
blockchain.getBlockHeader(fromPlugin(persistedHeader.getParentHash())).get();
persistedHeader = blockchain.getBlockHeader(persistedHeader.getParentHash()).get();
persistedBlockHash = persistedHeader.getBlockHash();
}
// roll forward to target
Hash targetBlockHash = fromPlugin(targetHeader.getBlockHash());
Hash targetBlockHash = targetHeader.getBlockHash();
while (persistedHeader.getNumber() < targetHeader.getNumber()) {
LOG.debug("Rollforward {}", targetBlockHash);
rollForwards.add(getTrieLogLayer(targetBlockHash).get());
targetHeader =
blockchain.getBlockHeader(fromPlugin(targetHeader.getParentHash())).get();
targetBlockHash = fromPlugin(targetHeader.getBlockHash());
targetHeader = blockchain.getBlockHeader(targetHeader.getParentHash()).get();
targetBlockHash = targetHeader.getBlockHash();
}

// roll back in tandem until we hit a shared state
while (!persistedBlockHash.equals(targetBlockHash)) {
LOG.debug("Paired Rollback {}", persistedBlockHash);
LOG.debug("Paired Rollforward {}", targetBlockHash);
rollForwards.add(getTrieLogLayer(targetBlockHash).get());
targetHeader =
blockchain.getBlockHeader(fromPlugin(targetHeader.getParentHash())).get();
targetHeader = blockchain.getBlockHeader(targetHeader.getParentHash()).get();

rollBacks.add(getTrieLogLayer(persistedBlockHash).get());
persistedHeader =
blockchain.getBlockHeader(fromPlugin(persistedHeader.getParentHash())).get();
persistedHeader = blockchain.getBlockHeader(persistedHeader.getParentHash()).get();

targetBlockHash = targetHeader.getBlockHash();
persistedBlockHash = persistedHeader.getBlockHash();
}
}

Expand Down
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.fastsync;

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.bonsai.BonsaiPersistedWorldState;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
Expand Down Expand Up @@ -80,7 +80,7 @@ public static Optional<FastSyncDownloader> create(
"Fast sync was requested, but cannot be enabled because the local blockchain is not empty.");
return Optional.empty();
}
if (worldStateStorage instanceof BonsaiPersistedWorldState) {
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
worldStateStorage.clear();
}
final CachingTaskCollection<NodeDataRequest> taskCollection =
Expand Down Expand Up @@ -108,6 +108,7 @@ public static Optional<FastSyncDownloader> create(
ethContext,
syncState,
metricsSystem),
worldStateStorage,
worldStateDownloader,
fastSyncStateStorage,
taskCollection,
Expand Down
Expand Up @@ -16,11 +16,13 @@

import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;

import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.NodeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TaskCollection;
import org.hyperledger.besu.util.ExceptionUtils;

Expand All @@ -43,6 +45,7 @@ public class FastSyncDownloader {

private static final Logger LOG = LogManager.getLogger();
private final FastSyncActions fastSyncActions;
private final WorldStateStorage worldStateStorage;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncStateStorage fastSyncStateStorage;
private final TaskCollection<NodeDataRequest> taskCollection;
Expand All @@ -53,12 +56,14 @@ public class FastSyncDownloader {

public FastSyncDownloader(
final FastSyncActions fastSyncActions,
final WorldStateStorage worldStateStorage,
final WorldStateDownloader worldStateDownloader,
final FastSyncStateStorage fastSyncStateStorage,
final TaskCollection<NodeDataRequest> taskCollection,
final Path fastSyncDataDirectory,
final FastSyncState initialFastSyncState) {
this.fastSyncActions = fastSyncActions;
this.worldStateStorage = worldStateStorage;
this.worldStateDownloader = worldStateDownloader;
this.fastSyncStateStorage = fastSyncStateStorage;
this.taskCollection = taskCollection;
Expand All @@ -75,6 +80,9 @@ public CompletableFuture<FastSyncState> start() {

private CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState) {
LOG.info("Starting fast sync.");
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
worldStateStorage.clear();
}
return exceptionallyCompose(
fastSyncActions
.waitForSuitablePeers(fastSyncState)
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.hyperledger.besu.ethereum.eth.sync.worldstate.NodeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.services.tasks.TaskCollection;

import java.nio.file.Path;
Expand All @@ -52,6 +53,8 @@ public class FastSyncDownloaderTest {
@SuppressWarnings("unchecked")
private final FastSyncActions fastSyncActions = mock(FastSyncActions.class);

private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class);

private final WorldStateDownloader worldStateDownloader = mock(WorldStateDownloader.class);
private final FastSyncStateStorage storage = mock(FastSyncStateStorage.class);

Expand All @@ -65,6 +68,7 @@ public class FastSyncDownloaderTest {
private final FastSyncDownloader downloader =
new FastSyncDownloader(
fastSyncActions,
worldStateStorage,
worldStateDownloader,
storage,
taskCollection,
Expand Down Expand Up @@ -114,6 +118,7 @@ public void shouldResumeFastSync() {
final FastSyncDownloader resumedDownloader =
new FastSyncDownloader(
fastSyncActions,
worldStateStorage,
worldStateDownloader,
storage,
taskCollection,
Expand Down
Expand Up @@ -25,9 +25,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.TransactionDB;

public class RocksDBMetricsFactory {

Expand All @@ -54,7 +54,7 @@ public RocksDBMetricsFactory(
public RocksDBMetrics create(
final MetricsSystem metricsSystem,
final RocksDBConfiguration rocksDbConfiguration,
final TransactionDB db,
final OptimisticTransactionDB db,
final Statistics stats) {
final OperationTimer readLatency =
metricsSystem
Expand Down
Expand Up @@ -51,11 +51,11 @@
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.LRUCache;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.Status;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;

Expand All @@ -72,7 +72,7 @@ public class RocksDBColumnarKeyValueStorage

private final DBOptions options;
private final TransactionDBOptions txOptions;
private final TransactionDB db;
private final OptimisticTransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, ColumnFamilyHandle> columnHandlesByName;
private final RocksDBMetrics metrics;
Expand Down Expand Up @@ -110,12 +110,8 @@ public RocksDBColumnarKeyValueStorage(
txOptions = new TransactionDBOptions();
final List<ColumnFamilyHandle> columnHandles = new ArrayList<>(columnDescriptors.size());
db =
TransactionDB.open(
options,
txOptions,
configuration.getDatabaseDir().toString(),
columnDescriptors,
columnHandles);
OptimisticTransactionDB.open(
options, configuration.getDatabaseDir().toString(), columnDescriptors, columnHandles);
metrics = rocksDBMetricsFactory.create(metricsSystem, configuration, db, stats);
final Map<Bytes, String> segmentsById =
segments.stream()
Expand Down
Expand Up @@ -38,13 +38,12 @@
import org.apache.logging.log4j.Logger;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.Status;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;

public class RocksDBKeyValueStorage implements KeyValueStorage {
Expand All @@ -56,8 +55,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
private static final Logger LOG = LogManager.getLogger();

private final Options options;
private final TransactionDBOptions txOptions;
private final TransactionDB db;
private final OptimisticTransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final RocksDBMetrics rocksDBMetrics;
private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true);
Expand All @@ -78,8 +76,7 @@ public RocksDBKeyValueStorage(
.setStatistics(stats);
options.getEnv().setBackgroundThreads(configuration.getBackgroundThreadCount());

txOptions = new TransactionDBOptions();
db = TransactionDB.open(options, txOptions, configuration.getDatabaseDir().toString());
db = OptimisticTransactionDB.open(options, configuration.getDatabaseDir().toString());
rocksDBMetrics = rocksDBMetricsFactory.create(metricsSystem, configuration, db, stats);
} catch (final RocksDBException e) {
throw new StorageException(e);
Expand Down Expand Up @@ -159,7 +156,6 @@ public KeyValueStorageTransaction startTransaction() throws StorageException {
public void close() {
if (closed.compareAndSet(false, true)) {
tryDeleteOptions.close();
txOptions.close();
options.close();
db.close();
}
Expand Down
Expand Up @@ -39,8 +39,8 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Statistics;
import org.rocksdb.TransactionDB;

@RunWith(MockitoJUnitRunner.class)
public class RocksDBMetricsTest {
Expand All @@ -49,7 +49,7 @@ public class RocksDBMetricsTest {
@Mock private LabelledMetric<OperationTimer> labelledMetricOperationTimerMock;
@Mock private LabelledMetric<Counter> labelledMetricCounterMock;
@Mock private OperationTimer operationTimerMock;
@Mock private TransactionDB db;
@Mock private OptimisticTransactionDB db;
@Mock private Statistics stats;

@Rule public final TemporaryFolder folder = new TemporaryFolder();
Expand Down

0 comments on commit ffc8da3

Please sign in to comment.