Skip to content

Commit

Permalink
fix: recordCache to commit added entries and implemented correctly th…
Browse files Browse the repository at this point in the history
…e remove elements from the queue (#10523)

Signed-off-by: Lev Povolotsky <lev@swirldslabs.com>
Signed-off-by: Joseph Sinclair <joseph.sinclair@swirldslabs.com>
Signed-off-by: Michael Tinker <michael.tinker@swirldslabs.com>
Co-authored-by: Michael Tinker <michael.tinker@swirldslabs.com>
  • Loading branch information
povolev15 and tinker-michaelj committed Dec 31, 2023
1 parent b5574b1 commit 7534e43
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 49 deletions.
Expand Up @@ -16,7 +16,9 @@

package com.hedera.node.app.spi.records;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Instant;

Expand Down Expand Up @@ -56,6 +58,15 @@ public interface BlockRecordInfo {
*/
long lastBlockNo();

/**
* Get the number of the current block.
*
* @return the number of the current block
*/
default long blockNo() {
return lastBlockNo() + 1;
}

/**
* Get the consensus time of the first transaction of the last block, this is the last completed immutable block.
*
Expand All @@ -64,6 +75,15 @@ public interface BlockRecordInfo {
@Nullable
Instant firstConsTimeOfLastBlock();

/**
* The current block timestamp. Its seconds is the value returned by {@code block.timestamp} for a contract
* executing * in this block).
*
* @return the current block timestamp
*/
@NonNull
Timestamp currentBlockTimestamp();

/**
* Gets the hash of the last block
*
Expand Down
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.spi.validation;

import java.util.function.Predicate;

public class TruePredicate implements Predicate {
public static final Predicate INSTANCE = new TruePredicate();

@Override
public boolean test(Object ignored) {
return true;
}
}
Expand Up @@ -18,6 +18,7 @@

import static java.util.Objects.requireNonNull;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.hapi.node.state.blockrecords.BlockInfo;
import com.hedera.hapi.node.state.blockrecords.RunningHashes;
import com.hedera.node.app.spi.records.BlockRecordInfo;
Expand Down Expand Up @@ -79,6 +80,12 @@ public Bytes lastBlockHash() {
return BlockRecordInfoUtils.lastBlockHash(blockInfo);
}

@Override
public @NonNull Timestamp currentBlockTimestamp() {
// There should always be a current block and a first consensus time within it
return blockInfo.firstConsTimeOfCurrentBlockOrThrow();
}

/** {@inheritDoc} */
@Nullable
@Override
Expand Down
Expand Up @@ -293,6 +293,11 @@ public Instant firstConsTimeOfLastBlock() {
return BlockRecordInfoUtils.firstConsTimeOfLastBlock(lastBlockInfo);
}

@Override
public @NonNull Timestamp currentBlockTimestamp() {
return lastBlockInfo.firstConsTimeOfCurrentBlockOrThrow();
}

/**
* {@inheritDoc}
*/
Expand Down
Expand Up @@ -29,8 +29,12 @@
import com.hedera.hapi.node.base.TransactionID;
import com.hedera.hapi.node.state.recordcache.TransactionRecordEntry;
import com.hedera.hapi.node.transaction.TransactionRecord;
import com.hedera.node.app.spi.state.CommittableWritableStates;
import com.hedera.node.app.spi.state.ReadableQueueState;
import com.hedera.node.app.spi.state.ReadableStates;
import com.hedera.node.app.spi.state.WritableQueueState;
import com.hedera.node.app.spi.state.WritableStates;
import com.hedera.node.app.spi.validation.TruePredicate;
import com.hedera.node.app.state.DeduplicationCache;
import com.hedera.node.app.state.HederaRecordCache;
import com.hedera.node.app.state.SingleTransactionRecord;
Expand Down Expand Up @@ -179,8 +183,9 @@ public void add(

// To avoid having a background thread cleaning out this queue, we spend a little time when adding to the queue
// to also remove from the queue any transactions that have expired.
final var queue = getQueue();
final var firstRecord = transactionRecords.get(0);
final WritableStates states = getWritableState();
final WritableQueueState<TransactionRecordEntry> queue = states.getQueue(TXN_RECORD_QUEUE);
final SingleTransactionRecord firstRecord = transactionRecords.get(0);
removeExpiredTransactions(queue, firstRecord.transactionRecord().consensusTimestampOrElse(Timestamp.DEFAULT));

// For each transaction, in order, add to the queue and to the in-memory data structures.
Expand All @@ -189,6 +194,10 @@ public void add(
addToInMemoryCache(nodeId, payerAccountId, rec);
queue.add(new TransactionRecordEntry(nodeId, payerAccountId, rec));
}

if (states instanceof CommittableWritableStates committable) {
committable.commit();
}
}

@NonNull
Expand Down Expand Up @@ -246,7 +255,7 @@ private void addToInMemoryCache(
// Either we add this tx to the main records list if it is a user/preceding transaction, or to the child
// transactions list of its parent. Note that scheduled transactions are always child transactions, but
// never produce child *records*; instead, the scheduled transaction record is treated as
// a user transaction record.
// a user transaction record. The map key remains the current user transaction ID, however.
final var listToAddTo = (isChildTx && !txId.scheduled()) ? history.childRecords() : history.records();
listToAddTo.add(transactionRecord);

Expand All @@ -263,33 +272,39 @@ private void removeExpiredTransactions(
@NonNull final Timestamp consensusTimestamp) {
// Compute the earliest valid start timestamp that is still within the max transaction duration window.
final var config = configProvider.getConfiguration().getConfigData(HederaConfig.class);
final var earliestValidState = minus(consensusTimestamp, config.transactionMaxValidDuration());

// Loop in order and expunge every entry where the timestamp is before the current time. Also remove from the
// in memory data structures.
final var itr = queue.iterator();
while (itr.hasNext()) {
final var entry = itr.next();
final var rec = entry.transactionRecordOrThrow();
final var txId = rec.transactionIDOrThrow();
// If the timestamp is before the current time, then it has expired
if (isBefore(txId.transactionValidStartOrThrow(), earliestValidState)) {
// Remove from the histories
itr.remove();
// Remove from the payer to transaction index
final var payerAccountId = txId.accountIDOrThrow(); // NOTE: Not accurate if the payer was the node
final var transactionIDs =
payerToTransactionIndex.computeIfAbsent(payerAccountId, ignored -> new HashSet<>());
transactionIDs.remove(txId);
if (transactionIDs.isEmpty()) {
payerToTransactionIndex.remove(payerAccountId);
final var earliestValidStart = minus(consensusTimestamp, config.transactionMaxValidDuration());
// Loop in order and expunge every entry where the start time is before the earliest valid start.
// Also remove from the in-memory data structures.
do {
final var entry = queue.peek();
if (entry != null) {
final var rec = entry.transactionRecordOrThrow();
final var txId = rec.transactionIDOrThrow();
// If the valid start time is before the earliest valid start, then it has expired
if (isBefore(txId.transactionValidStartOrThrow(), earliestValidStart)) {
// Remove from the histories. Note that all transactions are added to this map
// keyed to the "user transaction" ID, so removing the entry here removes both
// "parent" and "child" transaction records associated with that ID.
histories.remove(txId);
// remove from queue as well. The queue only permits removing the current "HEAD",
// but that should always be correct here.
queue.removeIf(TruePredicate.INSTANCE);
// Remove from the payer to transaction index
final var payerAccountId = txId.accountIDOrThrow(); // NOTE: Not accurate if the payer was the node
final var transactionIDs =
payerToTransactionIndex.computeIfAbsent(payerAccountId, ignored -> new HashSet<>());
transactionIDs.remove(txId);
if (transactionIDs.isEmpty()) {
payerToTransactionIndex.remove(payerAccountId);
}
} else {
break;
}
} else {
return;
break;
}
}
} while (true);
}

// ---------------------------------------------------------------------------------------------------------------
// Implementation methods of RecordCache
// ---------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -335,22 +350,17 @@ public List<TransactionRecord> getRecords(@NonNull final AccountID accountID) {
}

/** Utility method that get the writable queue from the working state */
private WritableQueueState<TransactionRecordEntry> getQueue() {
private WritableStates getWritableState() {
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createWritableStates(NAME);
return states.getQueue(TXN_RECORD_QUEUE);
return hederaState.createWritableStates(NAME);
}

/** Utility method that get the readable queue from the working state */
private ReadableQueueState<TransactionRecordEntry> getReadableQueue() {
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createReadableStates(NAME);
final ReadableStates states = getWritableState();
return states.getQueue(TXN_RECORD_QUEUE);
}
}
Expand Up @@ -169,6 +169,8 @@ void testRecordStreamProduction(final String startMode, final boolean concurrent
if (!startMode.equals("GENESIS")) {
blockRecordManager.switchBlocksAt(FORCED_BLOCK_SWITCH_TIME);
}
assertThat(blockRecordManager.currentBlockTimestamp()).isNotNull();
assertThat(blockRecordManager.blockNo()).isEqualTo(blockRecordManager.lastBlockNo() + 1);
// write a blocks & record files
int transactionCount = 0;
final List<Bytes> endOfBlockHashes = new ArrayList<>();
Expand Down
Expand Up @@ -207,7 +207,7 @@ protected void refreshRecordCache() {
lenient().when(wsa.getHederaState()).thenReturn(state);
lenient().when(props.getConfiguration()).thenReturn(versionedConfig);
lenient().when(versionedConfig.getConfigData(HederaConfig.class)).thenReturn(hederaConfig);
lenient().when(hederaConfig.transactionMaxValidDuration()).thenReturn(180L);
lenient().when(hederaConfig.transactionMaxValidDuration()).thenReturn(123456789999L);
lenient().when(versionedConfig.getConfigData(LedgerConfig.class)).thenReturn(ledgerConfig);
lenient().when(ledgerConfig.recordsMaxQueryableByAccount()).thenReturn(MAX_QUERYABLE_PER_ACCOUNT);
givenRecordCacheState();
Expand Down
Expand Up @@ -53,6 +53,6 @@ public Hash blockHashOf(final long blockNo) {
*/
@Override
public BlockValues blockValuesOf(final long gasLimit) {
return new HevmBlockValues(gasLimit, context.blockRecordInfo().lastBlockNo(), context.consensusNow());
return HevmBlockValues.from(context.blockRecordInfo(), gasLimit);
}
}
Expand Up @@ -18,16 +18,22 @@

import static java.util.Objects.requireNonNull;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.node.app.spi.records.BlockRecordInfo;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.evm.frame.BlockValues;

public record HevmBlockValues(long gasLimit, long blockNo, @NonNull Instant blockTime) implements BlockValues {
public record HevmBlockValues(long gasLimit, long blockNo, @NonNull Timestamp blockTime) implements BlockValues {
private static final Optional<Wei> ZERO_BASE_FEE = Optional.of(Wei.ZERO);

public static HevmBlockValues from(@NonNull final BlockRecordInfo blockRecordInfo, final long gasLimit) {
requireNonNull(blockRecordInfo);
return new HevmBlockValues(gasLimit, blockRecordInfo.blockNo(), blockRecordInfo.currentBlockTimestamp());
}

public HevmBlockValues {
requireNonNull(blockTime);
}
Expand All @@ -39,7 +45,7 @@ public long getGasLimit() {

@Override
public long getTimestamp() {
return blockTime.getEpochSecond();
return blockTime.seconds();
}

@Override
Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.hedera.node.app.spi.workflows.HandleContext;
import com.hedera.node.app.spi.workflows.QueryContext;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.Objects;
import javax.inject.Inject;
import org.hyperledger.besu.datatypes.Hash;
Expand All @@ -35,12 +34,10 @@
@QueryScope
public class QueryContextHevmBlocks implements HederaEvmBlocks {
private final QueryContext context;
private final Instant consensusTime;

@Inject
public QueryContextHevmBlocks(@NonNull final QueryContext context, @NonNull final Instant consensusTime) {
public QueryContextHevmBlocks(@NonNull final QueryContext context) {
this.context = Objects.requireNonNull(context);
this.consensusTime = Objects.requireNonNull(consensusTime);
}

/**
Expand All @@ -57,6 +54,6 @@ public Hash blockHashOf(final long blockNo) {
*/
@Override
public BlockValues blockValuesOf(final long gasLimit) {
return new HevmBlockValues(gasLimit, context.blockRecordInfo().lastBlockNo(), consensusTime);
return HevmBlockValues.from(context.blockRecordInfo(), gasLimit);
}
}
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.BDDMockito.given;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.node.app.service.contract.impl.hevm.HandleContextHevmBlocks;
import com.hedera.node.app.service.contract.impl.utils.ConversionUtils;
import com.hedera.node.app.spi.records.BlockRecordInfo;
Expand Down Expand Up @@ -65,8 +66,10 @@ void returnsUnavailableHashIfNecessary() {

@Test
void blockValuesHasExpectedValues() {
given(blockRecordInfo.lastBlockNo()).willReturn(123L);
given(context.consensusNow()).willReturn(ETERNAL_NOW);
final var now = new Timestamp(1_234_567L, 890);
given(blockRecordInfo.blockNo()).willReturn(123L);
given(blockRecordInfo.currentBlockTimestamp()).willReturn(now);
given(context.blockRecordInfo()).willReturn(blockRecordInfo);

final var blockValues = subject.blockValuesOf(456L);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.BDDMockito.given;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.node.app.service.contract.impl.hevm.QueryContextHevmBlocks;
import com.hedera.node.app.service.contract.impl.utils.ConversionUtils;
import com.hedera.node.app.spi.records.BlockRecordInfo;
Expand All @@ -48,7 +49,7 @@ class QueryContextHevmBlocksTest {

@BeforeEach
void setUp() {
subject = new QueryContextHevmBlocks(context, ETERNAL_NOW);
subject = new QueryContextHevmBlocks(context);
given(context.blockRecordInfo()).willReturn(blockRecordInfo);
}

Expand All @@ -65,7 +66,10 @@ void returnsUnavailableHashIfNecessary() {

@Test
void blockValuesHasExpectedValues() {
given(blockRecordInfo.lastBlockNo()).willReturn(123L);
final var now = new Timestamp(1_234_567L, 890);
given(blockRecordInfo.blockNo()).willReturn(123L);
given(blockRecordInfo.currentBlockTimestamp()).willReturn(now);
given(context.blockRecordInfo()).willReturn(blockRecordInfo);

final var blockValues = subject.blockValuesOf(456L);

Expand Down
Expand Up @@ -41,6 +41,7 @@
import com.hedera.services.bdd.junit.HapiTest;
import com.hedera.services.bdd.junit.HapiTestSuite;
import com.hedera.services.bdd.spec.HapiSpec;
import com.hedera.services.bdd.suites.BddMethodIsNotATest;
import com.hedera.services.bdd.suites.HapiSuite;
import com.hederahashgraph.api.proto.java.TokenSupplyType;
import com.hederahashgraph.api.proto.java.TokenType;
Expand Down Expand Up @@ -150,7 +151,7 @@ final HapiSpec retryLimitDemo() {
cryptoTransfer(tinyBarsFromTo(GENESIS, FUNDING, 7L)));
}

@HapiTest
@BddMethodIsNotATest
final HapiSpec freezeDemo() {
return customHapiSpec("FreezeDemo")
.withProperties(Map.of("nodes", "127.0.0.1:50213:0.0.3,127.0.0.1:50214:0.0.4,127.0.0.1:50215:0.0.5"))
Expand Down

0 comments on commit 7534e43

Please sign in to comment.