Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement complete SubmissionManager with expiry dedupe cache #6646

Merged
merged 3 commits on May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,6 +27,7 @@
import com.hedera.hapi.node.transaction.TransactionBody;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Set;

Expand All @@ -38,6 +39,10 @@
/** Sentinel key: a {@link Key} wrapping the default (empty) {@link KeyList}. */
private static final Key EMPTY_KEY_LIST =
Key.newBuilder().keyList(KeyList.DEFAULT).build();

/**
 * A simple {@link Comparator} for {@link Timestamp}s. Orders ascending by seconds, breaking
 * ties by nanos, so iteration over a sorted collection yields earlier timestamps first.
 */
public static final Comparator<Timestamp> TIMESTAMP_COMPARATOR =
Comparator.comparingLong(Timestamp::seconds).thenComparingInt(Timestamp::nanos);

/** Static utility class; not instantiable. */
private HapiUtils() {}

/**
Expand All @@ -55,13 +60,27 @@
&& account.alias().length() == EVM_ADDRESS_ALIAS_LENGTH);
}

public static Timestamp asTimestamp(final Instant instant) {
/** Converts the given {@link Instant} into a {@link Timestamp}. */
public static Timestamp asTimestamp(@NonNull final Instant instant) {
return Timestamp.newBuilder()
.seconds(instant.getEpochSecond())
.nanos(instant.getNano())
.build();
}

/** Subtracts the given number of seconds from the given {@link Timestamp}, returning a new {@link Timestamp}. */
public static Timestamp minus(@NonNull final Timestamp ts, @NonNull final long seconds) {
return Timestamp.newBuilder()
.seconds(ts.seconds() - seconds)
.nanos(ts.nanos())
.build();

Check warning on line 76 in hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/HapiUtils.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app-spi/src/main/java/com/hedera/node/app/spi/HapiUtils.java#L73-L76

Added lines #L73 - L76 were not covered by tests
}

/**
 * Determines whether the first timestamp is strictly earlier than the second. Think of it as,
 * "Is t1 before t2?" Equal timestamps are NOT before one another.
 */
public static boolean isBefore(@NonNull final Timestamp t1, @NonNull final Timestamp t2) {
    final int ordering = TIMESTAMP_COMPARATOR.compare(t1, t2);
    return ordering < 0;
}

