Skip to content

Commit

Permalink
fix: Modify Record Cache to commit entries to state and expire entrie…
Browse files Browse the repository at this point in the history
…s from history correctly

Fix record cache to commit added entries to the queue state
   Ensure the record cache will also remove entries from the queue state when expired
   Created a "TruePredicate" class for when we require a predicate, but it should always be true, so we avoid duplicating the code yet again.
      This was needed for removing the head item from the queue state.
Fix record cache to remove entries from histories when expired
Fix tests as needed
Adjust comments to match design and clarify the History map semantics.

Signed-off-by: Lev Povolotsky <lev@swirldslabs.com>
Signed-off-by: Joseph Sinclair <joseph.sinclair@swirldslabs.com>
  • Loading branch information
povolev15 authored and jsync-swirlds committed Dec 19, 2023
1 parent 0cdcec3 commit 5a131fe
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 35 deletions.
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.spi.validation;

import java.util.function.Predicate;

public class TruePredicate implements Predicate {
public static final Predicate INSTANCE = new TruePredicate();

Check warning on line 22 in hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/validation/TruePredicate.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/validation/TruePredicate.java#L21-L22

Added lines #L21 - L22 were not covered by tests

@Override
public boolean test(Object ignored) {
return true;

Check warning on line 26 in hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/validation/TruePredicate.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/validation/TruePredicate.java#L26

Added line #L26 was not covered by tests
}
}
Expand Up @@ -29,8 +29,12 @@
import com.hedera.hapi.node.base.TransactionID;
import com.hedera.hapi.node.state.recordcache.TransactionRecordEntry;
import com.hedera.hapi.node.transaction.TransactionRecord;
import com.hedera.node.app.spi.state.CommittableWritableStates;
import com.hedera.node.app.spi.state.ReadableQueueState;
import com.hedera.node.app.spi.state.ReadableStates;
import com.hedera.node.app.spi.state.WritableQueueState;
import com.hedera.node.app.spi.state.WritableStates;
import com.hedera.node.app.spi.validation.TruePredicate;
import com.hedera.node.app.state.DeduplicationCache;
import com.hedera.node.app.state.HederaRecordCache;
import com.hedera.node.app.state.SingleTransactionRecord;
Expand Down Expand Up @@ -178,8 +182,9 @@ public void add(

// To avoid having a background thread cleaning out this queue, we spend a little time when adding to the queue
// to also remove from the queue any transactions that have expired.
final var queue = getQueue();
final var firstRecord = transactionRecords.get(0);
final WritableStates states = getWritableState();
final WritableQueueState<TransactionRecordEntry> queue = states.getQueue(TXN_RECORD_QUEUE);
final SingleTransactionRecord firstRecord = transactionRecords.get(0);
removeExpiredTransactions(queue, firstRecord.transactionRecord().consensusTimestampOrElse(Timestamp.DEFAULT));

// For each transaction, in order, add to the queue and to the in-memory data structures.
Expand All @@ -188,6 +193,10 @@ public void add(
addToInMemoryCache(nodeId, payerAccountId, rec);
queue.add(new TransactionRecordEntry(nodeId, payerAccountId, rec));
}

if (states instanceof CommittableWritableStates committable) {
committable.commit();
}
}

