Skip to content

Commit

Permalink
Sync with develop. Fixed hammer test CloseFlushTest
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Ananev <artem.ananev@swirldslabs.com>
  • Loading branch information
artemananiev committed May 3, 2023
2 parents c401e6f + eeccd5e commit 22de0d7
Show file tree
Hide file tree
Showing 43 changed files with 2,366 additions and 190 deletions.
Expand Up @@ -86,6 +86,7 @@
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.Platform;
import com.swirlds.common.system.state.notifications.IssListener;
import com.swirlds.common.system.state.notifications.NewRecoveredStateListener;
import com.swirlds.common.system.state.notifications.NewSignedStateListener;
import dagger.BindsInstance;
import dagger.Component;
Expand Down Expand Up @@ -206,6 +207,8 @@ public interface ServicesApp {

NewSignedStateListener newSignedStateListener();

Optional<NewRecoveredStateListener> maybeNewRecoveredStateListener();

Supplier<NotificationEngine> notificationEngine();

BackingStore<AccountID, HederaAccount> backingAccounts();
Expand Down
Expand Up @@ -87,6 +87,7 @@
import com.swirlds.common.system.SwirldState;
import com.swirlds.common.system.address.AddressBook;
import com.swirlds.common.system.events.Event;
import com.swirlds.common.system.state.notifications.NewRecoveredStateListener;
import com.swirlds.common.threading.manager.AdHocThreadManager;
import com.swirlds.fchashmap.FCHashMap;
import com.swirlds.jasperdb.VirtualDataSourceJasperDB;
Expand Down Expand Up @@ -353,6 +354,8 @@ private ServicesApp internalInit(
.build();
APPS.save(selfId, app);
}
app.maybeNewRecoveredStateListener().ifPresent(listener -> platform.getNotificationEngine()
.register(NewRecoveredStateListener.class, listener));

if (dualState == null) {
dualState = new DualStateImpl();
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.hedera.node.app.service.mono.state.expiry.ExpiringCreations;
import com.hedera.node.app.service.mono.state.exports.AccountsExporter;
import com.hedera.node.app.service.mono.state.exports.BalancesExporter;
import com.hedera.node.app.service.mono.state.exports.ExportingRecoveredStateListener;
import com.hedera.node.app.service.mono.state.exports.ServicesSignedStateListener;
import com.hedera.node.app.service.mono.state.exports.SignedStateBalancesExporter;
import com.hedera.node.app.service.mono.state.exports.ToStringAccountsExporter;
Expand Down Expand Up @@ -67,6 +68,7 @@
import com.hedera.node.app.service.mono.state.virtual.VirtualBlobValue;
import com.hedera.node.app.service.mono.state.virtual.VirtualMapFactory;
import com.hedera.node.app.service.mono.store.schedule.ScheduleStore;
import com.hedera.node.app.service.mono.stream.RecordStreamManager;
import com.hedera.node.app.service.mono.stream.RecordsRunningHashLeaf;
import com.hedera.node.app.service.mono.utils.EntityNum;
import com.hedera.node.app.service.mono.utils.JvmSystemExits;
Expand All @@ -80,15 +82,18 @@
import com.swirlds.common.notification.listeners.PlatformStatusChangeListener;
import com.swirlds.common.notification.listeners.ReconnectCompleteListener;
import com.swirlds.common.notification.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.common.system.InitTrigger;
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.Platform;
import com.swirlds.common.system.address.AddressBook;
import com.swirlds.common.system.state.notifications.IssListener;
import com.swirlds.common.system.state.notifications.NewRecoveredStateListener;
import com.swirlds.common.system.state.notifications.NewSignedStateListener;
import com.swirlds.common.utility.CommonUtils;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.PrintStream;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -155,6 +160,20 @@ static BalancesExporter bindBalancesExporter(
}
}

@Provides
@Singleton
static Optional<NewRecoveredStateListener> provideMaybeRecoveredStateListener(
@NonNull final InitTrigger initTrigger,
@NonNull final RecordStreamManager recordStreamManager,
@NonNull final BalancesExporter balancesExporter,
@NonNull final NodeId nodeId) {
if (initTrigger == InitTrigger.EVENT_STREAM_RECOVERY) {
return Optional.of(new ExportingRecoveredStateListener(recordStreamManager, balancesExporter, nodeId));
} else {
return Optional.empty();
}
}

@Binds
@Singleton
SystemFilesManager bindSysFilesManager(HfsSystemFilesManager hfsSystemFilesManager);
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.service.mono.state.exports;

import com.hedera.node.app.service.mono.stream.RecordStreamManager;
import com.swirlds.common.system.InitTrigger;
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.state.notifications.NewRecoveredStateListener;
import com.swirlds.common.system.state.notifications.NewRecoveredStateNotification;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
* A tiny {@link NewRecoveredStateListener}, registered <i>only</i> when {@code ServicesState#init()} is called with
* {@link InitTrigger#EVENT_STREAM_RECOVERY}, that upon receiving a recovered state notification
* <ol>
* <li>Freezes the record stream (hence flushing any pending items to a final file); and,</li>
* <li>Exports the account balances from the recovered state.</li>
* </ol>
*/
public class ExportingRecoveredStateListener implements NewRecoveredStateListener {
private final RecordStreamManager recordStreamManager;
private final BalancesExporter balancesExporter;
private final NodeId nodeId;

public ExportingRecoveredStateListener(
@NonNull final RecordStreamManager recordStreamManager,
@NonNull final BalancesExporter balancesExporter,
@NonNull final NodeId nodeId) {
this.recordStreamManager = Objects.requireNonNull(recordStreamManager);
this.balancesExporter = Objects.requireNonNull(balancesExporter);
this.nodeId = Objects.requireNonNull(nodeId);
}

@Override
public void notify(@NonNull final NewRecoveredStateNotification notification) {
recordStreamManager.setInFreeze(true);
balancesExporter.exportBalancesFrom(
notification.getSwirldState(), notification.getConsensusTimestamp(), nodeId);
}
}
Expand Up @@ -43,6 +43,7 @@
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -96,9 +97,43 @@ public BlocklistAccountCreator(
* Makes sure that all blocked accounts contained in the blocklist resource are present in state, and creates them if necessary.
*/
public void createMissingAccounts() {
final List<String> fileLines = readFileLines();
if (fileLines.isEmpty()) return;

final List<BlockedInfo> blocklist = parseBlockList(fileLines);

if (!blocklist.isEmpty()) {
final var blockedToCreate = blocklist.stream()
.filter(blockedAccount ->
aliasManager.lookupIdBy(blockedAccount.evmAddress).equals(MISSING_NUM))
.collect(Collectors.toSet());

for (final var blockedInfo : blockedToCreate) {
final var newId = ids.newAccountId(); // get the next available new account ID
final var account = blockedAccountWith(blockedInfo);
accounts.put(newId, account); // add the account with the corresponding newId to state
accountsCreated.add(account); // add the account to the list of accounts created by this class
aliasManager.link(
blockedInfo.evmAddress,
EntityNum.fromAccountId(newId)); // link the EVM address alias to the new account ID
}
}
}

private List<String> readFileLines() {
final List<String> fileLines;
try {
fileLines = readPrivateKeyBlocklist(blocklistResourceName);
} catch (Exception e) {
log.error("Failed to read blocklist resource {}", blocklistResourceName, e);
return Collections.emptyList();
}
return fileLines;
}

private List<BlockedInfo> parseBlockList(final List<String> fileLines) {
final List<BlockedInfo> blocklist;
try {
final var fileLines = readPrivateKeyBlocklist(blocklistResourceName);
final var columnHeaderLine = fileLines.get(0);
final var blocklistLines = fileLines.subList(1, fileLines.size());
final var columnCount = columnHeaderLine.split(",").length;
Expand All @@ -107,26 +142,9 @@ public void createMissingAccounts() {
.toList();
} catch (IllegalArgumentException iae) {
log.error("Failed to parse blocklist", iae);
return;
} catch (Exception e) {
log.error("Failed to read blocklist resource {}", blocklistResourceName, e);
return;
}

final var blockedToCreate = blocklist.stream()
.filter(blockedAccount ->
aliasManager.lookupIdBy(blockedAccount.evmAddress).equals(MISSING_NUM))
.collect(Collectors.toSet());

for (final var blockedInfo : blockedToCreate) {
final var newId = ids.newAccountId(); // get the next available new account ID
final var account = blockedAccountWith(blockedInfo);
accounts.put(newId, account); // add the account with the corresponding newId to state
accountsCreated.add(account); // add the account to the list of accounts created by this class
aliasManager.link(
blockedInfo.evmAddress,
EntityNum.fromAccountId(newId)); // link the EVM address alias to the new account ID
return Collections.emptyList();
}
return blocklist;
}

@NonNull
Expand Down
Expand Up @@ -89,7 +89,7 @@ public ServicesTxnManager(
this.blocklistAccountCreator = Objects.requireNonNull(blocklistAccountCreator);
}

private boolean needToPublishMigrationRecords = true;
private boolean isFirstTransaction = true;
private boolean createdStreamableRecord;

public void process(TxnAccessor accessor, Instant consensusTime, long submittingMember) {
Expand All @@ -104,7 +104,7 @@ public void process(TxnAccessor accessor, Instant consensusTime, long submitting
rewardCalculator.reset();
ledger.begin();

if (needToPublishMigrationRecords) {
if (isFirstTransaction) {
if (bootstrapProperties.getBooleanProperty(PropertyNames.ACCOUNTS_BLOCKLIST_ENABLED)) {
blocklistAccountCreator.createMissingAccounts();
}
Expand All @@ -113,7 +113,7 @@ public void process(TxnAccessor accessor, Instant consensusTime, long submitting
// state) shows that it needs to do so; our responsibility here is just to give it
// the opportunity
migrationRecordsManager.publishMigrationRecords(consensusTime);
needToPublishMigrationRecords = false;
isFirstTransaction = false;
}
if (accessor.isTriggeredTxn()) {
scopedTriggeredProcessing.run();
Expand Down
Expand Up @@ -44,6 +44,7 @@
import com.hedera.node.app.service.mono.ledger.backing.BackingAccounts;
import com.hedera.node.app.service.mono.sigs.EventExpansion;
import com.hedera.node.app.service.mono.state.DualStateAccessor;
import com.hedera.node.app.service.mono.state.exports.ExportingRecoveredStateListener;
import com.hedera.node.app.service.mono.state.exports.ServicesSignedStateListener;
import com.hedera.node.app.service.mono.state.exports.SignedStateBalancesExporter;
import com.hedera.node.app.service.mono.state.exports.ToStringAccountsExporter;
Expand Down Expand Up @@ -157,6 +158,9 @@ void objectGraphRootsAreAvailable() {
// Since we gave InitTrigger.EVENT_STREAM_RECOVERY, the record stream manager
// should be instantiated with a recovery writer
assertNotNull(subject.recordStreamManager().getRecoveryRecordsWriter());
final var maybeRecoveredStateListener = subject.maybeNewRecoveredStateListener();
assertTrue(maybeRecoveredStateListener.isPresent());
assertThat(maybeRecoveredStateListener.get(), instanceOf(ExportingRecoveredStateListener.class));
assertThat(subject.globalDynamicProperties(), instanceOf(GlobalDynamicProperties.class));
assertThat(subject.grpc(), instanceOf(NettyGrpcServerManager.class));
assertThat(subject.platformStatus(), instanceOf(CurrentPlatformStatus.class));
Expand Down
Expand Up @@ -53,6 +53,7 @@
import com.hedera.node.app.service.mono.ledger.accounts.staking.StakeStartupHelper;
import com.hedera.node.app.service.mono.sigs.EventExpansion;
import com.hedera.node.app.service.mono.state.DualStateAccessor;
import com.hedera.node.app.service.mono.state.exports.ExportingRecoveredStateListener;
import com.hedera.node.app.service.mono.state.forensics.HashLogger;
import com.hedera.node.app.service.mono.state.initialization.SystemAccountsCreator;
import com.hedera.node.app.service.mono.state.initialization.SystemFilesManager;
Expand Down Expand Up @@ -91,6 +92,7 @@
import com.swirlds.common.crypto.RunningHash;
import com.swirlds.common.crypto.engine.CryptoEngine;
import com.swirlds.common.metrics.noop.NoOpMetrics;
import com.swirlds.common.notification.NotificationEngine;
import com.swirlds.common.system.InitTrigger;
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.Platform;
Expand All @@ -100,6 +102,7 @@
import com.swirlds.common.system.address.Address;
import com.swirlds.common.system.address.AddressBook;
import com.swirlds.common.system.events.Event;
import com.swirlds.common.system.state.notifications.NewRecoveredStateListener;
import com.swirlds.fchashmap.FCHashMap;
import com.swirlds.merkle.map.MerkleMap;
import com.swirlds.platform.state.DualStateImpl;
Expand All @@ -110,6 +113,7 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -195,6 +199,12 @@ class ServicesStateTest extends ResponsibleVMapUser {
@Mock
private VirtualMapFactory virtualMapFactory;

@Mock
private ExportingRecoveredStateListener recoveredStateListener;

@Mock
private NotificationEngine notificationEngine;

@Mock
private ServicesState.StakingInfoBuilder stakingInfoBuilder;

Expand Down Expand Up @@ -531,6 +541,8 @@ void nonGenesisInitReusesContextIfPresent() {
given(app.dualStateAccessor()).willReturn(dualStateAccessor);
given(platform.getSelfId()).willReturn(selfId);
given(platform.getAddressBook()).willReturn(addressBook);
given(app.maybeNewRecoveredStateListener()).willReturn(Optional.of(recoveredStateListener));
given(platform.getNotificationEngine()).willReturn(notificationEngine);
// and:
APPS.save(selfId.getId(), app);

Expand All @@ -543,6 +555,7 @@ void nonGenesisInitReusesContextIfPresent() {
// and:
verify(initFlow).runWith(eq(subject), any());
verify(hashLogger).logHashesFor(subject);
verify(notificationEngine).register(NewRecoveredStateListener.class, recoveredStateListener);
}

@Test
Expand Down
Expand Up @@ -21,14 +21,21 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import com.hedera.node.app.service.mono.config.NetworkInfo;
import com.hedera.node.app.service.mono.context.MutableStateChildren;
import com.hedera.node.app.service.mono.context.properties.PropertySource;
import com.hedera.node.app.service.mono.state.exports.BalancesExporter;
import com.hedera.node.app.service.mono.state.exports.ExportingRecoveredStateListener;
import com.hedera.node.app.service.mono.store.schedule.ScheduleStore;
import com.hedera.node.app.service.mono.stream.RecordStreamManager;
import com.swirlds.common.system.InitTrigger;
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.Platform;
import com.swirlds.common.utility.CommonUtils;
import java.nio.charset.Charset;
Expand All @@ -53,7 +60,12 @@ class StateModuleTest {
private NetworkInfo networkInfo;

@Mock
private StateModule.ConsoleCreator consoleCreator;
private RecordStreamManager recordStreamManager;

@Mock
private BalancesExporter balancesExporter;

private final NodeId nodeId = new NodeId(false, 0);

@Test
void providesDefaultCharset() {
Expand Down Expand Up @@ -108,4 +120,21 @@ void failsWithClearlyInvalidGenesisKey() {
final var keySupplier = StateModule.provideSystemFileKey(properties);
assertThrows(IllegalStateException.class, keySupplier::get);
}

@Test
void providesNoRecoveredStateListenerIfNotInEventRecovery() {
final var maybeListener = StateModule.provideMaybeRecoveredStateListener(
InitTrigger.GENESIS, recordStreamManager, balancesExporter, nodeId);

assertTrue(maybeListener.isEmpty());
}

@Test
void providesRecoveredStateListenerIfNotInEventRecovery() {
final var maybeListener = StateModule.provideMaybeRecoveredStateListener(
InitTrigger.EVENT_STREAM_RECOVERY, recordStreamManager, balancesExporter, nodeId);

assertTrue(maybeListener.isPresent());
assertInstanceOf(ExportingRecoveredStateListener.class, maybeListener.get());
}
}

0 comments on commit 22de0d7

Please sign in to comment.