Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle old events #6663

Merged
merged 21 commits into from May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,6 +34,7 @@
import com.hedera.node.app.signature.impl.SignatureVerificationFutureImpl;
import com.hedera.node.app.workflows.prehandle.PreHandleResult;
import com.swirlds.common.crypto.TransactionSignature;
import com.swirlds.common.system.SoftwareVersion;
import com.swirlds.common.system.transaction.ConsensusTransaction;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
Expand All @@ -55,12 +56,15 @@ public AdaptedMonoProcessLogic(@NonNull final StandardProcessLogic monoProcessLo
}

@Override
public void incorporateConsensusTxn(final ConsensusTransaction platformTxn, final long submittingMember) {
public void incorporateConsensusTxn(
@NonNull final ConsensusTransaction platformTxn,
final long submittingMember,
@NonNull final SoftwareVersion softwareVersion) {
if (platformTxn.getMetadata() instanceof PreHandleResult metadata) {
final var accessor = adaptForMono(platformTxn, metadata);
platformTxn.setMetadata(accessor);
}
monoProcessLogic.incorporateConsensusTxn(platformTxn, submittingMember);
monoProcessLogic.incorporateConsensusTxn(platformTxn, submittingMember, softwareVersion);
}

@NonNull
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.hedera.hapi.node.base.ResponseCodeEnum.INVALID_ACCOUNT_ID;
import static com.hedera.hapi.node.base.ResponseCodeEnum.OK;
import static com.hedera.node.app.service.mono.context.properties.SemanticVersions.SEMANTIC_VERSIONS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -42,6 +43,7 @@
import com.hedera.node.app.workflows.prehandle.PreHandleResult.Status;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.crypto.TransactionSignature;
import com.swirlds.common.system.SoftwareVersion;
import com.swirlds.common.system.transaction.internal.ConsensusTransactionImpl;
import java.util.List;
import java.util.Map;
Expand All @@ -68,6 +70,7 @@ class AdaptedMonoProcessLogicTest extends AppTestBase implements Scenarios {
private TransactionSignature signature;

private AdaptedMonoProcessLogic subject;
private final SoftwareVersion eventVersion = SEMANTIC_VERSIONS.deployedSoftwareVersion();

@BeforeEach
void setUp() {
Expand All @@ -78,9 +81,9 @@ void setUp() {
void passesThroughNonPreHandleResult() {
given(platformTxn.getMetadata()).willReturn(accessor);

subject.incorporateConsensusTxn(platformTxn, 1L);
subject.incorporateConsensusTxn(platformTxn, 1L, eventVersion);

verify(monoProcessLogic).incorporateConsensusTxn(platformTxn, 1L);
verify(monoProcessLogic).incorporateConsensusTxn(platformTxn, 1L, eventVersion);
}

@Test
Expand All @@ -101,7 +104,7 @@ void adaptsPreHandleResultAsPayerAndOthersIfOK() {
given(platformTxn.getMetadata()).willReturn(meta);
given(platformTxn.getContents()).willReturn(asByteArray(noopTxn));

subject.incorporateConsensusTxn(platformTxn, 1L);
subject.incorporateConsensusTxn(platformTxn, 1L, eventVersion);

verify(platformTxn).setMetadata(captor.capture());
final var accessor = captor.getValue();
Expand Down Expand Up @@ -131,7 +134,7 @@ void adaptsTransactionPayerOnlyIfNotOK() {
given(platformTxn.getMetadata()).willReturn(meta);
given(platformTxn.getContents()).willReturn(asByteArray(noopTxn));

subject.incorporateConsensusTxn(platformTxn, 1L);
subject.incorporateConsensusTxn(platformTxn, 1L, eventVersion);

verify(platformTxn).setMetadata(captor.capture());
final var accessor = captor.getValue();
Expand Down Expand Up @@ -161,7 +164,7 @@ void adaptsTransactionNonAvailableIfNullPayerKey() {
given(platformTxn.getMetadata()).willReturn(meta);
given(platformTxn.getContents()).willReturn(asByteArray(noopTxn));

subject.incorporateConsensusTxn(platformTxn, 1L);
subject.incorporateConsensusTxn(platformTxn, 1L, eventVersion);

verify(platformTxn).setMetadata(captor.capture());
final var accessor = captor.getValue();
Expand All @@ -187,7 +190,7 @@ void translatesUnparseableContentsAsISE() {
given(platformTxn.getMetadata()).willReturn(meta);
given(platformTxn.getContents()).willReturn(asByteArray(nonsenseTxn));

assertThrows(IllegalStateException.class, () -> subject.incorporateConsensusTxn(platformTxn, 1L));
assertThrows(IllegalStateException.class, () -> subject.incorporateConsensusTxn(platformTxn, 1L, eventVersion));
}

private static final JKey PAYER_KEY = new JEd25519Key("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes());
Expand Down
Expand Up @@ -27,6 +27,7 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;
import java.util.regex.Pattern;

public class SerializableSemVers implements SoftwareVersion {
Expand All @@ -36,37 +37,41 @@ public class SerializableSemVers implements SoftwareVersion {
public static final int RELEASE_027_VERSION = 1;
public static final long CLASS_ID = 0x6f2b1bc2df8cbd0bL;

public static final Comparator<SemanticVersion> SEM_VER_COMPARATOR = Comparator.comparingInt(
private int servicesPreAlphaNumber;
private int protoPreAlphaNumber;
private String servicesBuild;
private String protoBuild;
private SemanticVersion proto;
private SemanticVersion services;

// Just compares major minor and patch versions. Pre and Build are compared in FULL_COMPARATOR.
private static final Comparator<SemanticVersion> SEM_VER_COMPARATOR = Comparator.comparingInt(
SemanticVersion::getMajor)
.thenComparingInt(SemanticVersion::getMinor)
.thenComparingInt(SemanticVersion::getPatch)
.thenComparingInt(semver -> alphaNumberOf(semver.getPre()))
// Whenever there is a need for doing an upgrade with config-only changes,
// we set the build portion on semver. This is needed to trigger platform
// upgrade code, which is otherwise not triggered for config-only changes.
.thenComparing(SemanticVersion::getBuild);
public static final Comparator<SerializableSemVers> FULL_COMPARATOR = Comparator.comparing(
SerializableSemVers::getServices, SEM_VER_COMPARATOR)
.thenComparing(SerializableSemVers::getProto, SEM_VER_COMPARATOR);
.thenComparingInt(SemanticVersion::getPatch);

private SemanticVersion proto;
private SemanticVersion services;
// Whenever there is a need for doing an upgrade with config-only changes,
// we set the build portion on semver. This is needed to trigger platform
// upgrade code, which is otherwise not triggered for config-only changes.
static final Comparator<SerializableSemVers> FULL_COMPARATOR = Comparator.comparing(
SerializableSemVers::getServices, SEM_VER_COMPARATOR)
.thenComparingInt(SerializableSemVers::getServicesPreAlphaNumber)
.thenComparing(SerializableSemVers::getServicesBuild)
.thenComparing(SerializableSemVers::getProto, SEM_VER_COMPARATOR)
.thenComparingInt(SerializableSemVers::getProtoPreAlphaNumber)
.thenComparing(SerializableSemVers::getProtoBuild);

public SerializableSemVers() {
// RuntimeConstructable
}

public SemanticVersion getProto() {
return proto;
}

public SemanticVersion getServices() {
return services;
}

public SerializableSemVers(@NonNull final SemanticVersion proto, @NonNull final SemanticVersion services) {
this.proto = proto;
this.services = services;
this.servicesPreAlphaNumber = alphaNumberOf(services.getPre());
this.protoPreAlphaNumber = alphaNumberOf(proto.getPre());
this.protoBuild = proto.getBuild();
this.servicesBuild = services.getBuild();
}

public static SerializableSemVers forHapiAndHedera(@NonNull final String proto, @NonNull final String services) {
Expand Down Expand Up @@ -161,6 +166,11 @@ public int compareTo(@Nullable final SoftwareVersion other) {
}
}

@Override
public int hashCode() {
return Objects.hash(proto, services);
}

@Override
public void deserialize(final SerializableDataInputStream in, final int version) throws IOException {
proto = deserializeSemVer(in);
Expand Down Expand Up @@ -228,10 +238,35 @@ private static String readable(@Nullable final SemanticVersion semVer) {
return sb.toString();
}

private static int alphaNumberOf(@NonNull final String pre) {
private int alphaNumberOf(@NonNull final String pre) {
final var alphaMatch = ALPHA_PRE_PATTERN.matcher(pre);
// alpha versions come before everything else
return alphaMatch.matches() ? Integer.parseInt(alphaMatch.group(1)) : Integer.MAX_VALUE;
servicesPreAlphaNumber = alphaMatch.matches() ? Integer.parseInt(alphaMatch.group(1)) : Integer.MAX_VALUE;
return servicesPreAlphaNumber;
}

public SemanticVersion getProto() {
return proto;
}

public SemanticVersion getServices() {
return services;
}

public int getServicesPreAlphaNumber() {
return servicesPreAlphaNumber;
}

public int getProtoPreAlphaNumber() {
return protoPreAlphaNumber;
}

public String getServicesBuild() {
return servicesBuild;
}

public String getProtoBuild() {
return protoBuild;
}

@VisibleForTesting
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.hedera.node.app.service.mono.records;

import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.BUSY;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.FAIL_INVALID;
import static com.hederahashgraph.api.proto.java.ResponseCodeEnum.UNKNOWN;

Expand All @@ -29,6 +30,7 @@
import com.hederahashgraph.api.proto.java.TransactionID;
import com.hederahashgraph.api.proto.java.TransactionReceipt;
import com.hederahashgraph.api.proto.java.TransactionRecord;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -72,16 +74,50 @@ void setPostConsensus(
}

public void setFailInvalid(
final AccountID effectivePayer,
final TxnAccessor accessor,
final Instant consensusTimestamp,
@NonNull final AccountID effectivePayer,
@NonNull final TxnAccessor accessor,
@NonNull final Instant consensusTimestamp,
final long submittingMember) {
setRecordWithStatus(effectivePayer, accessor, consensusTimestamp, submittingMember, FAIL_INVALID);
}

/**
* Set the record for a transaction that was submitted with an older event version. We set the status of the
* transaction as {@code ResponseCodeEnum.BUSY} on the receipt. This transaction needs to be re-submitted for
* it to succeed.
* @param effectivePayer payer for the transaction
* @param accessor transaction accessor
* @param consensusTimestamp consensus timestamp
* @param submittingMember submitting member
*/
public void setStaleTransaction(
Neeharika-Sompalli marked this conversation as resolved.
Show resolved Hide resolved
@NonNull final AccountID effectivePayer,
@NonNull final TxnAccessor accessor,
@NonNull final Instant consensusTimestamp,
final long submittingMember) {
setRecordWithStatus(effectivePayer, accessor, consensusTimestamp, submittingMember, BUSY);
}

/**
* Create a failure record with the given status in the receipt and store it in cache.
* @param effectivePayer payer for the transaction
* @param accessor transaction accessor
* @param consensusTimestamp consensus timestamp
* @param submittingMember submitting member
* @param status status of the transaction
*/
private void setRecordWithStatus(
Neeharika-Sompalli marked this conversation as resolved.
Show resolved Hide resolved
@NonNull final AccountID effectivePayer,
@NonNull final TxnAccessor accessor,
@NonNull final Instant consensusTimestamp,
final long submittingMember,
@NonNull final ResponseCodeEnum status) {
final var recordBuilder = creator.createInvalidFailureRecord(accessor, consensusTimestamp);
final var expiringRecord = creator.saveExpiringRecord(
effectivePayer, recordBuilder.build(), consensusTimestamp.getEpochSecond(), submittingMember);

final var recentHistory = histories.computeIfAbsent(accessor.getTxnId(), ignore -> new TxnIdRecentHistory());
recentHistory.observe(expiringRecord, FAIL_INVALID);
recentHistory.observe(expiringRecord, status);
}

public boolean isReceiptPresent(final TransactionID txnId) {
Expand Down
Expand Up @@ -16,13 +16,15 @@

package com.hedera.node.app.service.mono.state.logic;

import static com.hedera.node.app.service.mono.context.properties.SemanticVersions.SEMANTIC_VERSIONS;
import static com.hedera.node.app.service.mono.utils.Units.MIN_TRANS_TIMESTAMP_INCR_NANOS;

import com.google.protobuf.InvalidProtocolBufferException;
import com.hedera.node.app.service.mono.context.TransactionContext;
import com.hedera.node.app.service.mono.context.primitives.StateView;
import com.hedera.node.app.service.mono.ledger.SigImpactHistorian;
import com.hedera.node.app.service.mono.records.ConsensusTimeTracker;
import com.hedera.node.app.service.mono.records.RecordCache;
import com.hedera.node.app.service.mono.state.expiry.EntityAutoExpiry;
import com.hedera.node.app.service.mono.state.expiry.ExpiryManager;
import com.hedera.node.app.service.mono.stats.ExecutionTimeTracker;
Expand All @@ -31,7 +33,9 @@
import com.hedera.node.app.service.mono.txns.span.ExpandHandleSpan;
import com.hedera.node.app.service.mono.utils.accessors.SwirldsTxnAccessor;
import com.hedera.node.app.service.mono.utils.accessors.TxnAccessor;
import com.swirlds.common.system.SoftwareVersion;
import com.swirlds.common.system.transaction.ConsensusTransaction;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -54,6 +58,7 @@ public class StandardProcessLogic implements ProcessLogic {
private final StateView workingView;
private final ScheduleProcessing scheduleProcessing;
private final RecordStreaming recordStreaming;
private final RecordCache recordCache;

@Inject
public StandardProcessLogic(
Expand All @@ -68,7 +73,8 @@ public StandardProcessLogic(
final ScheduleProcessing scheduleProcessing,
final ExecutionTimeTracker executionTimeTracker,
final RecordStreaming recordStreaming,
final StateView workingView) {
final StateView workingView,
final RecordCache recordCache) {
Neeharika-Sompalli marked this conversation as resolved.
Show resolved Hide resolved
this.expiries = expiries;
this.invariantChecks = invariantChecks;
this.expandHandleSpan = expandHandleSpan;
Expand All @@ -81,12 +87,24 @@ public StandardProcessLogic(
this.sigImpactHistorian = sigImpactHistorian;
this.recordStreaming = recordStreaming;
this.workingView = workingView;
this.recordCache = recordCache;
}

@Override
public void incorporateConsensusTxn(ConsensusTransaction platformTxn, long submittingMember) {
public void incorporateConsensusTxn(
@NonNull final ConsensusTransaction platformTxn,
final long submittingMember,
@NonNull final SoftwareVersion softwareVersion) {

try {
final var accessor = expandHandleSpan.accessorFor(platformTxn);
// Check if the transaction is submitted by a version before the deployed version.
// If so, return and set the status on the receipt to BUSY
if (SEMANTIC_VERSIONS.deployedSoftwareVersion().isAfter(softwareVersion)) {
recordCache.setStaleTransaction(
accessor.getPayer(), accessor, platformTxn.getConsensusTimestamp(), submittingMember);
return;
}
incorporate(accessor, platformTxn.getConsensusTimestamp(), submittingMember);
} catch (InvalidProtocolBufferException e) {
log.warn("Consensus platform txn was not gRPC!", e);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.hedera.node.app.service.mono.txns;

import com.swirlds.common.system.Round;
import com.swirlds.common.system.SoftwareVersion;
import com.swirlds.common.system.transaction.ConsensusTransaction;

/**
Expand All @@ -35,15 +36,17 @@ public interface ProcessLogic {
* @param round a round of consensus transactions
*/
default void incorporateConsensus(final Round round) {
round.forEachEventTransaction((e, t) -> incorporateConsensusTxn(t, e.getCreatorId()));
round.forEachEventTransaction((e, t) -> incorporateConsensusTxn(t, e.getCreatorId(), e.getSoftwareVersion()));
}

/**
* Orchestrates a process to express the full implications of the given consensus transaction at
* the specified time.
* Orchestrates a process to express the full implications of the given consensus transaction at the specified
* time.
*
* @param platformTxn the consensus transaction to incorporate.
* @param platformTxn the consensus transaction to incorporate.
* @param submittingMember the id of the member that submitted the txn
* @param softwareVersion the version of the software that submitted the txn
*/
void incorporateConsensusTxn(ConsensusTransaction platformTxn, long submittingMember);
void incorporateConsensusTxn(
ConsensusTransaction platformTxn, long submittingMember, SoftwareVersion softwareVersion);
}