Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
"lib0": "^0.2.76",
"uuid": "^9.0.0",
"webpack": "^5.77.0",
"webpack-cli": "^5.0.1"
"webpack-cli": "^5.0.1",
"y-protocols": "^1.0.6"
},
"resolutions": {
"@jupyterlab/apputils": "~4.0.0",
Expand Down
8 changes: 8 additions & 0 deletions src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { JSONExt, JSONObject, PromiseDelegate } from '@lumino/coreutils';
import { ISignal, Signal } from '@lumino/signaling';
import * as Y from 'yjs';

import type { Awareness } from 'y-protocols/awareness';
import { IJupyterYDoc, IJupyterYModel } from './types';

export class JupyterYModel implements IJupyterYModel {
Expand Down Expand Up @@ -49,6 +50,13 @@ export class JupyterYModel implements IJupyterYModel {
return this._ready.promise;
}

/**
* Override in subclasses that attach to a @jupyter/ydoc YDocument
*/
get awareness(): Awareness | undefined {
return undefined;
}

protected async initialize(commMetadata: { [key: string]: any }) {
this.ydoc = this.ydocFactory(commMetadata);
this.sharedModel = new JupyterYDoc(commMetadata, this._ydoc);
Expand Down
3 changes: 2 additions & 1 deletion src/notebookrenderer/widgetManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ export class WidgetModelRegistry implements IJupyterYWidgetModelRegistry {

new YCommProvider({
comm,
ydoc: yModel.sharedModel.ydoc
ydoc: yModel.sharedModel.ydoc,
awareness: yModel.awareness
});
this._yModels.set(comm.commId, yModel);
};
Expand Down
79 changes: 73 additions & 6 deletions src/notebookrenderer/yCommProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { Kernel, KernelMessage } from '@jupyterlab/services';
import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import {
Awareness,
applyAwarenessUpdate,
encodeAwarenessUpdate
} from 'y-protocols/awareness';
import * as syncProtocol from 'y-protocols/sync';
import * as Y from 'yjs';
import { IDisposable } from '@lumino/disposable';
Expand All @@ -9,17 +14,45 @@ export enum YMessageType {
SYNC = 0,
AWARENESS = 1
}

export interface IYCommProviderOptions {
comm: Kernel.IComm;
ydoc: Y.Doc;
/**
* If omitted, a new Awareness is created for this doc.
* When the UI is backed by a shared Y doc (e.g. @jupyter/ydoc), pass that
* document’s Awareness so comm traffic matches the rest of the session.
*/
awareness?: Awareness;
}

export class YCommProvider implements IDisposable {
constructor(options: { comm: Kernel.IComm; ydoc: Y.Doc }) {
constructor(options: IYCommProviderOptions) {
this._comm = options.comm;
this._ydoc = options.ydoc;

if (options.awareness) {
this._awareness = options.awareness;
this._ownsAwareness = false;
} else {
this._awareness = new Awareness(this._ydoc);
this._ownsAwareness = true;
}

this._ydoc.on('update', this._updateHandler);
this._awareness.on('update', this._awarenessUpdateHandler);

this._connect();
}

get doc(): Y.Doc {
return this._ydoc;
}

get awareness(): Awareness {
return this._awareness;
}

get synced(): boolean {
return this._synced;
}
Expand All @@ -38,9 +71,15 @@ export class YCommProvider implements IDisposable {
if (this._isDisposed) {
return;
}
this._ydoc.off('update', this._updateHandler);
this._awareness.off('update', this._awarenessUpdateHandler);
if (this._ownsAwareness) {
this._awareness.destroy();
}
this._comm.close();
this._isDisposed = true;
}

private _onMsg = (msg: KernelMessage.ICommMsgMsg<'iopub' | 'shell'>) => {
if (msg.buffers) {
const buffer = msg.buffers[0] as ArrayBuffer;
Expand All @@ -54,13 +93,31 @@ export class YCommProvider implements IDisposable {
}
};

private _updateHandler = (update, origin) => {
private _updateHandler = (update: Uint8Array, origin: unknown) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, YMessageType.SYNC);
syncProtocol.writeUpdate(encoder, update);
this._sendOverComm(encoding.toUint8Array(encoder));
};

private _awarenessUpdateHandler = (change: {
added: number[];
updated: number[];
removed: number[];
}) => {
const { added, updated, removed } = change;
const clients = added.concat(updated, removed);
if (clients.length === 0) {
return;
}

const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, YMessageType.AWARENESS);
const awarenessBody = encodeAwarenessUpdate(this._awareness, clients);
encoding.writeVarUint8Array(encoder, awarenessBody);
this._sendOverComm(encoding.toUint8Array(encoder));
};

private _connect() {
this._sync();
this._comm.onMsg = this._onMsg;
Expand All @@ -79,6 +136,8 @@ export class YCommProvider implements IDisposable {

private _comm: Kernel.IComm;
private _ydoc: Y.Doc;
private _awareness: Awareness;
private _ownsAwareness: boolean;
private _synced: boolean;
private _isDisposed = false;
}
Expand Down Expand Up @@ -106,6 +165,7 @@ namespace Private {
provider.synced = true;
}
}

export function readMessage(
provider: YCommProvider,
buf: Uint8Array,
Expand All @@ -115,10 +175,17 @@ namespace Private {
const encoder = encoding.createEncoder();
const messageType = decoding.readVarUint(decoder);

if (messageType === YMessageType.SYNC) {
syncMessageHandler(encoder, decoder, provider, emitSynced);
} else {
console.error('Unable to compute message');
switch (messageType) {
case YMessageType.SYNC:
syncMessageHandler(encoder, decoder, provider, emitSynced);
break;
case YMessageType.AWARENESS: {
const awarenessUpdate = decoding.readVarUint8Array(decoder);
applyAwarenessUpdate(provider.awareness, awarenessUpdate, null);
break;
}
default:
console.error('Unable to compute message');
}
return encoder;
}
Expand Down
7 changes: 7 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as Y from 'yjs';
import { ISignal } from '@lumino/signaling';
import { JSONObject } from '@lumino/coreutils';
import { IDisposable } from '@lumino/disposable';
import type { Awareness } from 'y-protocols/awareness';

export interface IJupyterYDocChange {
attrsChange?: MapChange;
Expand Down Expand Up @@ -32,4 +33,10 @@ export interface IJupyterYModel extends IDisposable {
disposed: ISignal<any, void>;

ready: Promise<void>;

/**
* When set YCommProvider uses this Awareness
* instead of creating a second instance on the same Y.Doc
*/
awareness?: Awareness;
}
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5728,6 +5728,17 @@ __metadata:
languageName: node
linkType: hard

"y-protocols@npm:^1.0.6":
version: 1.0.7
resolution: "y-protocols@npm:1.0.7"
dependencies:
lib0: ^0.2.85
peerDependencies:
yjs: ^13.0.0
checksum: d58579ee542b6b9f4e3c3223fd24efcfd4af6f97acdb40a3d362713dcce812dd3dd086cc1f221a6ccb50bd373ff4b3d30fc2462a17f7804e0d1a8bd187099f31
languageName: node
linkType: hard

"yjs-widgets@workspace:.":
version: 0.0.0-use.local
resolution: "yjs-widgets@workspace:."
Expand Down Expand Up @@ -5761,6 +5772,7 @@ __metadata:
uuid: ^9.0.0
webpack: ^5.77.0
webpack-cli: ^5.0.1
y-protocols: ^1.0.6
languageName: unknown
linkType: soft

Expand Down
Loading