Skip to content

Commit

Permalink
feat: Migrate transaction handling to framework (#11144)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
  • Loading branch information
alittley committed Feb 5, 2024
1 parent eaca230 commit 9c1f467
Show file tree
Hide file tree
Showing 28 changed files with 887 additions and 1,743 deletions.
Expand Up @@ -16,8 +16,7 @@

package com.hedera.node.app.state.logging;

import static com.swirlds.platform.SwirldsPlatform.PLATFORM_THREAD_POOL_NAME;
import static com.swirlds.platform.eventhandling.ConsensusRoundHandler.THREAD_CONS_NAME;
import static com.swirlds.platform.eventhandling.ConsensusRoundHandler.TRANSACTION_HANDLING_THREAD_NAME;

import com.hedera.hapi.node.base.*;
import com.hedera.hapi.node.transaction.TransactionBody;
Expand Down Expand Up @@ -60,8 +59,6 @@
public final class TransactionStateLogger {
/** The logger we are using for Transaction State log */
private static final Logger logger = LogManager.getLogger(TransactionStateLogger.class);
/** The name of the handle transaction thread */
private static final String HANDLE_THREAD_NAME = "<" + PLATFORM_THREAD_POOL_NAME + ": " + THREAD_CONS_NAME;

/**
* Log the start of a round if it contains any non-system transactions.
Expand Down Expand Up @@ -235,7 +232,7 @@ public static void logEndTransactionRecord(
* @param <T> The type of the singleton
*/
public static <T> void logSingletonRead(@NonNull final String label, @Nullable final ValueLeaf<T> value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" READ singleton {} value {}", label, value == null ? "null" : value.getValue());
}
}
Expand All @@ -247,7 +244,7 @@ public static <T> void logSingletonRead(@NonNull final String label, @Nullable f
* @param value The value of the singleton
*/
public static void logSingletonWrite(@NonNull final String label, @Nullable final Object value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" WRITTEN singleton {} value {}", label, value == null ? "null" : value.toString());
}
}
Expand All @@ -261,7 +258,7 @@ public static void logSingletonWrite(@NonNull final String label, @Nullable fina
* @param value The value added to the queue
*/
public static void logQueueAdd(@NonNull final String label, @Nullable final Object value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" ADD to queue {} value {}", label, value == null ? "null" : value.toString());
}
}
Expand All @@ -273,7 +270,7 @@ public static void logQueueAdd(@NonNull final String label, @Nullable final Obje
* @param value The value removed from the queue
*/
public static void logQueueRemove(@NonNull final String label, @Nullable final Object value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" REMOVE from queue {} value {}", label, value == null ? "null" : value.toString());
}
}
Expand All @@ -285,7 +282,7 @@ public static void logQueueRemove(@NonNull final String label, @Nullable final O
* @param value The value peeked from the queue
*/
public static void logQueuePeek(@NonNull final String label, @Nullable final Object value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" PEEK on queue {} value {}", label, value == null ? "null" : value.toString());
}
}
Expand All @@ -298,7 +295,7 @@ public static void logQueuePeek(@NonNull final String label, @Nullable final Obj
* @param <K> The type of the queue values
*/
public static <K> void logQueueIterate(@NonNull final String label, @NonNull final FCQueue<ValueLeaf<K>> queue) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
if (queue.size() == 0) {
logger.debug(" ITERATE queue {} size 0 values:EMPTY", label);
} else {
Expand All @@ -325,7 +322,7 @@ public static <K> void logQueueIterate(@NonNull final String label, @NonNull fin
* @param <V> The type of the value
*/
public static <K, V> void logMapPut(@NonNull final String label, @NonNull final K key, @Nullable final V value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(
" PUT into map {} key {} value {}",
label,
Expand All @@ -345,7 +342,7 @@ public static <K, V> void logMapPut(@NonNull final String label, @NonNull final
*/
public static <K, V> void logMapRemove(
@NonNull final String label, @NonNull final K key, @Nullable final InMemoryValue<K, V> value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(
" REMOVE from map {} key {} removed value {}",
label,
Expand All @@ -365,7 +362,7 @@ public static <K, V> void logMapRemove(
*/
public static <K, V> void logMapRemove(
@NonNull final String label, @NonNull final K key, @Nullable final OnDiskValue<V> value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(
" REMOVE from map {} key {} removed value {}",
label,
Expand All @@ -381,7 +378,7 @@ public static <K, V> void logMapRemove(
* @param size The size of the map
*/
public static void logMapGetSize(@NonNull final String label, final long size) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(" GET_SIZE on map {} size {}", label, size);
}
}
Expand All @@ -396,7 +393,7 @@ public static void logMapGetSize(@NonNull final String label, final long size) {
* @param <V> The type of the value
*/
public static <K, V> void logMapGet(@NonNull final String label, @NonNull final K key, @Nullable final V value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(
" GET on map {} key {} value {}",
label,
Expand All @@ -416,7 +413,7 @@ public static <K, V> void logMapGet(@NonNull final String label, @NonNull final
*/
public static <K, V> void logMapGetForModify(
@NonNull final String label, @NonNull final K key, @Nullable final V value) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
logger.debug(
" GET_FOR_MODIFY on map {} key {} value {}",
label,
Expand All @@ -433,7 +430,7 @@ public static <K, V> void logMapGetForModify(
* @param <K> The type of the key
*/
public static <K> void logMapIterate(@NonNull final String label, @NonNull final Set<InMemoryKey<K>> keySet) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().startsWith(HANDLE_THREAD_NAME)) {
if (logger.isDebugEnabled() && Thread.currentThread().getName().equals(TRANSACTION_HANDLING_THREAD_NAME)) {
final long size = keySet.size();
if (size == 0) {
logger.debug(" ITERATE map {} size 0 keys:EMPTY", label);
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.swirlds.base.units.UnitConstants.SECONDS_TO_MILLISECONDS;
import static com.swirlds.logging.legacy.LogMarker.EVENT_STREAM;
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.metrics.api.Metrics.INFO_CATEGORY;

import com.swirlds.base.time.Time;
Expand Down Expand Up @@ -47,7 +48,6 @@
* runningHash for consensus Events.
*/
public class EventStreamManager<T extends StreamAligned & Timestamped & RunningHashable & SerializableHashable> {
/** use this for all logging, as controlled by the optional data/log4j2.xml file */
private static final Logger logger = LogManager.getLogger(EventStreamManager.class);

/**
Expand All @@ -59,18 +59,21 @@ public class EventStreamManager<T extends StreamAligned & Timestamped & RunningH
* check whether this event is the last event before restart
*/
private final Predicate<T> isLastEventInFreezeCheck;
/** receives consensus events from multiStream, then passes to hashCalculator */
/**
* receives consensus events from multiStream, then passes to hashCalculator
*/
private QueueThreadObjectStream<T> hashQueueThread;
/**
* receives consensus events from hashQueueThread, calculates this event's Hash, then passes to
* runningHashCalculator
* receives consensus events from multiStream, then passes to streamFileWriter
*/
private HashCalculatorForStream<T> hashCalculator;
/** receives consensus events from multiStream, then passes to streamFileWriter */
private QueueThreadObjectStream<T> writeQueueThread;
/** receives consensus events from writeQueueThread, serializes consensus events to event stream files */
/**
* receives consensus events from writeQueueThread, serializes consensus events to event stream files
*/
private TimestampStreamFileWriter<T> streamFileWriter;
/** initialHash loaded from signed state */
/**
* initialHash loaded from signed state
*/
private Hash initialHash = new ImmutableHash(new byte[DigestType.SHA_384.digestLength()]);
/**
* When we freeze the platform, the last event to be written to EventStream file is the last event in the freeze
Expand Down Expand Up @@ -121,9 +124,9 @@ public EventStreamManager(
eventStreamDir,
eventsLogPeriod * SECONDS_TO_MILLISECONDS,
signer,
/** when event streaming is started after reconnect, or at state recovering,
* startWriteAtCompleteWindow should be set to be true; when event streaming is started after
* restart, it should be set to be false */
// when event streaming is started after reconnect, or at state recovering,
// startWriteAtCompleteWindow should be set to be true; when event streaming is started after
// restart, it should be set to be false
false,
EventStreamType.getInstance());

Expand Down Expand Up @@ -153,7 +156,10 @@ public EventStreamManager(

// receives consensus events from hashCalculator, calculates and set runningHash for this event
final RunningHashCalculatorForStream<T> runningHashCalculator = new RunningHashCalculatorForStream<>();
hashCalculator = new HashCalculatorForStream<>(runningHashCalculator);

// receives consensus events from hashQueueThread, calculates this event's Hash, then passes to
// runningHashCalculator
final HashCalculatorForStream<T> hashCalculator = new HashCalculatorForStream<>(runningHashCalculator);
hashQueueThread = new QueueThreadObjectStreamConfiguration<T>(threadManager)
.setNodeId(selfId)
.setComponent("event-stream")
Expand Down Expand Up @@ -197,42 +203,61 @@ public void stop() {
multiStream.close();
}

public void addEvents(final List<T> events) {
events.forEach(this::addEvent);
}

/**
* receives a consensus event from ConsensusRoundHandler each time, sends it to multiStream which then sends to two
* queueThread for calculating runningHash and writing to file
* Adds a list of events to the event stream.
*
* @param event the consensus event to be added
* @param events the list of events to add
*/
public void addEvent(final T event) {
if (!freezePeriodStarted) {
multiStream.addObject(event);
if (isLastEventInFreezeCheck.test(event)) {
freezePeriodStarted = true;
logger.info(
EVENT_STREAM.getMarker(),
"ConsensusTimestamp of the last Event to be written into file before restarting: " + "{}",
event::getTimestamp);
multiStream.close();
public void addEvents(@NonNull final List<T> events) {
events.forEach(event -> {
if (!freezePeriodStarted) {
multiStream.addObject(event);
if (isLastEventInFreezeCheck.test(event)) {
freezePeriodStarted = true;
logger.info(
EVENT_STREAM.getMarker(),
"ConsensusTimestamp of the last Event to be written into file before restarting: {}",
event::getTimestamp);
multiStream.close();
}
} else {
eventAfterFreezeLogger.warn(
EVENT_STREAM.getMarker(), "Event {} dropped after freezePeriodStarted!", event.getTimestamp());
}
} else {
eventAfterFreezeLogger.warn(
EVENT_STREAM.getMarker(), "Event {} dropped after freezePeriodStarted!", event.getTimestamp());
}
});
}

/**
* sets startWriteAtCompleteWindow: it should be set to be true after reconnect, or at state recovering; it should
* be set to be false at restart
* Updates the running hash with the given event hash. Called when a state is loaded.
*
* @param startWriteAtCompleteWindow whether the writer should not write until the first complete window
* @param runningEventHashUpdate the hash to update the running hash with
*/
public void setStartWriteAtCompleteWindow(final boolean startWriteAtCompleteWindow) {
public void updateRunningHash(@NonNull final RunningEventHashUpdate runningEventHashUpdate) {
try {
if (hashQueueThread != null) {
hashQueueThread.pause();
}
if (writeQueueThread != null) {
writeQueueThread.pause();
}
} catch (final InterruptedException e) {
logger.error(EXCEPTION.getMarker(), "Failed to pause queue threads", e);
Thread.currentThread().interrupt();
}

if (streamFileWriter != null) {
streamFileWriter.setStartWriteAtCompleteWindow(startWriteAtCompleteWindow);
streamFileWriter.setStartWriteAtCompleteWindow(runningEventHashUpdate.isReconnect());
}

initialHash = new Hash(runningEventHashUpdate.runningEventHash());
logger.info(EVENT_STREAM.getMarker(), "EventStreamManager::updateRunningHash: {}", initialHash);
multiStream.setRunningHash(initialHash);

if (hashQueueThread != null) {
hashQueueThread.resume();
}
if (writeQueueThread != null) {
writeQueueThread.resume();
}
}

Expand All @@ -241,75 +266,16 @@ public void setStartWriteAtCompleteWindow(final boolean startWriteAtCompleteWind
*
* @return current size of working queue for calculating hash and runningHash
*/
public int getHashQueueSize() {
if (hashQueueThread == null) {
return 0;
}
return hashQueueThread.getQueue().size();
private int getHashQueueSize() {
return hashQueueThread == null ? 0 : hashQueueThread.getQueue().size();
}

/**
* returns current size of working queue for writing to event stream files
*
* @return current size of working queue for writing to event stream files
*/
public int getEventStreamingQueueSize() {
private int getEventStreamingQueueSize() {
return writeQueueThread == null ? 0 : writeQueueThread.getQueue().size();
}

/**
* for unit testing
*
* @return current multiStream instance
*/
public MultiStream<T> getMultiStream() {
return multiStream;
}

/**
* for unit testing
*
* @return current TimestampStreamFileWriter instance
*/
public TimestampStreamFileWriter<T> getStreamFileWriter() {
return streamFileWriter;
}

/**
* for unit testing
*
* @return current HashCalculatorForStream instance
*/
public HashCalculatorForStream<T> getHashCalculator() {
return hashCalculator;
}

/**
* for unit testing
*
* @return whether freeze period has started
*/
public boolean getFreezePeriodStarted() {
return freezePeriodStarted;
}

/**
* for unit testing
*
* @return a copy of initialHash
*/
public Hash getInitialHash() {
return new Hash(initialHash);
}

/**
* sets initialHash after loading from signed state
*
* @param initialHash current runningHash of all consensus events
*/
public void setInitialHash(final Hash initialHash) {
this.initialHash = initialHash;
logger.info(EVENT_STREAM.getMarker(), "EventStreamManager::setInitialHash: {}", () -> initialHash);
multiStream.setRunningHash(initialHash);
}
}

0 comments on commit 9c1f467

Please sign in to comment.