Skip to content

Commit

Permalink
feat: add fallback for pubrawblock ZMQ filter
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Feb 25, 2019
1 parent 5fa620a commit dd30930
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 21 deletions.
2 changes: 2 additions & 0 deletions docker/regtest/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ COPY regtest/data/lnd/macaroons /root/.lnd-ltc2/data/chain/litecoin/regtest/
# Configure ZMQ for Bitcoin and Litecoin Core
RUN echo "zmqpubrawtx=tcp://0.0.0.0:29000" >> /root/.bitcoin/bitcoin.conf
RUN echo "zmqpubrawblock=tcp://0.0.0.0:29001" >> /root/.bitcoin/bitcoin.conf
RUN echo "zmqpubhashblock=tcp://0.0.0.0:29002" >> /root/.bitcoin/bitcoin.conf

RUN echo "zmqpubrawtx=tcp://0.0.0.0:30000" >> /root/.litecoin/litecoin.conf
RUN echo "zmqpubrawblock=tcp://0.0.0.0:30001" >> /root/.litecoin/litecoin.conf
RUN echo "zmqpubhashblock=tcp://0.0.0.0:30002" >> /root/.litecoin/litecoin.conf

# Copy start scripts
COPY regtest/scripts /bin/
Expand Down
130 changes: 110 additions & 20 deletions lib/chain/ZmqClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import zmq from 'zeromq';
import AsyncLock from 'async-lock';
import { EventEmitter } from 'events';
import { Transaction, crypto } from 'bitcoinjs-lib';
import Logger from '../Logger';
Expand All @@ -12,6 +13,7 @@ type ZmqNotification = {
const filters = {
rawTx: 'pubrawtx',
rawBlock: 'pubrawblock',
hashBlock: 'pubhashblock',
};

interface ZmqClient {
Expand All @@ -30,22 +32,22 @@ class ZmqClient extends EventEmitter {
public utxos = new Set<string>();
public relevantOutputs = new Set<string>();

private rawtx = false;
private rawBlock = false;

private blockHeight = 0;
private bestBlockHash = '';

private hashBlockAddress?: string;

constructor(
private symbol: string,
private logger: Logger,
private getBlock: (hash: string) => Promise<{ tx: string[], previousblockhash: string }>,
private getBlock: (hash: string) => Promise<{ hash: string, height: number, tx: string[], previousblockhash: string }>,
private getBlockChainInfo: () => Promise<{ blocks: number, bestblockhash: string }>,
private getRawTransaction: (hash: string, verbose?: boolean, blockhash?: string) => Promise<string | any>) {
super();
}

public init = async (notifications: ZmqNotification[]) => {
const activeFilters: any = {};
const { blocks, bestblockhash } = await this.getBlockChainInfo();

this.blockHeight = blocks;
Expand All @@ -54,29 +56,45 @@ class ZmqClient extends EventEmitter {
for (const notification of notifications) {
switch (notification.type) {
case filters.rawTx:
this.rawtx = true;
activeFilters.rawtx = true;
this.initRawTransaction(notification.address);
break;

case filters.rawBlock:
this.rawBlock = true;
activeFilters.rawBlock = true;
this.initRawBlock(notification.address);
break;

case filters.hashBlock:
activeFilters.hashBlock = true;
this.hashBlockAddress = notification.address;
break;
}
}

if (!this.rawtx) {
throw this.getMissingStreamMessage(filters.rawTx);
if (!activeFilters.rawtx) {
throw `${filters.rawTx} ZMQ notifications are not enabled`;
}

if (!this.rawBlock) {
this.logger.warn(`Could not subscribe to ${this.symbol} chain ${filters.rawBlock}: ${this.getMissingStreamMessage(filters.rawBlock)}`);
const logCouldNotSubscribe = (filter: string) => {
this.logger.warn(`Could not find ${this.symbol} chain ZMQ filter: ${filter}`);
};

if (!activeFilters.rawBlock) {
logCouldNotSubscribe(filters.rawBlock);

if (!activeFilters.hashBlock) {
logCouldNotSubscribe(filters.hashBlock);
} else {
this.logger.warn(`Falling back to ${this.symbol} ${filters.hashBlock} ZMQ filter`);
this.initHashBlock();
}
}
}

public rescanChain = async (startHeight: number) => {
// Also rescan the block that got already added to the database to
// make sure that no transaction are missed
// make sure that no transactions are missed
const bestBlock = this.blockHeight;

let previousBlockHash = this.bestBlockHash;
Expand Down Expand Up @@ -136,6 +154,14 @@ class ZmqClient extends EventEmitter {
private initRawBlock = (address: string) => {
const socket = this.createSocket(address, 'rawblock');

socket.monitor();
socket.on('disconnect', () => {
socket.disconnect(address);

this.logger.warn(`${this.symbol} ${filters.rawBlock} ZMQ filter disconnected. Falling back to ${filters.hashBlock}`);
this.initHashBlock();
});

socket.on('message', async (_, rawBlock: Buffer) => {
const previousBlockHash = getHexString(
reverseBuffer(
Expand All @@ -155,19 +181,77 @@ class ZmqClient extends EventEmitter {
),
);

if (this.bestBlockHash === previousBlockHash) {
if (previousBlockHash === this.bestBlockHash) {
this.blockHeight += 1;
this.bestBlockHash = hash;

this.logger.silly(`New ${this.symbol} chain tip #${this.blockHeight}: ${hash}`);

this.emit('block', this.blockHeight);
this.newChainTip();
} else {
this.logger.debug(`Found ${this.symbol} orphan block: ${hash}`);
this.logOrphanBlock(hash);
}
});
}

private initHashBlock = () => {
if (this.hashBlockAddress) {
const socket = this.createSocket(this.hashBlockAddress, 'hashblock');

// Because the event handler is doing work asynchronously one has
// to use a lock to ensure the events get handled sequentially
const lock = new AsyncLock();
const lockKey = filters.hashBlock;

const handleBlock = async (blockHash: string) => {
const block = await this.getBlock(blockHash);

if (block.previousblockhash === this.bestBlockHash) {
this.blockHeight = block.height;
this.bestBlockHash = block.hash;

this.newChainTip();
} else {
if (block.height > this.blockHeight) {
const oldHeight = this.blockHeight;

this.blockHeight = block.height;
this.bestBlockHash = block.hash;

for (let i = 1; oldHeight + i <= this.blockHeight; i += 1) {
this.emit('block', oldHeight + i);
}

await this.rescanChain(oldHeight);
} else {
this.logOrphanBlock(block.hash);
}
}
};

socket.on('message', (_, blockHash: Buffer) => {
const blockHashString = getHexString(blockHash);

lock.acquire(lockKey, async () => {
try {
await handleBlock(blockHashString);
} catch (error) {
if (error.message === 'Block not found on disk') {
// If there are many blocks added to the chain at once Bitcoin Core might
// take a few milliseconds to write all of them to the disk. Therefore
// it just retries getting the block after a little delay
setTimeout(async () => {
await handleBlock(blockHashString);
}, 250);
} else {
this.logger.error(`${this.symbol} ${filters.hashBlock} ZMQ filter threw: ${JSON.stringify(error, undefined, 2)}`);
}
}
}, () => {});
});
} else {
this.logger.error(`Could not fall back to ${this.symbol} ${filters.hashBlock} ZMQ filter because it is not enabled`);
}
}

private isRelevantTransaction = (transaction: Transaction) => {
for (const output of transaction.outs) {
if (this.relevantOutputs.has(getHexString(output.script))) {
Expand All @@ -178,6 +262,16 @@ class ZmqClient extends EventEmitter {
return false;
}

private newChainTip = () => {
this.logger.silly(`New ${this.symbol} chain tip #${this.blockHeight}: ${this.bestBlockHash}`);

this.emit('block', this.blockHeight);
}

private logOrphanBlock = (hash: string) => {
this.logger.verbose(`Found ${this.symbol} orphan block: ${hash}`);
}

private createSocket = (address: string, filter: string) => {
const socket = zmq.socket('sub');
socket.connect(address);
Expand All @@ -187,10 +281,6 @@ class ZmqClient extends EventEmitter {

return socket;
}

private getMissingStreamMessage = (filter: string) => {
return `${filter} ZMQ notifications are not enabled`;
}
}

export default ZmqClient;
Expand Down
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"nodemon:watch": "nodemon --watch dist -e js bin/boltzd",
"lint": "tslint --project tsconfig.json && tslint --config tslint-alt.json 'bin/*' 'test/**/*.ts'",
"lint:fix": "tslint --fix --project tsconfig.json && tslint --fix --config tslint-alt.json 'bin/*' 'test/**/*.ts'",
"docker:start": "docker run -d --name regtest -p 18443:18443 -p 19443:19443 -p 29000:29000 -p 29001:29001 -p 30000:30000 -p 30001:30001 -p 10009:10009 -p 10010:10010 -p 11009:11009 -p 11010:11010 -p 8081:8081 boltz/regtest",
"docker:start": "docker run -d --name regtest -p 18443:18443 -p 19443:19443 -p 29000:29000 -p 29001:29001 -p 29002:29002 -p 30000:30000 -p 30001:30001 -p 30002:30002 -p 10009:10009 -p 10010:10010 -p 11009:11009 -p 11010:11010 -p 8081:8081 boltz/regtest",
"docker:stop": "docker kill regtest && docker rm regtest",
"test": "npm run test:unit && npm run docker:start && npm run test:int && npm run docker:stop",
"test:unit": "mocha test/unit/*.spec.ts test/unit/wallet/*.spec.ts",
Expand Down Expand Up @@ -57,6 +57,7 @@
"LICENSE"
],
"dependencies": {
"async-lock": "^1.1.4",
"bip32": "^1.0.2",
"bip39": "^2.5.0",
"bitcoinjs-lib": "^4.0.3",
Expand All @@ -78,6 +79,7 @@
"zeromq": "^5.1.0"
},
"devDependencies": {
"@types/async-lock": "^1.1.1",
"@types/bip32": "^1.0.1",
"@types/bip39": "^2.4.2",
"@types/bitcoinjs-lib": "^4.0.0",
Expand Down

0 comments on commit dd30930

Please sign in to comment.