public static final Set<HederaFunctionality> QUERY_FUNCTIONS = EnumSet.of(
HederaFunctionality.CONSENSUS_GET_TOPIC_INFO,
HederaFunctionality.GET_BY_SOLIDITY_ID,
Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.spi;

import static com.hedera.node.app.spi.HapiUtils.asTimestamp;
import static org.assertj.core.api.Assertions.assertThat;

import com.hedera.hapi.node.base.Timestamp;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

final class HapiUtilsTest {

    @ParameterizedTest
    @CsvSource(
            textBlock =
                    """
2007-12-03T10:15:30.00Z, 2007-12-03T10:15:30.01Z
2007-12-31T23:59:59.99Z, 2008-01-01T00:00:00.00Z
""")
    @DisplayName("When timestamp t1 comes before timestamp t2")
    void isBefore(@NonNull final Instant i1, @NonNull final Instant i2) {
        // An earlier instant must compare as strictly before a later one.
        assertThat(HapiUtils.isBefore(asTimestamp(i1), asTimestamp(i2))).isTrue();
    }

    @ParameterizedTest
    @CsvSource(
            textBlock =
                    """
2007-12-03T10:15:30.01Z, 2007-12-03T10:15:30.00Z
2008-01-01T00:00:00.00Z, 2007-12-31T23:59:59.99Z
""")
    @DisplayName("When timestamp t1 comes after timestamp t2")
    void isAfter(@NonNull final Instant i1, @NonNull final Instant i2) {
        // A later instant is never "before" an earlier one.
        assertThat(HapiUtils.isBefore(asTimestamp(i1), asTimestamp(i2))).isFalse();
    }

    @ParameterizedTest
    @CsvSource(
            textBlock =
                    """
2007-12-03T10:15:30.00Z, 2007-12-03T10:15:30.00Z
2007-12-31T23:59:59.99Z, 2007-12-31T23:59:59.99Z
2008-01-01T00:00:00.00Z, 2008-01-01T00:00:00.00Z
""")
    @DisplayName("When timestamp t1 is the same as timestamp t2")
    void isEqual(@NonNull final Instant i1, @NonNull final Instant i2) {
        // "Before" is strict: identical timestamps are not before one another.
        assertThat(HapiUtils.isBefore(asTimestamp(i1), asTimestamp(i2))).isFalse();
    }

    @Test
    @DisplayName("Converting an Instant into a Timestamp")
    void convertInstantToTimestamp() {
        // Given an instant with full nanosecond precision
        final var source = Instant.ofEpochSecond(1000, 123456789);
        // When converted, then both the seconds and nanos components carry over exactly
        final var expected =
                Timestamp.newBuilder().seconds(1000).nanos(123456789).build();
        assertThat(asTimestamp(source)).isEqualTo(expected);
    }
}
Expand Up @@ -16,57 +16,91 @@

package com.hedera.node.app.workflows.ingest;

import static com.hedera.hapi.node.base.ResponseCodeEnum.DUPLICATE_TRANSACTION;
import static com.hedera.hapi.node.base.ResponseCodeEnum.PLATFORM_TRANSACTION_NOT_CREATED;
import static com.hedera.node.app.spi.HapiUtils.TIMESTAMP_COMPARATOR;
import static com.hedera.node.app.spi.HapiUtils.asTimestamp;
import static com.hedera.node.app.spi.HapiUtils.isBefore;
import static com.hedera.node.app.spi.HapiUtils.minus;
import static java.util.Objects.requireNonNull;

import com.hedera.hapi.node.base.AccountID;
import com.hedera.hapi.node.base.TransactionID;
import com.hedera.hapi.node.transaction.TransactionBody;
import com.hedera.node.app.annotations.NodeSelfId;
import com.hedera.node.app.service.mono.context.properties.GlobalDynamicProperties;
import com.hedera.node.app.service.mono.context.properties.NodeLocalProperties;
import com.hedera.node.app.service.mono.context.properties.Profile;
import com.hedera.node.app.service.mono.pbj.PbjConverter;
import com.hedera.node.app.spi.workflows.PreCheckException;
import com.hedera.node.app.state.RecordCache;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.metrics.Metrics;
import com.swirlds.common.metrics.SpeedometerMetric;
import com.swirlds.common.system.Platform;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import javax.inject.Inject;
import javax.inject.Singleton;

/** The {@code SubmissionManager} provides functionality to submit transactions to the platform. */
/**
* The {@code SubmissionManager} submits transactions to the platform. As this is an honest node, it makes a strong
* attempt to <strong>avoid</strong> submitting duplicate transactions to the platform.
*
* <p>An honest node does not want to submit duplicate transactions (since it will be charged for it). We use an
* in-memory cache to keep track of transactions that have been submitted, sorted by transaction start time. This cache
* is not part of state, because it will be different for different nodes.
*
* <p>Because this cache is not in state, if this node is restarted, it will forget about transactions it has already
* submitted and could end up sending a <strong>single</strong> duplicate transaction. If there is a poorly behaving
* client and this node reboots, it will no longer know the transaction is a duplicate and will submit it, with the
* node ending up having to pay for it. If we had a shutdown hook we could save this information off during graceful
* shutdown and reload it on startup, but we don't have that hook yet, and anyway hard crashes would still impact the
* node.
*
* <p>This cache is <strong>NOT</strong> impacted by falling behind or reconnecting, so the only time we will submit
* duplicate transactions is if the node is restarted. We hope to improve this in the future.
*/
@Singleton
public class SubmissionManager {
/** Metric settings for keeping track of rejected transactions */
private static final String PLATFORM_TXN_REJECTIONS_NAME = "platformTxnNotCreated/sec";

private static final String PLATFORM_TXN_REJECTIONS_DESC = "number of platform transactions not created per second";
private static final String SPEEDOMETER_FORMAT = "%,13.2f";

// FUTURE Consider adding a metric to keep track of the number of duplicate transactions submitted by users.

/** The {@link Platform} to which transactions will be submitted */
private final Platform platform;
// NOTE(review): also consulted for duplicate lookups across restarts, presumably — confirm usage in submit()
private final RecordCache recordCache;
/** Whether this node is running in production mode. We hope to remove this logic in the future. */
private final boolean isProduction;
/** Used for looking up the max transaction duration window. To be replaced by some new config object */
private final GlobalDynamicProperties props;
/** Metrics related to submissions */
private final SpeedometerMetric platformTxnRejections;
// The account ID identifying this node; recorded alongside submitted transactions
private final AccountID nodeSelfID;
/**
 * The {@link TransactionID}s that this node has already submitted to the platform, sorted by transaction start
 * time, such that earlier start times come first. We guard this data structure within a synchronized block.
 */
private final Set<TransactionID> submittedTxns = new TreeSet<>((t1, t2) ->
TIMESTAMP_COMPARATOR.compare(t1.transactionValidStartOrThrow(), t2.transactionValidStartOrThrow()));

/**
* Constructor of {@code SubmissionManager}
* Create a new {@code SubmissionManager} instance.
*
* @param nodeSelfID the {@link AccountID} for referring to this node's operator account
* @param platform the {@link Platform} to which transactions will be submitted
* @param recordCache the {@link RecordCache} that tracks submitted transactions
* @param props the {@link GlobalDynamicProperties} with the setting for max transaction duration
* @param nodeLocalProperties the {@link NodeLocalProperties} that keep local properties
* @param metrics metrics related to submissions
*/
@Inject
public SubmissionManager(
@NodeSelfId @NonNull final AccountID nodeSelfID,
@NonNull final Platform platform,
@NonNull final RecordCache recordCache,
@NonNull final GlobalDynamicProperties props,
@NonNull final NodeLocalProperties nodeLocalProperties,
@NonNull final Metrics metrics) {
this.nodeSelfID = requireNonNull(nodeSelfID);
this.platform = requireNonNull(platform);
this.recordCache = requireNonNull(recordCache);
this.props = requireNonNull(props);
this.isProduction = requireNonNull(nodeLocalProperties).activeProfile() == Profile.PROD;
this.platformTxnRejections =
metrics.getOrCreate(new SpeedometerMetric.Config("app", PLATFORM_TXN_REJECTIONS_NAME)
Expand Down Expand Up @@ -103,18 +137,57 @@ public void submit(@NonNull final TransactionBody txBody, @NonNull final Bytes t
payload = txBody.uncheckedSubmitOrThrow().transactionBytes();
}

// An honest node does not want to submit duplicate transactions (since it will be charged for it), so we will
// check whether it has been submitted already. It is still possible under high concurrency that a duplicate
// transaction could be submitted, but doing this check here makes it much less likely.
final var txId = txBody.transactionIDOrThrow();
if (recordCache.get(txId) == null) {
// This method is not called at a super high rate, so synchronizing here is perfectly fine. We could have
// made the data structure serializable, but that doesn't actually help much here because we need to check
// for containment and then do a bunch of logic that might throw an exception before doing the `add`.
synchronized (submittedTxns) {
// We don't want to use another thread to prune the set, so we will take the opportunity here to do so.
// Remember that at this point we have passed through all the throttles, so this method is only called
// at most 10,000 / (Number of nodes) times per second, which is not a lot.
removeExpiredTransactions();

// If we have already submitted this transaction, then fail. Note that both of these calls will throw if
// the transaction is malformed. This should NEVER happen, because the transaction was already checked
// before we got here. But if it ever does happen, for any reason, we want it to happen BEFORE we submit,
// and BEFORE we record the transaction as a duplicate.
final var txId = txBody.transactionIDOrThrow();
if (submittedTxns.contains(txId)) {
throw new PreCheckException(DUPLICATE_TRANSACTION);
}

// This call to submit to the platform should almost always work. Maybe under extreme load it will fail,
// or while the system is being shut down. In any event, the user will receive an error code indicating
// that the transaction was not submitted and they can retry.
final var success = platform.createTransaction(PbjConverter.asBytes(payload));
if (success) {
recordCache.put(txBody.transactionIDOrThrow(), nodeSelfID);
submittedTxns.add(txId);
} else {
platformTxnRejections.cycle();
throw new PreCheckException(PLATFORM_TRANSACTION_NOT_CREATED);
}
}
}

/**
 * Purges from {@link #submittedTxns} every {@link TransactionID} whose valid-start time falls
 * before the max transaction duration window ending now. This method is not threadsafe and should
 * only be called from within a block synchronized on {@link #submittedTxns}.
 */
private void removeExpiredTransactions() {
    // Any transaction whose valid-start is before this cutoff has expired.
    final var cutoff = minus(asTimestamp(Instant.now()), props.maxTxnDuration());

    // The set is ordered by valid-start time (earliest first), so we can remove entries
    // until we hit the first one that has not expired, then stop.
    for (final var itr = submittedTxns.iterator(); itr.hasNext(); ) {
        final var txId = itr.next();
        if (isBefore(txId.transactionValidStartOrThrow(), cutoff)) {
            itr.remove();
        } else {
            break;
        }
    }
}
}