diff --git a/package-lock.json b/package-lock.json index 047acdd..45fae56 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,10 +11,12 @@ "dependencies": { "@types/lodash": "^4.14.195", "ably": "^1.2.41", + "js-base64": "^3.7.5", "lodash": "^4.17.21", "pino": "^8.14.1", "rxjs": "^7.8.1", - "typedoc-plugin-missing-exports": "^2.1.0" + "typedoc-plugin-missing-exports": "^2.1.0", + "yjs": "^13.6.8" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^6.1.0", @@ -3180,6 +3182,15 @@ "integrity": "sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==", "dev": true }, + "node_modules/isomorphic.js": { + "version": "0.2.5", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz", + "integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==", + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/istanbul-lib-coverage": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/istanbul-lib-coverage/-/istanbul-lib-coverage-3.2.0.tgz", @@ -3230,6 +3241,11 @@ "node": ">=8" } }, + "node_modules/js-base64": { + "version": "3.7.5", + "resolved": "https://registry.npmjs.org/js-base64/-/js-base64-3.7.5.tgz", + "integrity": "sha512-3MEt5DTINKqfScXKfJFrRbxkrnk2AxPWGBL/ycjz4dK8iqiSJ06UxD8jh8xuh6p10TX4t2+7FsBYVxxQbMg+qA==" + }, "node_modules/js-yaml": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", @@ -3306,6 +3322,25 @@ "node": ">= 0.8.0" } }, + "node_modules/lib0": { + "version": "0.2.87", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.87.tgz", + "integrity": "sha512-TbB63XJixvNToW2IHWAFsCJj9tVnajmwjE14p69i51Rx8byOQd2IP4ourE8v4d7vhyO++nVm1sQk3ePslfbucg==", + "dependencies": { + "isomorphic.js": "^0.2.4" + }, + "bin": { + "0gentesthtml": "bin/gentesthtml.js", + "0serve": "bin/0serve.js" + }, + "engines": { + "node": ">=16" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/local-pkg": { "version": "0.4.3", "resolved": "https://registry.npmjs.org/local-pkg/-/local-pkg-0.4.3.tgz", @@ -5114,6 +5149,22 @@ "node": ">=10" } }, + "node_modules/yjs": { + "version": "13.6.8", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.8.tgz", + "integrity": "sha512-ZPq0hpJQb6f59B++Ngg4cKexDJTvfOgeiv0sBc4sUm8CaBWH7OQC4kcCgrqbjJ/B2+6vO49exvTmYfdlPtcjbg==", + "dependencies": { + "lib0": "^0.2.74" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=8.0.0" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/yocto-queue": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.0.0.tgz", @@ -7292,6 +7343,11 @@ "integrity": "sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==", "dev": true }, + "isomorphic.js": { + "version": "0.2.5", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz", + "integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==" + }, "istanbul-lib-coverage": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/istanbul-lib-coverage/-/istanbul-lib-coverage-3.2.0.tgz", @@ -7330,6 +7386,11 @@ "istanbul-lib-report": "^3.0.0" } }, + "js-base64": { + "version": "3.7.5", + "resolved": "https://registry.npmjs.org/js-base64/-/js-base64-3.7.5.tgz", + "integrity": "sha512-3MEt5DTINKqfScXKfJFrRbxkrnk2AxPWGBL/ycjz4dK8iqiSJ06UxD8jh8xuh6p10TX4t2+7FsBYVxxQbMg+qA==" + }, "js-yaml": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", @@ -7394,6 +7455,14 @@ "type-check": "~0.4.0" } }, + "lib0": { + "version": "0.2.87", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.87.tgz", + "integrity": "sha512-TbB63XJixvNToW2IHWAFsCJj9tVnajmwjE14p69i51Rx8byOQd2IP4ourE8v4d7vhyO++nVm1sQk3ePslfbucg==", + "requires": { + "isomorphic.js": "^0.2.4" + } + }, "local-pkg": { "version": "0.4.3", "resolved": "https://registry.npmjs.org/local-pkg/-/local-pkg-0.4.3.tgz", @@ -8648,6 +8717,14 @@ "integrity": "sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==", "dev": true }, + "yjs": { + "version": "13.6.8", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.8.tgz", + "integrity": "sha512-ZPq0hpJQb6f59B++Ngg4cKexDJTvfOgeiv0sBc4sUm8CaBWH7OQC4kcCgrqbjJ/B2+6vO49exvTmYfdlPtcjbg==", + "requires": { + "lib0": "^0.2.74" + } + }, "yocto-queue": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.0.0.tgz", diff --git a/package.json b/package.json index 7349418..56ab7bd 100644 --- a/package.json +++ b/package.json @@ -59,9 +59,11 @@ "dependencies": { "@types/lodash": "^4.14.195", "ably": "^1.2.41", + "js-base64": "^3.7.5", "lodash": "^4.17.21", "pino": "^8.14.1", "rxjs": "^7.8.1", - "typedoc-plugin-missing-exports": "^2.1.0" + "typedoc-plugin-missing-exports": "^2.1.0", + "yjs": "^13.6.8" } } diff --git a/src/AblyProvider.test.ts b/src/AblyProvider.test.ts new file mode 100644 index 0000000..24f5ad9 --- /dev/null +++ b/src/AblyProvider.test.ts @@ -0,0 +1,146 @@ +import { Realtime, Types as AblyTypes } from 'ably/promises'; +import { fromUint8Array, toUint8Array } from 'js-base64'; +import { Subject } from 'rxjs'; +import { it, describe, expect, vi, beforeEach } from 'vitest'; +import * as Y from 'yjs'; + +import AblyProvider from './AblyProvider.js'; +import { customMessage } from './utilities/test/messages.js'; + +vi.mock('ably/promises'); + +interface ProviderTestContext { + ably: AblyTypes.RealtimePromise; + sendChannel: any; + receiveChannel: any; +} + +describe('AblyProvider', () => { + beforeEach((context) => { + const ably = new Realtime({}); + context.ably = ably; + const sendChannel = { + on: vi.fn(), + publish: vi.fn(), + attach: vi.fn(), + detach: vi.fn(), + }; + const receiveChannel = { + on: vi.fn(), + attach: vi.fn(), + detach: vi.fn(), + subscribe: vi.fn(), + history: vi.fn(), + }; + receiveChannel.history = vi.fn( + async (): Promise>> => ({ + items: [], + hasNext: () => false, + }), + ); + + context.sendChannel = sendChannel; + context.receiveChannel = receiveChannel; + + context.ably.channels.get = vi.fn((name: string) => { + if (name.startsWith('send')) { + return context.sendChannel; + } + + return context.receiveChannel; + }); + context.ably.channels.release = vi.fn(); + + ably.connection.whenState = vi.fn<[AblyTypes.ConnectionState], Promise>( + async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }, + ); + + context.sendChannel = sendChannel; + context.receiveChannel = receiveChannel; + }); + + it('sends updates', (context) => { + const clientId = 'abc123'; + context.ably.clientId = clientId; + + const sendChannel = context.sendChannel; + + const provider = new AblyProvider(new Y.Doc(), { + ably: context.ably, + sendChannel: 'sendChannel', + receiveChannel: 'receiveChannel', + bootstrap: exampleBootstrapFunction, + }); + + provider.doc.getMap('mymap').set('field', 'value'); + + expect(sendChannel.publish).toHaveBeenCalledTimes(1); + const update = sendChannel.publish.mock.calls[0][1].update; + expect(sendChannel.publish).toHaveBeenCalledWith('update', { update: update, origin: clientId }); + + const yUpdate = toUint8Array(update); + const got = new Y.Doc(); + Y.applyUpdate(got, yUpdate); + expect(got.getMap('mymap').get('field')).toEqual('value'); + }); + + it('applies updates', async (context) => { + const clientId = 'foobarbaz'; + context.ably.clientId = clientId; + const remoteDoc = new Y.Doc(); + + let updates = new Subject(); + context.receiveChannel.subscribe = vi.fn(async (callback) => { + updates.subscribe((update) => { + callback(update); + }); + return { + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }; + }); + + // When a change is applied to the 'remoteDoc', that change + // becomes a channel message on the 'updates' subject, and applied + // to the local YDoc. + remoteDoc.on('update', (update, origin) => { + updates.next(customMessage('1', 'update', JSON.stringify({ update: fromUint8Array(update), origin: origin }))); + }); + + const provider = new AblyProvider(new Y.Doc(), { + ably: context.ably, + sendChannel: 'sendChannel', + receiveChannel: 'receiveChannel', + bootstrap: exampleBootstrapFunction, + }); + await new Promise((resolve) => { + provider.whenState('ready', '', () => { + resolve(); + }); + }); + const localUpdateReceived = new Promise((resolve) => { + provider.doc.on('update', (update, origin) => { + resolve(origin); + }); + }); + + // Change the 'remoteDoc', update should be propagated to local YDoc + remoteDoc.getMap('mymap').set('field', 'value'); + + // Wait for local update to be received + const origin = await localUpdateReceived; + expect(origin).toEqual('server'); + expect(provider.doc.getMap('mymap').get('field')).toEqual('value'); + }); +}); + +const exampleBootstrapFunction = async () => { + return { serverStateVector: Uint8Array.of(), serverUpdate: Uint8Array.of() }; +}; diff --git a/src/AblyProvider.ts b/src/AblyProvider.ts new file mode 100644 index 0000000..db5f90f --- /dev/null +++ b/src/AblyProvider.ts @@ -0,0 +1,98 @@ +import type { Types as AblyTypes } from 'ably/promises'; +import { fromUint8Array, toUint8Array } from 'js-base64'; +import * as Y from 'yjs'; + +import Model from './Model.js'; +import ModelsClient from './ModelsClient.js'; +import { OptimisticEvent, ConfirmedEvent } from './types/model.js'; +import EventEmitter from './utilities/EventEmitter.js'; + +type ProviderBootstrapFunc = (doc: Y.Doc) => Promise<{ serverStateVector: Uint8Array; serverUpdate: Uint8Array }>; + +export type ProviderOptions = { + ably: AblyTypes.RealtimePromise; + sendChannel: string; + receiveChannel: string; + bootstrap: ProviderBootstrapFunc; +}; + +type YJsModelStateType = { ydoc: Y.Doc; serverStateVector: Uint8Array }; + +type ModelSyncFunc = (doc: Y.Doc) => Promise<{ data: YJsModelStateType; sequenceID: string }>; + +export default class AblyProvider extends EventEmitter> { + private ably: AblyTypes.RealtimePromise; + private client: ModelsClient; + model: Model; + + private sendChannel: AblyTypes.RealtimeChannelPromise; + + constructor(readonly doc: Y.Doc, options: ProviderOptions) { + super(); + this.ably = options.ably; + this.client = new ModelsClient({ ably: options.ably }); + this.sendChannel = options.ably.channels.get(options.sendChannel); + + this.model = this.client.models.get({ + name: 'yjs:' + doc.guid, + channelName: options.receiveChannel, + sync: yjsSyncAdaptor(options.bootstrap, this.sendChannel, this.ably.clientId), + merge: yjsDefaultMerge, + }); + + doc.on('update', (update, origin) => { + if (origin === 'server') { + return; + } + + this.sendChannel.publish('update', { update: fromUint8Array(update), origin: origin || this.ably.clientId }); + }); + + this.model.sync(doc); + this.model.on('ready', () => { + this.emit('ready', 'ready'); + }); + } +} + +const yjsSyncAdaptor = function ( + fn: ProviderBootstrapFunc, + send: AblyTypes.RealtimeChannelPromise, + origin: string, +): (doc: Y.Doc) => Promise<{ data: YJsModelStateType; sequenceID: string }> { + return async (doc) => { + const { serverStateVector, serverUpdate } = await fn(doc); + + if (serverUpdate.length > 0) { + Y.applyUpdate(doc, serverUpdate, 'server'); + } + + if (serverStateVector.length > 0) { + const update = Y.encodeStateAsUpdate(doc, serverStateVector); + send.publish('sync', { update: fromUint8Array(update), origin: origin }); + } + + return Promise.resolve({ + data: { ydoc: doc, serverStateVector: serverStateVector }, + sequenceID: '', + }); + }; +}; + +/* + * This merge function is a default for merging an update into a YDoc + */ +const yjsDefaultMerge = async (state: YJsModelStateType, event: OptimisticEvent | ConfirmedEvent) => { + if (!event.confirmed) { + return state; + } + + if (event.name !== 'update') { + return state; + } + + const data = JSON.parse(event.data) as { update: string; origin: string }; + const yUpdate = toUint8Array(data.update); + Y.applyUpdate(state.ydoc, yUpdate, data.origin || 'server'); + return state; +};