diff --git a/clients/TypeScript/packages/client/src/ChainSync/ChainSyncClient.ts b/clients/TypeScript/packages/client/src/ChainSync/ChainSyncClient.ts index 99c7cce1df..072c4b8746 100644 --- a/clients/TypeScript/packages/client/src/ChainSync/ChainSyncClient.ts +++ b/clients/TypeScript/packages/client/src/ChainSync/ChainSyncClient.ts @@ -6,14 +6,12 @@ import { createClientContext } from '../Connection' import { UnknownResultError } from '../errors' -import { baseRequest } from '../Request' import { createPointFromCurrentTip, ensureSocketIsOpen } from '../util' import { findIntersect, Intersection } from './findIntersect' +import { requestNext } from './requestNext' export interface ChainSyncClient { context: InteractionContext - findIntersect: (points: Point[]) => ReturnType - initialIntersection: Intersection on: (messageHandlers: { rollBackward: (response: { point: Point, @@ -28,6 +26,7 @@ export interface ChainSyncClient { }) => void requestNext: (options?: { mirror?: Mirror }) => void shutdown: () => Promise + startSync: (points?: Point[], requestBuffer?: number) => Promise } export const createChainSyncClient = async (options?: { @@ -38,23 +37,8 @@ export const createChainSyncClient = async (options?: { const { socket } = context socket.once('error', reject) socket.once('open', async () => { - const initialIntersection = await findIntersect( - [await createPointFromCurrentTip({ socket, closeOnCompletion: false })], - { - socket, - closeOnCompletion: false - } - ) return resolve({ context, - findIntersect: (points) => { - ensureSocketIsOpen(socket) - return findIntersect(points, { - socket, - closeOnCompletion: false - }) - }, - initialIntersection, on (messageHandlers) { socket.on('message', (message: string) => { const response: Ogmios['RequestNextResponse'] = JSON.parse(message) @@ -79,17 +63,27 @@ export const createChainSyncClient = async (options?: { }, requestNext (options) { ensureSocketIsOpen(socket) - socket.send(JSON.stringify({ - ...baseRequest, - methodname: 'RequestNext', - mirror: options?.mirror - } as Ogmios['RequestNext'])) + return requestNext(socket, options) }, shutdown: () => new Promise(resolve => { ensureSocketIsOpen(socket) socket.once('close', resolve) socket.close() - }) + }), + startSync: async (points, requestBuffer) => { + const intersection = await findIntersect( + points || [await createPointFromCurrentTip({ socket, closeOnCompletion: false })], + { + socket, + closeOnCompletion: false + } + ) + ensureSocketIsOpen(socket) + for (let n = 0; n <= (requestBuffer || 100); n += 1) { + requestNext(socket) + } + return intersection + } }) }) }).catch(reject) diff --git a/clients/TypeScript/packages/client/src/ChainSync/index.ts b/clients/TypeScript/packages/client/src/ChainSync/index.ts index e0ba94a468..8e51330162 100644 --- a/clients/TypeScript/packages/client/src/ChainSync/index.ts +++ b/clients/TypeScript/packages/client/src/ChainSync/index.ts @@ -1,2 +1,3 @@ export * from './ChainSyncClient' export * from './findIntersect' +export * from './requestNext' diff --git a/clients/TypeScript/packages/client/src/ChainSync/requestNext.ts b/clients/TypeScript/packages/client/src/ChainSync/requestNext.ts new file mode 100644 index 0000000000..5ff65a27c2 --- /dev/null +++ b/clients/TypeScript/packages/client/src/ChainSync/requestNext.ts @@ -0,0 +1,15 @@ +import { Ogmios } from '@cardano-ogmios/schema' +import { baseRequest } from '../Request' +import { Mirror } from '../Connection' +import WebSocket from 'isomorphic-ws' + +export const requestNext = ( + socket: WebSocket, + options?: { mirror?: Mirror } +): void => { + socket.send(JSON.stringify({ + ...baseRequest, + methodname: 'RequestNext', + mirror: options?.mirror + } as Ogmios['RequestNext'])) +} diff --git a/clients/TypeScript/packages/client/test/ChainSync.test.ts b/clients/TypeScript/packages/client/test/ChainSync.test.ts index 4ff733dbee..af81f970ca 100644 --- a/clients/TypeScript/packages/client/test/ChainSync.test.ts +++ b/clients/TypeScript/packages/client/test/ChainSync.test.ts @@ -18,69 +18,103 @@ describe('ChainSync', () => { expect(client.context.socket.readyState).toBe(client.context.socket.OPEN) await client.shutdown() }) - it('selects the tip as the intersection if no point provided', async () => { - const client = await createChainSyncClient({ connection }) - if (client.initialIntersection.point === 'origin' || client.initialIntersection.tip === 'origin') { + + describe('startSync', () => { + it('selects the tip as the intersection if no point provided', async () => { + const client = await createChainSyncClient({ connection }) + const intersection = await client.startSync() + if (intersection.point === 'origin' || intersection.tip === 'origin') { + await client.shutdown() + throw new Error('Test network is not syncing') + } else if ('slot' in intersection.point && 'slot' in intersection.tip) { + expect(intersection.point.slot).toEqual(intersection.tip.slot) + expect(intersection.point.hash).toEqual(intersection.tip.hash) + } await client.shutdown() - throw new Error('Test network is not syncing') - } else if ('slot' in client.initialIntersection.point && 'slot' in client.initialIntersection.tip) { - expect(client.initialIntersection.point.slot).toEqual(client.initialIntersection.tip.slot) - expect(client.initialIntersection.point.hash).toEqual(client.initialIntersection.tip.hash) - } - await client.shutdown() - }) - it('intersects at the genesis if origin provided as point', async () => { - const client = await createChainSyncClient({ connection }) - const intersection = await client.findIntersect(['origin']) - expect(intersection.point).toEqual('origin') - expect(intersection.tip).toBeDefined() - await client.shutdown() + }) + + it('intersects at the genesis if origin provided as point', async () => { + const client = await createChainSyncClient({ connection }) + const intersection = await client.startSync(['origin']) + expect(intersection.point).toEqual('origin') + expect(intersection.tip).toBeDefined() + await client.shutdown() + }) + + it('accepts message handlers for roll back and roll forward messages', async () => { + const rollbackPoints: Point[] = [] + const blocks: Block[] = [] + const client = await createChainSyncClient({ connection }) + client.on({ + rollBackward: ({ point }) => { + rollbackPoints.push(point) + client.requestNext() + }, + rollForward: async ({ block }) => { + if (blocks.length < 10) { + blocks.push(block) + client.requestNext() + } + } + }) + await client.startSync(['origin'], 10) + await delay(1000) + await client.shutdown() + let firstBlockHash: Hash16 + if ('byron' in blocks[0]) { + const block = blocks[0] as { byron: BlockByron } + firstBlockHash = block.byron.hash + } else if ('shelley' in blocks[0]) { + const block = blocks[0] as { shelley: BlockShelley } + firstBlockHash = block.shelley.body[0].id + } else if ('allegra' in blocks[0]) { + const block = blocks[0] as { allegra: BlockAllegra } + firstBlockHash = block.allegra.body[0].id + } else if ('mary' in blocks[0]) { + const block = blocks[0] as { mary: BlockMary } + firstBlockHash = block.mary.body[0].id + } + expect(firstBlockHash).toBeDefined() + expect(rollbackPoints.length).toBe(1) + expect(blocks.length).toBe(10) + }) + + it('implements pipelining to increase sync performance', async () => { + type BlocksPerSecond = number + const run = async (requestBuffer?: number): Promise => { + const blocks: Block[] = [] + const client = await createChainSyncClient({ connection }) + const start = Date.now() + let stop: number + client.on({ + rollBackward: () => { + client.requestNext() + }, + rollForward: async ({ block }) => { + if (blocks.length < 1000) { + blocks.push(block) + client.requestNext() + } else if (stop === undefined) { + stop = Date.now() - start + } + } + }) + await client.startSync(['origin'], requestBuffer) + await delay(800) + await client.shutdown() + expect(blocks.length).toBe(1000) + return 1000 * blocks.length / stop + } + const pipelinedBlocksPerSecond = await run() + const nonPipelinedBlocksPerSecond = await run(1) + expect(pipelinedBlocksPerSecond).toBeGreaterThan(nonPipelinedBlocksPerSecond) + }) }) + it('rejects method calls after shutdown', async () => { const client = await createChainSyncClient({ connection }) await client.shutdown() - const run = () => client.findIntersect(['origin']) + const run = () => client.startSync(['origin']) await expect(run).rejects }) - it('accepts message handlers for roll back and roll forward messages', async () => { - const rollbackPoints: Point[] = [] - let count = 1 - const blocks: Block[] = [] - const client = await createChainSyncClient({ connection }) - await client.findIntersect(['origin']) - client.on({ - rollBackward: ({ point, reflection }) => { - rollbackPoints.push(point) - client.requestNext({ mirror: reflection }) - }, - rollForward: async ({ block, reflection }) => { - blocks.push(block) - count = reflection.count as number - if (reflection.count < 10) { - client.requestNext({ mirror: { count: reflection.count as number + 1 } }) - } else { - await client.shutdown() - } - } - }) - client.requestNext({ mirror: { count } }) - await delay(100) - let firstBlockHash: Hash16 - if ('byron' in blocks[0]) { - const block = blocks[0] as { byron: BlockByron } - firstBlockHash = block.byron.hash - } else if ('shelley' in blocks[0]) { - const block = blocks[0] as { shelley: BlockShelley } - firstBlockHash = block.shelley.body[0].id - } else if ('allegra' in blocks[0]) { - const block = blocks[0] as { allegra: BlockAllegra } - firstBlockHash = block.allegra.body[0].id - } else if ('mary' in blocks[0]) { - const block = blocks[0] as { mary: BlockMary } - firstBlockHash = block.mary.body[0].id - } - expect(firstBlockHash).toBeDefined() - expect(rollbackPoints.length).toBe(1) - expect(count).toBe(10) - }) })