@NonNull
Expand Down Expand Up @@ -240,7 +249,7 @@ private void addToInMemoryCache(
// Either we add this tx to the main records list if it is a user/preceding transaction, or to the child
// transactions list of its parent. Note that scheduled transactions are always child transactions, but
// never produce child *records*; instead, the scheduled transaction record is treated as
// a user transaction record.
// a user transaction record. The map key remains the current user transaction ID, however.
final var listToAddTo = (isChildTx && !txId.scheduled()) ? history.childRecords() : history.records();
listToAddTo.add(transactionRecord);

Expand All @@ -257,33 +266,39 @@ private void removeExpiredTransactions(
@NonNull final Timestamp consensusTimestamp) {
// Compute the earliest valid start timestamp that is still within the max transaction duration window.
final var config = configProvider.getConfiguration().getConfigData(HederaConfig.class);
final var earliestValidState = minus(consensusTimestamp, config.transactionMaxValidDuration());

// Loop in order and expunge every entry where the timestamp is before the current time. Also remove from the
// in memory data structures.
final var itr = queue.iterator();
while (itr.hasNext()) {
final var entry = itr.next();
final var rec = entry.transactionRecordOrThrow();
final var txId = rec.transactionIDOrThrow();
// If the timestamp is before the current time, then it has expired
if (isBefore(txId.transactionValidStartOrThrow(), earliestValidState)) {
// Remove from the histories
itr.remove();
// Remove from the payer to transaction index
final var payerAccountId = txId.accountIDOrThrow(); // NOTE: Not accurate if the payer was the node
final var transactionIDs =
payerToTransactionIndex.computeIfAbsent(payerAccountId, ignored -> new HashSet<>());
transactionIDs.remove(txId);
if (transactionIDs.isEmpty()) {
payerToTransactionIndex.remove(payerAccountId);
final var earliestValidStart = minus(consensusTimestamp, config.transactionMaxValidDuration());
// Loop in order and expunge every entry where the start time is before the earliest valid start.
// Also remove from the in-memory data structures.
do {
final var entry = queue.peek();
if (entry != null) {
final var rec = entry.transactionRecordOrThrow();
final var txId = rec.transactionIDOrThrow();
// If the valid start time is before the earliest valid start, then it has expired
if (isBefore(txId.transactionValidStartOrThrow(), earliestValidStart)) {
// Remove from the histories. Note that all transactions are added to this map
// keyed to the "user transaction" ID, so removing the entry here removes both
// "parent" and "child" transaction records associated with that ID.
histories.remove(txId);

Check warning on line 282 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java#L282

Added line #L282 was not covered by tests
// remove from queue as well. The queue only permits removing the current "HEAD",
// but that should always be correct here.
queue.removeIf(TruePredicate.INSTANCE);

Check warning on line 285 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java#L285

Added line #L285 was not covered by tests
// Remove from the payer to transaction index
final var payerAccountId = txId.accountIDOrThrow(); // NOTE: Not accurate if the payer was the node
final var transactionIDs =
payerToTransactionIndex.computeIfAbsent(payerAccountId, ignored -> new HashSet<>());
transactionIDs.remove(txId);

Check warning on line 290 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java#L287-L290

Added lines #L287 - L290 were not covered by tests
if (transactionIDs.isEmpty()) {
payerToTransactionIndex.remove(payerAccountId);

Check warning on line 292 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java#L292

Added line #L292 was not covered by tests
}
} else {
break;
}
} else {
return;
break;
}
}
} while (true);

Check warning on line 300 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/recordcache/RecordCacheImpl.java#L300

Added line #L300 was not covered by tests
}

// ---------------------------------------------------------------------------------------------------------------
// Implementation methods of RecordCache
// ---------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -329,22 +344,17 @@ public List<TransactionRecord> getRecords(@NonNull final AccountID accountID) {
}

/** Utility method that get the writable queue from the working state */
private WritableQueueState<TransactionRecordEntry> getQueue() {
private WritableStates getWritableState() {
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createWritableStates(NAME);
return states.getQueue(TXN_RECORD_QUEUE);
return hederaState.createWritableStates(NAME);
}

/** Utility method that get the readable queue from the working state */
private ReadableQueueState<TransactionRecordEntry> getReadableQueue() {
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createReadableStates(NAME);
final ReadableStates states = getWritableState();
return states.getQueue(TXN_RECORD_QUEUE);
}
}
Expand Up @@ -207,7 +207,7 @@ protected void refreshRecordCache() {
lenient().when(wsa.getHederaState()).thenReturn(state);
lenient().when(props.getConfiguration()).thenReturn(versionedConfig);
lenient().when(versionedConfig.getConfigData(HederaConfig.class)).thenReturn(hederaConfig);
lenient().when(hederaConfig.transactionMaxValidDuration()).thenReturn(180L);
lenient().when(hederaConfig.transactionMaxValidDuration()).thenReturn(123456789999L);
lenient().when(versionedConfig.getConfigData(LedgerConfig.class)).thenReturn(ledgerConfig);
lenient().when(ledgerConfig.recordsMaxQueryableByAccount()).thenReturn(MAX_QUERYABLE_PER_ACCOUNT);
givenRecordCacheState();
Expand Down

0 comments on commit 5a131fe

Please sign in to comment.