Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Notification based BufferedDataConnection #1143

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 22 additions & 26 deletions lib/dataconnection/BufferedConnection/BinaryPack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BinaryPackChunker, concatArrayBuffers } from "./binaryPackChunker";
import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./binaryPackChunker";
import logger from "../../logger";
import type { Peer } from "../../peer";
import { BufferedConnection } from "./BufferedConnection";
Expand Down Expand Up @@ -27,33 +27,29 @@ export class BinaryPack extends BufferedConnection {
}

// Handles a DataChannel message.
protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
const deserializedData = unpack(data);

// PeerJS specific message
const peerData = deserializedData["__peerData"];
if (peerData) {
if (peerData.type === "close") {
this.close();
return;
}

// Chunked data -- piece things back together.
// @ts-ignore
this._handleChunk(deserializedData);
return;
}
protected _handleDataMessage({ data }: { data: Uint8Array }): void {
const deserializedData = unpack(data);

// PeerJS specific message
const peerData = deserializedData["__peerData"];
if (peerData) {
if (peerData.type === "close") {
this.close();
return;
}
}

if (isBinaryPackChunk(deserializedData)) {
this._handleChunk(deserializedData);
return;
}

this.emit("data", deserializedData);
}

this.emit("data", deserializedData);
}

private _handleChunk(data: {
__peerData: number;
n: number;
total: number;
data: ArrayBuffer;
}): void {
const id = data.__peerData;
private _handleChunk(data: BinaryPackChunk): void {
const id = data.id;
const chunkInfo = this._chunkedData[id] || {
data: [],
count: 0,
Expand Down
35 changes: 31 additions & 4 deletions lib/dataconnection/BufferedConnection/binaryPackChunker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
export interface BinaryPackChunk {
id: number
n: number
total: number
data: ArrayBuffer
};

export function isBinaryPackChunk(obj: any): obj is BinaryPackChunk {
return typeof obj === 'object' && 'id' in obj;
}

export class BinaryPackChunker {
readonly chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually.

// Binary stuff

private _dataCount: number = 1;

public get nextID(): number {
return this._dataCount;
}

chunk = (
blob: ArrayBuffer,
): { __peerData: number; n: number; total: number; data: Uint8Array }[] => {
const chunks = [];
): BinaryPackChunk[] => {
const chunks: BinaryPackChunk[] = [];
const size = blob.byteLength;
const total = Math.ceil(size / this.chunkedMTU);

Expand All @@ -19,8 +34,8 @@ export class BinaryPackChunker {
const end = Math.min(size, start + this.chunkedMTU);
const b = blob.slice(start, end);

const chunk = {
__peerData: this._dataCount,
const chunk: BinaryPackChunk = {
id: this._dataCount,
n: index,
data: b,
total,
Expand All @@ -36,6 +51,18 @@ export class BinaryPackChunker {

return chunks;
};

singleChunk = (blob: ArrayBuffer): BinaryPackChunk => {
const id = this._dataCount;
this._dataCount++;

return {
id,
n: 0,
total: 1,
data: new Uint8Array(blob),
};
}
}

export function concatArrayBuffers(bufs: Uint8Array[]) {
Expand Down
178 changes: 178 additions & 0 deletions lib/dataconnection/BufferedNotifyConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { pack, unpack } from "peerjs-js-binarypack";
import logger from "../logger";
import { DataConnection, SendData } from "./DataConnection";
import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./BufferedConnection/binaryPackChunker";


export class BufferedNotifyConnection extends DataConnection {
readonly serialization = 'notify';
private readonly chunker = new BinaryPackChunker();

private _chunkedData: {
[id: number]: {
data: Uint8Array[];
count: number;
total: number;
};
} = {};

private _buffer: BinaryPackChunk[] = [];
private _bufferSize = 0;
private _buffering = false;

public get bufferSize(): number {
return this._bufferSize;
}

public get nextID(): number {
return this.chunker.nextID;
}

public override _initializeDataChannel(dc: RTCDataChannel) {
super._initializeDataChannel(dc);
this.dataChannel.binaryType = "arraybuffer";
this.dataChannel.addEventListener("message", (e) =>
this._handleDataMessage(e),
);
}

// Handles a DataChannel message.
protected _handleDataMessage({ data }: { data: Uint8Array }): void {
const deserializedData = unpack(data);

// PeerJS specific message
const peerData = deserializedData["__peerData"];
if (peerData) {
if (peerData.type === "close") {
this.close();
return;
}
}

if (isBinaryPackChunk(deserializedData)) {
this._handleChunk(deserializedData);
return;
}

this.emit("data", deserializedData);
}

private _handleChunk(data: BinaryPackChunk): void {
const id = data.id;
const chunkInfo = this._chunkedData[id] || {
data: [],
count: 0,
total: data.total,
};

chunkInfo.data[data.n] = new Uint8Array(data.data);
chunkInfo.count++;
this._chunkedData[id] = chunkInfo;

if (chunkInfo.total === chunkInfo.count) {
// Clean up before making the recursive call to `_handleDataMessage`.
delete this._chunkedData[id];

// We've received all the chunks--time to construct the complete data.
// const data = new Blob(chunkInfo.data);
const data = concatArrayBuffers(chunkInfo.data);
this._handleDataMessage({ data });
}
}

public SendWithCallback(data: any, callback: (chunk: BinaryPackChunk) => void): SendData {
throw new Error("Method not implemented.");
}

protected _send(data: any): void {
const blob = pack(data);

if (blob.byteLength > this.chunker.chunkedMTU) {

const blobs = this.chunker.chunk(blob);
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);

for (const blob of blobs) {
this._bufferedSend(blob);
}
return;

}
//We send everything in one chunk
const msg = this.chunker.singleChunk(blob);
this._bufferedSend(msg);
}

protected _bufferedSend(msg: BinaryPackChunk): void {
if (this._buffering || !this._trySend(msg)) {
this._buffer.push(msg);
this._bufferSize = this._buffer.length;
}
}

// Returns true if the send succeeds.
private _trySend(msg: BinaryPackChunk): boolean {
if (!this.open) {
return false;
}

if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
this._buffering = true;
setTimeout(() => {
this._buffering = false;
this._tryBuffer();
}, 50);

return false;
}

try {
// Send notification
this.emit("sentChunk", { id: msg.id, n: msg.n, total: msg.total });
const msgPacked = pack(msg as any);
this.dataChannel.send(msgPacked);
} catch (e) {
logger.error(`DC#:${this.connectionId} Error when sending:`, e);
this._buffering = true;

this.close();

return false;
}

return true;
}

// Try to send the first message in the buffer.
private _tryBuffer(): void {
if (!this.open) {
return;
}

if (this._buffer.length === 0) {
return;
}

const msg = this._buffer[0];

if (this._trySend(msg)) {
this._buffer.shift();
this._bufferSize = this._buffer.length;
this._tryBuffer();
}
}

public override close(options?: { flush?: boolean }) {
if (options?.flush) {
this.send({
__peerData: {
type: "close",
},
});
return;
}
this._buffer = [];
this._bufferSize = 0;
super.close();
}
}
37 changes: 35 additions & 2 deletions lib/dataconnection/DataConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@ import type { ServerMessage } from "../servermessage";
import type { EventsWithError } from "../peerError";
import { randomToken } from "../utils/randomToken";

export interface SendData {
id: number,
total: number
}

export interface ChunkSentNotification extends SendData {
n: number
}

export interface DataConnectionEvents
extends EventsWithError<DataConnectionErrorType | BaseConnectionErrorType>,
BaseConnectionEvents<DataConnectionErrorType | BaseConnectionErrorType> {
BaseConnectionEvents<DataConnectionErrorType | BaseConnectionErrorType> {
/**
* Emitted when data is received from the remote peer.
*/
Expand All @@ -23,6 +32,13 @@ export interface DataConnectionEvents
* Emitted when the connection is established and ready-to-use.
*/
open: () => void;

/**
* Emitted when the connection sends out a chunk of data to the remote peer.
* Currently Only implemented by BufferedNotifyConnection.
*/
sentChunk: (chunk: ChunkSentNotification) => void;

}

/**
Expand Down Expand Up @@ -126,7 +142,24 @@ export abstract class DataConnection extends BaseConnection<

protected abstract _send(data: any, chunked: boolean): void;

/** Allows user to send data. */

/**
* Allows user to send data.
* @param data
* @param chunked
* @example
*
* const nextId = conn.nextID;
* conn.on('sentChunk', (chunk) => {
* if (chunk.id === nextId) {
* console.log('Sent chunk', chunk);
* if (chunk.n == chunk.total - 1) {
* console.log('Sent last chunk');
* }
* }
* });
* conn.send(arr);
*/
public send(data: any, chunked = false) {
if (!this.open) {
this.emitError(
Expand Down
3 changes: 2 additions & 1 deletion lib/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ export type {
CallOption,
} from "./optionInterfaces";
export type { UtilSupportsObj } from "./util";
export type { DataConnection } from "./dataconnection/DataConnection";
export type { DataConnection, ChunkSentNotification, SendData } from "./dataconnection/DataConnection";
export type { MediaConnection } from "./mediaconnection";
export type { LogLevel } from "./logger";
export * from "./enums";

export { BufferedConnection } from "./dataconnection/BufferedConnection/BufferedConnection";
export { StreamConnection } from "./dataconnection/StreamConnection/StreamConnection";
export { Cbor } from "./dataconnection/StreamConnection/Cbor";
export { BufferedNotifyConnection } from "./dataconnection/BufferedNotifyConnection";
export { MsgPack } from "./dataconnection/StreamConnection/MsgPack";
export type { SerializerMapping } from "./peer";

Expand Down
2 changes: 2 additions & 0 deletions lib/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Raw } from "./dataconnection/BufferedConnection/Raw";
import { Json } from "./dataconnection/BufferedConnection/Json";

import { EventEmitterWithError, PeerError } from "./peerError";
import { BufferedNotifyConnection } from "./exports";

class PeerOptions implements PeerJSOption {
/**
Expand Down Expand Up @@ -118,6 +119,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
json: Json,
binary: BinaryPack,
"binary-utf8": BinaryPack,
notify: BufferedNotifyConnection,

default: BinaryPack,
};
Expand Down