Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,88 @@ const dashCoreRpcClient = require('../../../../../lib/externalApis/dashcore/rpc'

const subscribeToNewBlockHeaders = require('../../../../../lib/grpcServer/handlers/blockheaders-stream/subscribeToNewBlockHeaders');
const ChainDataProvider = require('../../../../../lib/chainDataProvider/ChainDataProvider');
const blockHeadersCache = require('../../../../../lib/chainDataProvider/BlockHeadersCache');
const BlockHeadersCache = require('../../../../../lib/chainDataProvider/BlockHeadersCache');
const { NEW_BLOCK_HEADERS_PROPAGATE_INTERVAL } = require('../../../../../lib/grpcServer/handlers/blockheaders-stream/constants');
const ProcessMediator = require('../../../../../lib/grpcServer/handlers/blockheaders-stream/ProcessMediator');
const wait = require('../../../../../lib/utils/wait');

describe('subscribeToNewBlockHeaders', async () => {
describe('subscribeToNewBlockHeaders', () => {
let mediator;
let zmqClient;
let chainDataProvider;
let blockHeadersCache;
let block;

const blockHeaders = {};
const chainLocks = {};

this.sinon.stub(dashCoreRpcClient, 'getBlockHeader')
.callsFake(async (hash) => blockHeaders[hash].toBuffer().toString('hex'));
beforeEach(async function beforeEach() {
mediator = new ProcessMediator();

const mockCoreAPI = this.sinon.stub();
const mockZmqClient = this.sinon.stub();
this.sinon.stub(dashCoreRpcClient, 'getBlockHeader')
.callsFake(async (hash) => blockHeaders[hash].toBuffer().toString('hex'));

const chainDataProvider = new ChainDataProvider(mockCoreAPI, mockZmqClient);
await chainDataProvider.init();
this.sinon.stub(dashCoreRpcClient, 'getBestChainLock').resolves({
height: 1,
signature: Buffer.from('fakeSig'),
blockHash: Buffer.from('fakeHash'),
});

beforeEach(async () => {
mediator = new ProcessMediator();
zmqClient = new ZmqClient();

blockHeadersCache = new BlockHeadersCache();

chainDataProvider = new ChainDataProvider(dashCoreRpcClient, zmqClient, blockHeadersCache);
await chainDataProvider.init();

dashCoreRpcClient.getBlockHeader.resetHistory();
blockHeadersCache.purge();

zmqClient = new ZmqClient();
this.sinon.stub(zmqClient.subscriberSocket, 'connect')
.callsFake(() => {
zmqClient.subscriberSocket.emit('connect');
});

await zmqClient.start();

block = new Block({
header: {
hash: '000000c546f0fdf0e20432a309e64ed75f05a6fdbb503bee46c813af6d4ef46d',
version: 536870912,
prevHash: '00000063859f5d58228bedbba96485e18c6aee5a55f72cb6eccbb00ffcb00afd',
merkleRoot: '22b9b6dde8516991186f77687e49b4c09eb96e5d2adebdf1e0ae9a01251c13ee',
time: 1608793053,
bits: 503445090,
nonce: 35025,
},
transactions: [
{
hash: '22b9b6dde8516991186f77687e49b4c09eb96e5d2adebdf1e0ae9a01251c13ee',
version: 3,
inputs: [
{
prevTxId: '0000000000000000000000000000000000000000000000000000000000000000',
outputIndex: 4294967295,
sequenceNumber: 4294967295,
script: '024c0f010b',
},
],
outputs: [
{
satoshis: 20000000000,
script: '76a91416b93a3b9168a20605cc3cda62f6135a3baa531a88ac',
},
{
satoshis: 30000000000,
script: '76a91416b93a3b9168a20605cc3cda62f6135a3baa531a88ac',
},
],
nLockTime: 0,
type: 5,
extraPayload: '02004c0f00003d8e273bf286d48ccba5a87b5adf332ed070a15e4e2d81eeb9ff685373be5656961e0b73ea855fdac9cc530782a7f0a22d25d1eaab4b2068efa647e9da0915d0',
},
],
});

const blockHeaderOne = new BlockHeader({
version: 536870913,
prevHash: '0000000000000000000000000000000000000000000000000000000000000000',
Expand Down Expand Up @@ -131,8 +178,16 @@ describe('subscribeToNewBlockHeaders', async () => {
zmqClient.subscriberSocket.emit('message', zmqClient.topics.hashblock, Buffer.from(hashes[2], 'hex'));

const locksHeights = Object.keys(chainLocks);
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[0]].toBuffer());
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[1]].toBuffer());
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[0]].toBuffer()]),
);
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[1]].toBuffer()]),
);

mediator.emit(ProcessMediator.EVENTS.HISTORICAL_DATA_SENT);

Expand Down Expand Up @@ -187,10 +242,23 @@ describe('subscribeToNewBlockHeaders', async () => {
);

const locksHeights = Object.keys(chainLocks);
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[0]].toBuffer());
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[0]].toBuffer()]),
);
mediator.emit(ProcessMediator.EVENTS.HISTORICAL_DATA_SENT);
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[1]].toBuffer());
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[2]].toBuffer());
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(),
chainLocks[locksHeights[1]].toBuffer()]),
);
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[2]].toBuffer()]),
);
await wait(NEW_BLOCK_HEADERS_PROPAGATE_INTERVAL + 100);
mediator.emit(ProcessMediator.EVENTS.CLIENT_DISCONNECTED);
const expectedChainLocks = { ...chainLocks };
Expand All @@ -199,7 +267,6 @@ describe('subscribeToNewBlockHeaders', async () => {
});

it('should use cache when historical data is sent', async () => {
const spyCache = this.sinon.spy(blockHeadersCache);
const receivedHeaders = {};

mediator.on(ProcessMediator.EVENTS.BLOCK_HEADERS, (headers) => {
Expand All @@ -219,8 +286,16 @@ describe('subscribeToNewBlockHeaders', async () => {
zmqClient.subscriberSocket.emit('message', zmqClient.topics.hashblock, Buffer.from(hashes[2], 'hex'));

const locksHeights = Object.keys(chainLocks);
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[0]].toBuffer());
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[1]].toBuffer());
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[0]].toBuffer()]),
);
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[1]].toBuffer()]),
);

mediator.emit(ProcessMediator.EVENTS.HISTORICAL_DATA_SENT);

Expand All @@ -239,13 +314,19 @@ describe('subscribeToNewBlockHeaders', async () => {
zmqClient.subscriberSocket.emit('message', zmqClient.topics.hashblock, Buffer.from(hashes[1], 'hex'));
zmqClient.subscriberSocket.emit('message', zmqClient.topics.hashblock, Buffer.from(hashes[2], 'hex'));

zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[0]].toBuffer());
zmqClient.subscriberSocket.emit('message', zmqClient.topics.rawchainlock, chainLocks[locksHeights[1]].toBuffer());
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[0]].toBuffer()]),
);
zmqClient.subscriberSocket.emit(
'message',
zmqClient.topics.rawchainlocksig,
Buffer.concat([block.toBuffer(), chainLocks[locksHeights[1]].toBuffer()]),
);

mediator.emit(ProcessMediator.EVENTS.HISTORICAL_DATA_SENT);

expect(dashCoreRpcClient.getBlockHeader.callCount).to.be.equal(0);
expect(spyCache.set.callCount).to.be.equal(3);
expect(spyCache.get.callCount).to.be.equal(6);
});
});