Skip to content

Commit

Permalink
feature: startSync with performance increase
Browse files Browse the repository at this point in the history
Closes #30

Adds a function to handle the sync setup, including request pipelining
to optimize performance. The default is 100 requests, but it's a configurable
argument. The `ChainSyncClient` no longer exposes `findIntersect`, and the awkward
'initial intersection' logic removed.
  • Loading branch information
rhyslbw committed May 5, 2021
1 parent 5d27524 commit fcaba7e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 82 deletions.
42 changes: 18 additions & 24 deletions clients/TypeScript/packages/client/src/ChainSync/ChainSyncClient.ts
Expand Up @@ -6,14 +6,12 @@ import {
createClientContext
} from '../Connection'
import { UnknownResultError } from '../errors'
import { baseRequest } from '../Request'
import { createPointFromCurrentTip, ensureSocketIsOpen } from '../util'
import { findIntersect, Intersection } from './findIntersect'
import { requestNext } from './requestNext'

export interface ChainSyncClient {
context: InteractionContext
findIntersect: (points: Point[]) => ReturnType<typeof findIntersect>
initialIntersection: Intersection
on: (messageHandlers: {
rollBackward: (response: {
point: Point,
Expand All @@ -28,6 +26,7 @@ export interface ChainSyncClient {
}) => void
requestNext: (options?: { mirror?: Mirror }) => void
shutdown: () => Promise<void>
startSync: (points?: Point[], requestBuffer?: number) => Promise<Intersection>
}

export const createChainSyncClient = async (options?: {
Expand All @@ -38,23 +37,8 @@ export const createChainSyncClient = async (options?: {
const { socket } = context
socket.once('error', reject)
socket.once('open', async () => {
const initialIntersection = await findIntersect(
[await createPointFromCurrentTip({ socket, closeOnCompletion: false })],
{
socket,
closeOnCompletion: false
}
)
return resolve({
context,
findIntersect: (points) => {
ensureSocketIsOpen(socket)
return findIntersect(points, {
socket,
closeOnCompletion: false
})
},
initialIntersection,
on (messageHandlers) {
socket.on('message', (message: string) => {
const response: Ogmios['RequestNextResponse'] = JSON.parse(message)
Expand All @@ -79,17 +63,27 @@ export const createChainSyncClient = async (options?: {
},
requestNext (options) {
ensureSocketIsOpen(socket)
socket.send(JSON.stringify({
...baseRequest,
methodname: 'RequestNext',
mirror: options?.mirror
} as Ogmios['RequestNext']))
return requestNext(socket, options)
},
shutdown: () => new Promise(resolve => {
ensureSocketIsOpen(socket)
socket.once('close', resolve)
socket.close()
})
}),
startSync: async (points, requestBuffer) => {
const intersection = await findIntersect(
points || [await createPointFromCurrentTip({ socket, closeOnCompletion: false })],
{
socket,
closeOnCompletion: false
}
)
ensureSocketIsOpen(socket)
for (let n = 0; n <= (requestBuffer || 100); n += 1) {
requestNext(socket)
}
return intersection
}
})
})
}).catch(reject)
Expand Down
1 change: 1 addition & 0 deletions clients/TypeScript/packages/client/src/ChainSync/index.ts
@@ -1,2 +1,3 @@
export * from './ChainSyncClient'
export * from './findIntersect'
export * from './requestNext'
15 changes: 15 additions & 0 deletions clients/TypeScript/packages/client/src/ChainSync/requestNext.ts
@@ -0,0 +1,15 @@
import { Ogmios } from '@cardano-ogmios/schema'
import { baseRequest } from '../Request'
import { Mirror } from '../Connection'
import WebSocket from 'isomorphic-ws'

export const requestNext = (
socket: WebSocket,
options?: { mirror?: Mirror }
): void => {
socket.send(JSON.stringify({
...baseRequest,
methodname: 'RequestNext',
mirror: options?.mirror
} as Ogmios['RequestNext']))
}
150 changes: 92 additions & 58 deletions clients/TypeScript/packages/client/test/ChainSync.test.ts
Expand Up @@ -18,69 +18,103 @@ describe('ChainSync', () => {
expect(client.context.socket.readyState).toBe(client.context.socket.OPEN)
await client.shutdown()
})
it('selects the tip as the intersection if no point provided', async () => {
const client = await createChainSyncClient({ connection })
if (client.initialIntersection.point === 'origin' || client.initialIntersection.tip === 'origin') {

describe('startSync', () => {
it('selects the tip as the intersection if no point provided', async () => {
const client = await createChainSyncClient({ connection })
const intersection = await client.startSync()
if (intersection.point === 'origin' || intersection.tip === 'origin') {
await client.shutdown()
throw new Error('Test network is not syncing')
} else if ('slot' in intersection.point && 'slot' in intersection.tip) {
expect(intersection.point.slot).toEqual(intersection.tip.slot)
expect(intersection.point.hash).toEqual(intersection.tip.hash)
}
await client.shutdown()
throw new Error('Test network is not syncing')
} else if ('slot' in client.initialIntersection.point && 'slot' in client.initialIntersection.tip) {
expect(client.initialIntersection.point.slot).toEqual(client.initialIntersection.tip.slot)
expect(client.initialIntersection.point.hash).toEqual(client.initialIntersection.tip.hash)
}
await client.shutdown()
})
it('intersects at the genesis if origin provided as point', async () => {
const client = await createChainSyncClient({ connection })
const intersection = await client.findIntersect(['origin'])
expect(intersection.point).toEqual('origin')
expect(intersection.tip).toBeDefined()
await client.shutdown()
})

it('intersects at the genesis if origin provided as point', async () => {
const client = await createChainSyncClient({ connection })
const intersection = await client.startSync(['origin'])
expect(intersection.point).toEqual('origin')
expect(intersection.tip).toBeDefined()
await client.shutdown()
})

it('accepts message handlers for roll back and roll forward messages', async () => {
const rollbackPoints: Point[] = []
const blocks: Block[] = []
const client = await createChainSyncClient({ connection })
client.on({
rollBackward: ({ point }) => {
rollbackPoints.push(point)
client.requestNext()
},
rollForward: async ({ block }) => {
if (blocks.length < 10) {
blocks.push(block)
client.requestNext()
}
}
})
await client.startSync(['origin'], 10)
await delay(1000)
await client.shutdown()
let firstBlockHash: Hash16
if ('byron' in blocks[0]) {
const block = blocks[0] as { byron: BlockByron }
firstBlockHash = block.byron.hash
} else if ('shelley' in blocks[0]) {
const block = blocks[0] as { shelley: BlockShelley }
firstBlockHash = block.shelley.body[0].id
} else if ('allegra' in blocks[0]) {
const block = blocks[0] as { allegra: BlockAllegra }
firstBlockHash = block.allegra.body[0].id
} else if ('mary' in blocks[0]) {
const block = blocks[0] as { mary: BlockMary }
firstBlockHash = block.mary.body[0].id
}
expect(firstBlockHash).toBeDefined()
expect(rollbackPoints.length).toBe(1)
expect(blocks.length).toBe(10)
})

it('implements pipelining to increase sync performance', async () => {
type BlocksPerSecond = number
const run = async (requestBuffer?: number): Promise<BlocksPerSecond> => {
const blocks: Block[] = []
const client = await createChainSyncClient({ connection })
const start = Date.now()
let stop: number
client.on({
rollBackward: () => {
client.requestNext()
},
rollForward: async ({ block }) => {
if (blocks.length < 1000) {
blocks.push(block)
client.requestNext()
} else if (stop === undefined) {
stop = Date.now() - start
}
}
})
await client.startSync(['origin'], requestBuffer)
await delay(800)
await client.shutdown()
expect(blocks.length).toBe(1000)
return 1000 * blocks.length / stop
}
const pipelinedBlocksPerSecond = await run()
const nonPipelinedBlocksPerSecond = await run(1)
expect(pipelinedBlocksPerSecond).toBeGreaterThan(nonPipelinedBlocksPerSecond)
})
})

it('rejects method calls after shutdown', async () => {
const client = await createChainSyncClient({ connection })
await client.shutdown()
const run = () => client.findIntersect(['origin'])
const run = () => client.startSync(['origin'])
await expect(run).rejects
})
it('accepts message handlers for roll back and roll forward messages', async () => {
const rollbackPoints: Point[] = []
let count = 1
const blocks: Block[] = []
const client = await createChainSyncClient({ connection })
await client.findIntersect(['origin'])
client.on({
rollBackward: ({ point, reflection }) => {
rollbackPoints.push(point)
client.requestNext({ mirror: reflection })
},
rollForward: async ({ block, reflection }) => {
blocks.push(block)
count = reflection.count as number
if (reflection.count < 10) {
client.requestNext({ mirror: { count: reflection.count as number + 1 } })
} else {
await client.shutdown()
}
}
})
client.requestNext({ mirror: { count } })
await delay(100)
let firstBlockHash: Hash16
if ('byron' in blocks[0]) {
const block = blocks[0] as { byron: BlockByron }
firstBlockHash = block.byron.hash
} else if ('shelley' in blocks[0]) {
const block = blocks[0] as { shelley: BlockShelley }
firstBlockHash = block.shelley.body[0].id
} else if ('allegra' in blocks[0]) {
const block = blocks[0] as { allegra: BlockAllegra }
firstBlockHash = block.allegra.body[0].id
} else if ('mary' in blocks[0]) {
const block = blocks[0] as { mary: BlockMary }
firstBlockHash = block.mary.body[0].id
}
expect(firstBlockHash).toBeDefined()
expect(rollbackPoints.length).toBe(1)
expect(count).toBe(10)
})
})

0 comments on commit fcaba7e

Please sign in to comment.