Skip to content
This repository has been archived by the owner on Oct 12, 2020. It is now read-only.

Commit

Permalink
Sequential unreconciliation updates
Browse files Browse the repository at this point in the history
  • Loading branch information
hodlforjesus committed Aug 13, 2019
1 parent 2a189c5 commit 816dbaf
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 89 deletions.
185 changes: 97 additions & 88 deletions cron/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,11 @@ console.dateLog = (...log) => {
* @param {Number} sequence For blockchain sequencing (last sequence of inserted block)
*/
async function syncBlocks(start, stop, sequence) {
const lastMovement = await CarverMovement.findOne().sort({ sequence: -1 }); // Finds last sequence from last block (because we removed all other movements)
let block = null;

const sequences = {
movements: lastMovement ? lastMovement.sequence : 0
}

// Instead of fetching addresses each tiem from db we'll store a certain number in cache (this is in config)
// Instead of fetching addresses each time from db we'll store a certain number in cache (this is in config)
const commonAddressCache = new Map();

let block;
for (let height = start + 1; height <= stop; height++) {
const hash = await rpc.call('getblockhash', [height]);
const rpcblock = await rpc.call('getblock', [hash]);
Expand All @@ -67,6 +62,7 @@ async function syncBlocks(start, stop, sequence) {
isConfirmed: rpcblock.confirmations > config.blockConfirmations // We can instantly confirm a block if it reached the required number of confirmations (that way we don't have to reconfirm it later)
});


// Flush cache every 10000 addresses
if (commonAddressCache.size > config.blockSyncAddressCacheLimit) {
commonAddressCache.clear();
Expand Down Expand Up @@ -117,98 +113,106 @@ async function syncBlocks(start, stop, sequence) {
parsedMovements.forEach(parsedMovement => {
sequence++;

let canFlowSameAddress = false; // If addresses are same on same sequence continue. This way we can unwind movements and handle hard errors
//let canFlowSameAddress = false; // If addresses are same on same sequence continue. This way we can unwind movements and handle hard errors
const from = updatedAddresses.has(parsedMovement.from.label) ? updatedAddresses.get(parsedMovement.from.label) : parsedMovement.from;
if (sequence > from.sequence) {
from.countOut++;
from.balance -= parsedMovement.amount;
from.valueOut += parsedMovement.amount;
from.sequence = sequence;
from.lastMovementDate = blockDate;
canFlowSameAddress = true;

updatedAddresses.set(from.label, from);
const lastFromSequence = from.sequence;

if (from.sequence >= sequence) {
throw `RECONCILIATION ERROR: Out-of-sequence from movement: ${from.sequence}>${sequence}`;
}

const to = updatedAddresses.has(parsedMovement.to.label) ? updatedAddresses.get(parsedMovement.to.label) : parsedMovement.to;
if (sequence > to.sequence || canFlowSameAddress && from === to) {
to.countIn++;
to.balance += parsedMovement.amount;
to.valueIn += parsedMovement.amount;
to.sequence = sequence;
to.lastMovementDate = blockDate;

switch (parsedMovement.carverMovementType) {
case CarverMovementType.PosRewardToTx:
to.posMovement = parsedMovement._id;
posRewardAmount = parsedMovement.amount; // Notice we're setting tx-wide pos reward
break;
case CarverMovementType.MasternodeRewardToTx:
to.mnMovement = parsedMovement._id;
break;
case CarverMovementType.TxToCoinbaseRewardAddress:
to.powCountIn++;
to.powValueIn += parsedMovement.amount;
break;
case CarverMovementType.TxToPosAddress:
// This gets set set in PosRewardToTx above (one per tx)
if (posRewardAmount) {
to.posCountIn++;
to.posValueIn += posRewardAmount;
}
break;
case CarverMovementType.TxToMnAddress:
to.mnCountIn++;
to.mnValueIn += parsedMovement.amount;
break;
}
from.countOut++;
from.balance -= parsedMovement.amount;
from.valueOut += parsedMovement.amount;
from.sequence = sequence;
from.lastMovementDate = blockDate;
canFlowSameAddress = true;

// Erase the amount after first encounter (so we only set it once)
if (parsedMovement.carverMovementType === CarverMovementType.TxToPosAddress) {
posRewardAmount = null;
}
updatedAddresses.set(from.label, from);

updatedAddresses.set(to.label, to);
const to = updatedAddresses.has(parsedMovement.to.label) ? updatedAddresses.get(parsedMovement.to.label) : parsedMovement.to;
let lastToSequence = lastFromSequence;
if (from !== to) {
lastToSequence = to.sequence;
}

if (to.sequence >= sequence && from !== to) {
throw `RECONCILIATION ERROR: Out-of-sequence to movement: ${to.sequence}>${sequence}`;
}

if (sequence > sequences.movements) {
to.countIn++;
to.balance += parsedMovement.amount;
to.valueIn += parsedMovement.amount;
to.sequence = sequence;
to.lastMovementDate = blockDate;

switch (parsedMovement.carverMovementType) {
case CarverMovementType.PosRewardToTx:
to.posMovement = parsedMovement._id;
posRewardAmount = parsedMovement.amount; // Notice we're setting tx-wide pos reward
break;
case CarverMovementType.MasternodeRewardToTx:
to.mnMovement = parsedMovement._id;
break;
case CarverMovementType.TxToCoinbaseRewardAddress:
to.powCountIn++;
to.powValueIn += parsedMovement.amount;
break;
case CarverMovementType.TxToPosAddress:
// This gets set set in PosRewardToTx above (one per tx)
if (posRewardAmount) {
to.posCountIn++;
to.posValueIn += posRewardAmount;
}
break;
case CarverMovementType.TxToMnAddress:
to.mnCountIn++;
to.mnValueIn += parsedMovement.amount;
break;
}
updatedAddresses.set(to.label, to);

const targetAddress = from.carverAddressType === CarverAddressType.Tx ? to._id : from._id;
const targetTx = to.carverAddressType === CarverAddressType.Tx ? to._id : from._id;
const targetAddress = from.carverAddressType === CarverAddressType.Tx ? to._id : from._id;
const targetTx = to.carverAddressType === CarverAddressType.Tx ? to._id : from._id;

let newCarverMovement = new CarverMovement({
_id: new mongoose.Types.ObjectId(),
let newCarverMovement = new CarverMovement({
_id: new mongoose.Types.ObjectId(),

label: parsedMovement.label,
amount: parsedMovement.amount,
label: parsedMovement.label,
amount: parsedMovement.amount,

date: blockDate,
blockHeight: rpcblock.height,
date: blockDate,
blockHeight: rpcblock.height,

from: from._id,
to: to._id,
destinationAddress: parsedMovement.destinationAddress ? parsedMovement.destinationAddress._id : null,
from: from._id,
to: to._id,
destinationAddress: parsedMovement.destinationAddress ? parsedMovement.destinationAddress._id : null,

fromBalance: from.balance + parsedMovement.amount, // (store previous value before movement happened for perfect ledger)
toBalance: to.balance - parsedMovement.amount, // (store previous value before movement happened for perfect ledger)
fromBalance: from.balance + parsedMovement.amount, // (store previous value before movement happened for perfect ledger)
toBalance: to.balance - parsedMovement.amount, // (store previous value before movement happened for perfect ledger)

carverMovementType: parsedMovement.carverMovementType,
sequence: sequence,
carverMovementType: parsedMovement.carverMovementType,
sequence,
lastFromSequence,
lastToSequence,

targetAddress,
targetTx,
posRewardAmount: parsedMovement.posRewardAmount
});
targetAddress,
targetTx,
posRewardAmount: parsedMovement.posRewardAmount
});

switch (parsedMovement.carverMovementType) {
case CarverMovementType.PosRewardToTx:
newCarverMovement.posInputAmount = parsedMovement.posInputAmount;
newCarverMovement.posInputBlockHeightDiff = parsedMovement.posInputBlockHeightDiff;
break;
}
switch (parsedMovement.carverMovementType) {
case CarverMovementType.PosRewardToTx:
newCarverMovement.posInputAmount = parsedMovement.posInputAmount;
newCarverMovement.posInputBlockHeightDiff = parsedMovement.posInputBlockHeightDiff;
break;
}

newMovements.push(newCarverMovement);
newMovements.push(newCarverMovement);

// Erase the amount after first encounter (so we only set it once)
if (parsedMovement.carverMovementType === CarverMovementType.TxToPosAddress) {
posRewardAmount = null;
}
});

Expand Down Expand Up @@ -241,15 +245,15 @@ async function syncBlocks(start, stop, sequence) {


// Uncomment to test unreconciliation (5% chance to unreconcile last 1-10 blocks)
/*

if (Math.floor((Math.random() * 100) + 1) < 5) {
var dropNumBlocks = Math.floor((Math.random() * 10) + 1);
console.log(`Dropping ${dropNumBlocks} blocks`)
await undoCarverBlockMovements(height - dropNumBlocks + 1);
height -= dropNumBlocks;
commonAddressCache.clear(); // Clear cache because the addresses could now be invalid
}
*/

}
}
/**
Expand All @@ -276,29 +280,32 @@ async function undoCarverBlockMovements(height) {

parsedMovements.forEach(parsedMovement => {
sequence = parsedMovement.sequence;

let canFlowSameAddress = false; // If addresses are same on same sequence continue. This way we can unwind movements and handle hard errors
let from = null;

// Notice that we check if .from and .to exist. It is possible to insert a new movement, update one address and the other one fails to save (ex: new address)
// We can still undo the partial movement that was performed
if (parsedMovement.from) {
from = updatedAddresses.has(parsedMovement.from.label) ? updatedAddresses.get(parsedMovement.from.label) : parsedMovement.from;
if (sequence < from.sequence) {
if (sequence === from.sequence) {
from.countOut--;
from.balance += parsedMovement.amount;
from.valueOut -= parsedMovement.amount;

canFlowSameAddress = true;

from.sequence = sequence;
from.sequence = parsedMovement.lastFromSequence;
from.lastMovementDate = parsedMovement.date;

updatedAddresses.set(from.label, from);
} else if (from.sequence > sequence) {
throw `UNRECONCILIATION ERROR: Out-of-sequence from movement: ${from.sequence}>${sequence}`;
}
}

if (parsedMovement.to) {
const to = updatedAddresses.has(parsedMovement.to.label) ? updatedAddresses.get(parsedMovement.to.label) : parsedMovement.to;
if (sequence < to.sequence || canFlowSameAddress && from === to) {
if (sequence === to.sequence || canFlowSameAddress && from === to) {
to.countIn--;
to.balance -= parsedMovement.amount;
to.valueIn -= parsedMovement.amount;
Expand All @@ -321,9 +328,11 @@ async function undoCarverBlockMovements(height) {
break;
}

to.sequence = sequence;
to.sequence = parsedMovement.lastToSequence;
to.lastMovementDate = parsedMovement.date;
updatedAddresses.set(to.label, to);
} else if (to.sequence > sequence) {
throw `UNRECONCILIATION ERROR: Out-of-sequence from movement: ${to.sequence}>${sequence}`;
}
}

Expand Down
1 change: 0 additions & 1 deletion cron/carver2d.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ async function parseRequiredMovements(params) {

blockHeight: params.rpcblock.height,
date: blockDate,
lastMovementDate: blockDate,
carverAddressType,

// for stats
Expand Down
6 changes: 6 additions & 0 deletions model/carver2d.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ const carverMovementsSchema = new mongoose.Schema({

sequence: { unique: true, required: true, type: Number },

// These two fields are required for unreconciliation. When we undo a carver movement we set the from/to address sequences back to what they were before the movement happened.
lastFromSequence: { required: true, type: Number },
lastToSequence: { required: true, type: Number },

// For POS rewards store additional info
//@todo Remove all of these (in favor of new address type TxReward). From there we can figure out all of this data
destinationAddress: { type: mongoose.Schema.Types.ObjectId, ref: 'CarverAddress' }, // POS, MN & POW Rewards will also have a destinationAddress
Expand All @@ -79,6 +83,8 @@ const carverMovementsSchema = new mongoose.Schema({
posRewardAmount: { type: Number } // Because POS TX can have multiple outputs we'll need to assign it to one of these outputs (for unreconciliation)
}, { _id: false, versionKey: false });

carverMovementsSchema.index({ blockHeight: 1, sequence: 1 }, { unique: true }); // For unreconciliation query (blockHeight: gte, order by sequence)

// When viewing specific address we'll be filtering by from/to and sorting by sequence so we'll need these two compound indexes
carverMovementsSchema.index({ targetAddress: 1, sequence: 1 }, { unique: true });
carverMovementsSchema.index({ targetTx: 1, sequence: 1 }, { unique: true });
Expand Down

0 comments on commit 816dbaf

Please sign in to comment.