Skip to content

Commit

Permalink
Merge pull request #101 from catbref/master
Browse files Browse the repository at this point in the history
Fixes and improvements in sync and shutdown
  • Loading branch information
catbref committed Sep 26, 2018
2 parents 4b5f6e9 + f74e4b2 commit e0a3a5b
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 88 deletions.
8 changes: 5 additions & 3 deletions Qora/src/api/QoraResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.json.simple.JSONValue;

import controller.Controller;
import gui.ClosingDialog;
import gui.Gui;
import lang.Lang;
import settings.Settings;
import utils.APIUtils;
Expand All @@ -31,11 +33,11 @@ public String stop()
if(Controller.getInstance().doesWalletExists() && !Controller.getInstance().isWalletUnlocked()) {
throw ApiErrorFactory.getInstance().createError(ApiErrorFactory.ERROR_WALLET_LOCKED);
}

//STOP
Controller.getInstance().stopAll();
Controller.getInstance().stopAll();
System.exit(0);

//RETURN
return String.valueOf(true);
}
Expand Down
191 changes: 140 additions & 51 deletions Qora/src/controller/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -85,17 +86,18 @@
import utils.ObserverMessage;
import utils.Pair;
import utils.SysTray;
import utils.TransactionTimestampComparator;
import utils.UpdateUtil;
import webserver.WebService;

public class Controller extends Observable {

private static final Logger LOGGER = LogManager.getLogger(Controller.class);
private String version = "0.26.10";
private String buildTime = "2018-06-13 07:57:00 UTC";
private String version = "0.26.11";
private String buildTime = "2018-09-25 12:29:00 UTC";
private long buildTimestamp;

public static final String releaseVersion = "0.26.10";
public static final String releaseVersion = "0.26.11";

// TODO ENUM would be better here
public static final int STATUS_NO_CONNECTIONS = 0;
Expand Down Expand Up @@ -408,7 +410,7 @@ public void run() {
}
});

// TIMER TO SEND HEIGHT TO NETWORK EVERY 5 MIN
// TIMER TO SEND HEIGHT TO RANDOM PEER

this.timerPeerHeightUpdate.cancel();
this.timerPeerHeightUpdate = new Timer();
Expand All @@ -426,7 +428,7 @@ public void run() {
}
};

this.timerPeerHeightUpdate.schedule(action, 5 * 60 * 1000, 5 * 60 * 1000);
this.timerPeerHeightUpdate.schedule(action, 30 * 1000, 30 * 1000);

// REGISTER DATABASE OBSERVER
this.addObserver(DBSet.getInstance().getTransactionMap());
Expand Down Expand Up @@ -579,44 +581,50 @@ public void deleteWalletObserver(Observer o) {
}

private boolean isStopping = false;
private Object stoppingLock = new Object();

