Skip to content

Commit

Permalink
Lots more testing and fixing, happy now
Browse files Browse the repository at this point in the history
Signed-off-by: jasperpotts <jasperpotts@users.noreply.github.com>
  • Loading branch information
jasperpotts committed Jun 16, 2023
1 parent 855c8b4 commit de71ede
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 857 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public final class BlockRecordManagerImpl implements BlockRecordManager {
/** The hash size in bytes, normally 48 for SHA384 */
private static final int HASH_SIZE = DigestType.SHA_384.digestLength();
/** how many block hash history to keep in memory */
private static final int NUM_BLOCK_HASHES_TO_KEEP = 256;
public static final int NUM_BLOCK_HASHES_TO_KEEP = 256;
/** Number of blocks to keep multiplied by hash size */
private static final int NUM_BLOCK_HASHES_TO_KEEP_BYTES = NUM_BLOCK_HASHES_TO_KEEP * HASH_SIZE;

Expand Down Expand Up @@ -174,7 +174,6 @@ public void startUserTransaction(Instant consensusTime, HederaState state) {
final Timestamp firstConsTimeOfLastBlock = blockInfo.firstConsTimeOfLastBlock();
if (firstConsTimeOfLastBlock == null || firstConsTimeOfLastBlock.seconds() == 0) {
// we are in genesis, so create a new block 1
System.out.println("GENESIS firstConsTimeOfLastBlock= " + firstConsTimeOfLastBlock);
streamFileProducer.switchBlocks(0, 1, consensusTime);
// set this transaction as the first transaction in the current block
provisionalCurrentBlockFirstTransactionTime = consensusTime;
Expand All @@ -188,29 +187,21 @@ public void startUserTransaction(Instant consensusTime, HederaState state) {
// we are in a new block, so close the previous one if we are not the first block after startup
// otherwise load the block info from state for the last block before the node was started.
final long lastBlockNo;
final Bytes lastBlockHashBytes;
if (provisionalCurrentBlockFirstTransactionTime == null) {
// last block was the last block saved in state before the node started, so load it from state
lastBlockNo = blockInfo.lastBlockNo();
lastBlockHashBytes = getLastBlockHash(blockInfo);
System.out.println("***************************************** !!!!");
} else {
// last block has finished so compute newly closed block number
lastBlockNo = blockInfo.lastBlockNo() + 1;
// compute block hash of the newly closed block, this is the running hash after the last transaction
// record.
lastBlockHashBytes = streamFileProducer.getCurrentRunningHash();
System.out.println("lastBlockNo = " + lastBlockNo);
final Bytes lastBlockHashBytes = streamFileProducer.getRunningHash();
assert lastBlockHashBytes != null
: "lastBlockHashBytes should never be null, we only get here "
+ "if we have completed a block";
// update the first transaction time of the last block
final Instant lastBlockFirstTransactionTime = provisionalCurrentBlockFirstTransactionTime;
// update BlockInfo state
System.out.println(
"updateBlockInfo(blockInfo, lastBlockNo, lastBlockFirstTransactionTime, lastBlockHashBytes) = "
+ updateBlockInfo(
blockInfo, lastBlockNo, lastBlockFirstTransactionTime, lastBlockHashBytes));
System.out.println(
" lastBlockFirstTransactionTime = " + lastBlockFirstTransactionTime.getEpochSecond()
+ " :: " + lastBlockFirstTransactionTime.getNano());
blockInfoWritableSingletonState.put(
updateBlockInfo(blockInfo, lastBlockNo, lastBlockFirstTransactionTime, lastBlockHashBytes));
// log end of block if needed
Expand Down Expand Up @@ -253,8 +244,8 @@ public void endUserTransaction(
@NonNull final Stream<SingleTransactionRecord> recordStreamItems, @NonNull final HederaState state) {
// check if we need to run event recovery before we can write any new records to stream
if (!this.eventRecoveryCompleted) {
// TODO create event recovery class and call it here
// TODO should this be in startUserTransaction()?
// FUTURE create event recovery class and call it here
// FUTURE should this be in startUserTransaction()?
this.eventRecoveryCompleted = true;
}
// pass to record stream writer to handle
Expand All @@ -271,7 +262,7 @@ public void endUserTransaction(
public void endRound(HederaState hederaState) {
// We get the latest running hash from the StreamFileProducer blocking if needed
// for it to be computed.
final Bytes currentRunningHash = streamFileProducer.getCurrentRunningHash();
final Bytes currentRunningHash = streamFileProducer.getRunningHash();
// update running hashes in state with the latest running hash and the previous 3
// running hashes.
final WritableStates states = hederaState.createWritableStates(BlockRecordService.NAME);
Expand All @@ -280,8 +271,10 @@ public void endRound(HederaState hederaState) {
final RunningHashes existingRunningHashes = runningHashesState.get();
if (existingRunningHashes == null) {
runningHashesState.put(new RunningHashes(currentRunningHash, null, null, null));
} else if (!currentRunningHash.equals(
existingRunningHashes.runningHash())) { // only update if running hash has changed
} else if (currentRunningHash != null
&& !currentRunningHash.equals(
existingRunningHashes
.runningHash())) { // only update if running hash has changed and is not null
runningHashesState.put(new RunningHashes(
currentRunningHash,
existingRunningHashes.runningHash(),
Expand All @@ -302,22 +295,7 @@ public void endRound(HederaState hederaState) {
@Nullable
@Override
public Bytes getRunningHash() {
System.out.println("BlockRecordManagerImpl.getRunningHash");
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createReadableStates(BlockRecordService.NAME);
final ReadableSingletonState<RunningHashes> runningHashesState =
states.getSingleton(BlockRecordService.RUNNING_HASHES_STATE_KEY);
final RunningHashes runningHashes = runningHashesState.get();
System.out.println(" runningHashes.runningHash() = "
+ runningHashes.runningHash().toHex());
if (runningHashes != null) {
return runningHashes.runningHash();
} else {
return null;
}
return streamFileProducer.getRunningHash();
}

/**
Expand All @@ -329,19 +307,7 @@ public Bytes getRunningHash() {
@Nullable
@Override
public Bytes getNMinus3RunningHash() {
final var hederaState = workingStateAccessor.getHederaState();
if (hederaState == null) {
throw new RuntimeException("HederaState is null. This can only happen very early during bootstrapping");
}
final var states = hederaState.createReadableStates(BlockRecordService.NAME);
final ReadableSingletonState<RunningHashes> runningHashesState =
states.getSingleton(BlockRecordService.RUNNING_HASHES_STATE_KEY);
final RunningHashes runningHashes = runningHashesState.get();
if (runningHashes != null) {
return runningHashes.nMinus3RunningHash();
} else {
return null;
}
return streamFileProducer.getNMinus3RunningHash();
}

// ========================================================================================================
Expand Down Expand Up @@ -439,10 +405,10 @@ public Bytes blockHashByBlockNumber(long blockNo) {
return null;
}
final long lastBlockNo = blockInfo.lastBlockNo();
final long firstAvailableBlockNo = lastBlockNo - blocksAvailable;
final long firstAvailableBlockNo = lastBlockNo - blocksAvailable + 1;
// If blocksAvailable == 0, then firstAvailable == blockNo; and all numbers are
// either less than or greater than or equal to blockNo, so we return unavailable
if (blockNo < firstAvailableBlockNo || blockNo >= lastBlockNo) {
if (blockNo < firstAvailableBlockNo || blockNo > lastBlockNo) {
return null;
} else {
long offset = (blockNo - firstAvailableBlockNo) * HASH_SIZE;
Expand All @@ -453,26 +419,14 @@ public Bytes blockHashByBlockNumber(long blockNo) {
// ========================================================================================================
// Private Methods

/**
* Get the block period from consensus timestamp. Based on {@link LinkedObjectStreamUtilities#getPeriod(Instant, long)}
* but updated to work on {@link Timestamp}.
*
* @param consensusTimestamp The consensus timestamp
* @return The block period from epoc the consensus timestamp is in
*/
public long getBlockPeriod(Timestamp consensusTimestamp) {
long nanos = consensusTimestamp.seconds() * 1000000000L + (long) consensusTimestamp.nanos();
return nanos / 1000000L / blockPeriodInMilliSeconds;
}

/**
* Get the block period from consensus timestamp. Based on {@link LinkedObjectStreamUtilities#getPeriod(Instant, long)}
* but updated to work on {@link Instant}.
*
* @param consensusTimestamp The consensus timestamp
* @return The block period from epoc the consensus timestamp is in
*/
public long getBlockPeriod(Instant consensusTimestamp) {
private long getBlockPeriod(Instant consensusTimestamp) {
if (consensusTimestamp == null) return 0;
long nanos = consensusTimestamp.getEpochSecond() * 1000000000L + (long) consensusTimestamp.getNano();
return nanos / 1000000L / blockPeriodInMilliSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public boolean writeTransactionSidecarRecord(
bytesWritten += transactionSidecarRecord.length();
// write protobuf format to file
// TODO can change once https://github.com/hashgraph/pbj/issues/44 is fixed to:
// ProtoWriterTools.writeTag(outputStream, SIDECAR_RECORDS, WIRE_TYPE_DELIMITED);;
// ProtoWriterTools.writeTag(outputStream, SIDECAR_RECORDS, WIRE_TYPE_DELIMITED);
outputStream.writeVarInt((SIDECAR_RECORDS.number() << TAG_TYPE_BITS) | WIRE_TYPE_DELIMITED, false);
outputStream.writeVarInt((int) transactionSidecarRecord.length(), false);
outputStream.writeBytes(transactionSidecarRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,24 @@ public StreamFileProducerBase(
public abstract void setRunningHash(Bytes recordStreamItemRunningHash);

/**
* Get the current running hash of record stream items. This is called on handle transaction thread. It will block if background thread is still hashing.
* Get the current running hash of record stream items. This is called on handle transaction thread. It will block
* if background thread is still hashing. It will always return the running hash after the last user transaction
* was added. Hence, any pre-transactions or others not yet committed via
* {@link StreamFileProducerBase#writeRecordStreamItems(long, Instant, Stream)} will not be included.
*
* @return The current running hash upto and including the last record stream item sent in writeRecordStreamItems().
*/
public abstract Bytes getCurrentRunningHash();
@Nullable
public abstract Bytes getRunningHash();

/**
* Get the previous, previous, previous runningHash of all RecordStreamObject. This will block if
* the running hash has not yet been computed for the most recent user transaction.
*
* @return the previous, previous, previous runningHash of all RecordStreamObject
*/
@Nullable
public abstract Bytes getNMinus3RunningHash();

/**
* Called at the end of a block and start of next block.
Expand All @@ -149,7 +162,9 @@ public abstract void switchBlocks(

/**
* Write record items to stream files. They must be in exact consensus time order! This must only be called after the user
* transaction has been committed to state and is 100% done.
* transaction has been committed to state and is 100% done. So is called exactly once per user transaction at the end
* after it has been committed to state. Each call is for a complete set of transactions that represent a single user
* transaction and its pre-transactions and child-transactions.
*
* @param blockNumber the block number for this block that we are writing record stream items for
* @param blockFirstTransactionConsensusTime the consensus time of the first transaction in the block
Expand Down Expand Up @@ -189,10 +204,6 @@ protected final void closeBlock(
@NonNull final RecordFileWriter currentRecordFileWriter,
@Nullable final List<SidecarFileWriter> sidecarFileWriters) {
final long blockNumber = currentRecordFileWriter.blockNumber();
System.out.println(
"closeBlock: blockNumber=" + blockNumber + ", startingRunningHash=" + startingRunningHash.toHex()
+ ", finalRunningHash=" + finalRunningHash.toHex() + ", currentRecordFileWriter="
+ currentRecordFileWriter + ", sidecarFileWriters=" + sidecarFileWriters);
try {
// close any open sidecar writers and add sidecar metadata to record file
final List<SidecarMetadata> sidecarMetadata = new ArrayList<>();
Expand Down Expand Up @@ -320,9 +331,7 @@ protected final List<SidecarFileWriter> handleSidecarItems(
*/
protected Path getRecordFilePath(final Instant consensusTime) {
return nodeScopedRecordLogDir.resolve(convertInstantToStringWithPadding(consensusTime) + "." + RECORD_EXTENSION
+ (compressFilesOnCreation
? COMPRESSION_ALGORITHM_EXTENSION
: "")); // TODO I assume we need extension here, what happens for signature file names?
+ (compressFilesOnCreation ? COMPRESSION_ALGORITHM_EXTENSION : ""));
}

/**
Expand Down
Loading

0 comments on commit de71ede

Please sign in to comment.