public void stopAll() {
// PREVENT MULTIPLE CALLS
if (!this.isStopping) {
this.isStopping = true;

// STOP SENDING OUR HEIGHT TO PEERS
this.timerPeerHeightUpdate.cancel();

// STOP BLOCK PROCESSOR
LOGGER.info(Lang.getInstance().translate("Stopping block processor"));
ClosingDialog.getInstance().updateProgress("Stopping block processor");
this.synchronizer.stop();

// STOP BLOCK GENERATOR
LOGGER.info(Lang.getInstance().translate("Stopping block generator"));
ClosingDialog.getInstance().updateProgress("Stopping block generator");
this.blockGenerator.shutdown();

// STOP MESSAGE PROCESSOR
LOGGER.info(Lang.getInstance().translate("Stopping message processor"));
ClosingDialog.getInstance().updateProgress("Stopping message processor");
this.network.stop();

// CLOSE DATABASE
LOGGER.info(Lang.getInstance().translate("Closing database"));
ClosingDialog.getInstance().updateProgress("Closing database");
DBSet.getInstance().close();
// Prevent multiple calls.
// This method can be called via JVM shutdown hook (e.g. signal), API 'stop' or GUI 'exit', among others.
// In particular, ClosingDialog.getInstance() below can trigger a call to stopAll().
// We want all successive calls to block until the first call has finished.
synchronized (this.stoppingLock) {
if (!this.isStopping) {
this.isStopping = true;

// STOP SENDING OUR HEIGHT TO PEERS
this.timerPeerHeightUpdate.cancel();

// STOP BLOCK PROCESSOR
LOGGER.info(Lang.getInstance().translate("Stopping block processor"));
ClosingDialog.getInstance().updateProgress("Stopping block processor");
this.synchronizer.stop();

// STOP BLOCK GENERATOR
LOGGER.info(Lang.getInstance().translate("Stopping block generator"));
ClosingDialog.getInstance().updateProgress("Stopping block generator");
this.blockGenerator.shutdown();

// STOP MESSAGE PROCESSOR
LOGGER.info(Lang.getInstance().translate("Stopping message processor"));
ClosingDialog.getInstance().updateProgress("Stopping message processor");
this.network.stop();

// CLOSE DATABASE
LOGGER.info(Lang.getInstance().translate("Closing database"));
ClosingDialog.getInstance().updateProgress("Closing database");
DBSet.getInstance().close();

// CLOSE WALLET
LOGGER.info(Lang.getInstance().translate("Closing wallet"));
ClosingDialog.getInstance().updateProgress("Closing wallet");
this.wallet.close();
// CLOSE WALLET
LOGGER.info(Lang.getInstance().translate("Closing wallet"));
ClosingDialog.getInstance().updateProgress("Closing wallet");
this.wallet.close();

ClosingDialog.getInstance().updateProgress("Creating database backup");
createDataCheckpoint();
ClosingDialog.getInstance().updateProgress("Creating database backup");
createDataCheckpoint();

LOGGER.info(Lang.getInstance().translate("Closed."));
LOGGER.info(Lang.getInstance().translate("Closed."));
}
}
}

Expand Down Expand Up @@ -649,6 +657,19 @@ public List<Peer> getActivePeers() {
}

public void walletSyncStatusUpdate(int height) {
/*
* Prevent deadlock when a new block arrives from network while we're resyncing wallet.
*
* New block arrival locks Controller and wants Synchronizer,
* but it's possible Synchronizer is locked (e.g. by BlockGenerator) while performing a wallet sync
* and this.setChanged() would want a lock on Controller too, causing a deadlock.
*
* We avoid this by testing for block processing status and exiting early.
*/

if (DBSet.getInstance().getBlockMap().isProcessing())
return;

this.setChanged();
this.notifyObservers(new ObserverMessage(ObserverMessage.WALLET_SYNC_STATUS, height));
}
Expand All @@ -671,9 +692,6 @@ public void onConnect(Peer peer) {
if (DBSet.getInstance().isStopped())
return;

// GET HEIGHT
int height = this.blockChain.getHeight();

if (NTP.getTime() >= Transaction.getPOWFIX_RELEASE()) {
// SEND FOUNDMYSELF MESSAGE
peer.sendMessage(MessageFactory.getInstance().createFindMyselfMessage(Controller.getInstance().getFoundMyselfID()));
Expand All @@ -683,8 +701,19 @@ public void onConnect(Peer peer) {
}

// SEND HEIGHT MESSAGE
LOGGER.trace("Sending our height " + height + " to peer " + peer.getAddress());
peer.sendMessage(MessageFactory.getInstance().createHeightMessage(height));
getSendMyHeightToPeer(peer);

// Resend any unconfirmed transactions
List<Transaction> transactions = DBSet.getInstance().getTransactionMap().getTransactions();

// Sort transactions chronologically
Collections.sort(transactions, new TransactionTimestampComparator());

// Send unconfirmed transactions
for (Transaction transaction : transactions) {
Message message = MessageFactory.getInstance().createTransactionMessage(transaction);
peer.sendMessage(message);
}

if (this.status == STATUS_NO_CONNECTIONS) {
// UPDATE STATUS
Expand Down Expand Up @@ -743,12 +772,17 @@ public void onDisconnect(Peer peer) {
// in case they attempt to access DB
if (this.isStopping)
return;

// NOTIFY
this.setChanged();
this.notifyObservers(new ObserverMessage(ObserverMessage.NETWORK_STATUS, this.status));
}
}

// NOTIFY, but in separate thread to avoid MapDB interrupt issue
new Thread() {
@Override
public void run() {
Controller.getInstance().setChanged();
Controller.getInstance().notifyObservers(new ObserverMessage(ObserverMessage.NETWORK_STATUS, Controller.getInstance().status));
}
}.start();
}

public void onError(Peer peer) {
Expand Down Expand Up @@ -830,10 +864,25 @@ public void onMessage(Message message) {

case Message.BLOCK_TYPE:

// Don't process if we're synchronizing
if (this.status == STATUS_SYNCHRONIZING)
break;

BlockMessage blockMessage = (BlockMessage) message;

// ASK BLOCK FROM BLOCKCHAIN
// Get block from message
block = blockMessage.getBlock();
LOGGER.trace("Received block from peer " + message.getSender().getAddress());

// Compare to our blockchain tip
Block blockchainTip = this.blockChain.getLastBlock();
if (blockchainTip.getHeight() == blockMessage.getHeight() && Arrays.equals(blockchainTip.getSignature(), block.getSignature())) {
// We have this block already but update our peer DB to reflect peer's height anyway
synchronized (this.peerHeight) {
this.peerHeight.put(message.getSender(), blockMessage.getHeight());
}
break;
}

boolean isNewBlockValid = this.blockChain.isNewBlockValid(block);

Expand All @@ -846,6 +895,18 @@ public void onMessage(Message message) {
if (this.isProcessingWalletSynchronize())
break;

/*
* Prevent deadlock when a new block arrives from network while we're resyncing wallet.
*
* New block arrival locks Controller and wants Synchronizer,
* but it's possible Synchronizer is locked (e.g. by BlockGenerator) while performing a wallet sync
* and this.setChanged() would want a lock on Controller too, causing a deadlock.
*
* We avoid this by testing for block processing status and exiting early.
*/
if (DBSet.getInstance().getBlockMap().isProcessing())
break;

// CHECK IF VALID
if (isNewBlockValid && this.synchronizer.process(block)) {
LOGGER.info(Lang.getInstance().translate("received new valid block") + " " + block.getHeight());
Expand All @@ -858,6 +919,9 @@ public void onMessage(Message message) {
excludes.add(message.getSender());
this.network.broadcast(message, excludes);

// Let sender know we've updated
this.getSendMyHeightToPeer(message.getSender());

// UPDATE ALL PEER HEIGHTS TO OUR HEIGHT
/*
* synchronized(this.peerHeight) { for(Peer peer: this.peerHeight.keySet()) { this.peerHeight.put(peer, this.blockChain.getHeight()); }
Expand Down Expand Up @@ -983,9 +1047,22 @@ public void update() {
peer = this.getMaxHeightPeer();

if (peer != null) {
// Make a note of pre-sync height so we can tell if anything happened
int preSyncHeight = this.blockChain.getHeight();

// SYNCHRONIZE FROM PEER
LOGGER.info("Synchronizing using peer " + peer.getAddress().getHostAddress() + " with height " + peerHeight.get(peer) + " - ping " + peer.getPing() + "ms");
this.synchronizer.synchronize(peer);

// If our height has changed, notify our peers
if (this.blockChain.getHeight() > preSyncHeight) {
Block blockchainTip = this.blockChain.getLastBlock();
LOGGER.debug("Sending our new height " + blockchainTip.getHeight() + " to peers");
Message message = MessageFactory.getInstance().createHeightMessage(blockchainTip.getHeight());

List<Peer> excludes = new ArrayList<Peer>();
this.network.broadcast(message, excludes);
}
}

Thread.sleep(5 * 1000);
Expand All @@ -999,6 +1076,8 @@ public void update() {
// DISHONEST PEER
this.network.onError(peer, e.getMessage());
}

return;
}

if (this.peerHeight.size() == 0) {
Expand Down Expand Up @@ -1354,10 +1433,17 @@ public long getNextBlockGeneratingBalance(Block parent) {
// FORGE

public void newBlockGenerated(Block newBlock) {
this.synchronizer.process(newBlock);
// Only process if we have connections and are not synchronizing
if (this.status == STATUS_OK) {
if (this.synchronizer.process(newBlock)) {
LOGGER.info("Forged new block " + newBlock.getHeight());

// BROADCAST
this.broadcastBlock(newBlock);
// BROADCAST
this.broadcastBlock(newBlock);
} else {
LOGGER.info("Couldn't forge new block");
}
}
}

public List<Transaction> getUnconfirmedTransactions() {
Expand Down Expand Up @@ -1655,6 +1741,9 @@ public Pair<Transaction, Integer> sendMessage(PrivateKeyAccount sender, Account

public Block getBlockByHeight(int parseInt) {
byte[] b = DBSet.getInstance().getHeightMap().getBlockByHeight(parseInt);
if (b == null)
return null;

return DBSet.getInstance().getBlockMap().get(b);
}

Expand Down
4 changes: 2 additions & 2 deletions Qora/src/database/DBListValueMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected L getListForAdd(K key) {
L newList = this.newListValue();

// If we previous marked entry as deleted then we're essentially recreating a new list
if (this.deleted != null && this.deleted.contains(key))
if (this.deletedContains(key))
return newList;

// If the parent map contains a list for key then we need to duplicate its entries
Expand Down Expand Up @@ -148,7 +148,7 @@ protected L getListForRemove(K key) {
return this.map.get(key);

// If we previously marked entry as deleted then we have no list
if (this.deleted != null && this.deleted.contains(key))
if (this.deletedContains(key))
return null;

// If we have no parent or parent has no entry then there's no list
Expand Down
Loading

0 comments on commit e0a3a5b

Please sign in to comment.