Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sd-streams from https://github.com/stardazed/sd-streams/blob/mast… #3192

Merged
merged 12 commits into from Oct 28, 2019
77 changes: 76 additions & 1 deletion cli/js/body.ts
Expand Up @@ -3,6 +3,7 @@ import * as blob from "./blob.ts";
import * as encoding from "./text_encoding.ts";
import * as headers from "./headers.ts";
import * as domTypes from "./dom_types.ts";
import { ReadableStream } from "./streams/mod.ts";

const { Headers } = headers;

Expand All @@ -12,6 +13,13 @@ const { TextEncoder, TextDecoder } = encoding;
const Blob = blob.DenoBlob;
const DenoBlob = blob.DenoBlob;

type ReadableStreamReader = domTypes.ReadableStreamReader;

interface ReadableStreamController {
enqueue(chunk: string | ArrayBuffer): void;
close(): void;
}

export type BodySource =
| domTypes.Blob
| domTypes.BufferSource
Expand All @@ -37,6 +45,8 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
return true;
} else if (typeof bodySource === "string") {
return true;
} else if (bodySource instanceof ReadableStream) {
return true;
} else if (bodySource instanceof FormData) {
return true;
} else if (!bodySource) {
Expand All @@ -47,6 +57,58 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
);
}

function concatenate(...arrays: Uint8Array[]): ArrayBuffer {
let totalLength = 0;
for (const arr of arrays) {
totalLength += arr.length;
}
const result = new Uint8Array(totalLength);
let offset = 0;
for (const arr of arrays) {
result.set(arr, offset);
offset += arr.length;
}
return result.buffer as ArrayBuffer;
}

function bufferFromStream(stream: ReadableStreamReader): Promise<ArrayBuffer> {
return new Promise(
(resolve, reject): void => {
const parts: Uint8Array[] = [];
const encoder = new TextEncoder();
// recurse
(function pump(): void {
stream
.read()
.then(
({ done, value }): void => {
if (done) {
return resolve(concatenate(...parts));
}

if (typeof value === "string") {
parts.push(encoder.encode(value));
} else if (value instanceof ArrayBuffer) {
parts.push(new Uint8Array(value));
} else if (!value) {
// noop for undefined
} else {
reject("unhandled type on stream read");
}

return pump();
}
)
.catch(
(err): void => {
reject(err);
}
);
})();
}
);
}

function getHeaderValueParams(value: string): Map<string, string> {
const params = new Map();
// Forced to do so for some Map constructor param mismatch
Expand Down Expand Up @@ -81,8 +143,18 @@ export class Body implements domTypes.Body {
if (this._stream) {
return this._stream;
}

if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
this._stream = this._bodySource;
}
if (typeof this._bodySource === "string") {
throw Error("not implemented");
this._stream = new ReadableStream({
start(controller: ReadableStreamController): void {
controller.enqueue(this._bodySource);
controller.close();
}
});
}
return this._stream;
}
Expand Down Expand Up @@ -259,6 +331,9 @@ export class Body implements domTypes.Body {
} else if (typeof this._bodySource === "string") {
const enc = new TextEncoder();
return enc.encode(this._bodySource).buffer as ArrayBuffer;
} else if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
return bufferFromStream(this._bodySource.getReader());
} else if (this._bodySource instanceof FormData) {
const enc = new TextEncoder();
return enc.encode(this._bodySource.toString()).buffer as ArrayBuffer;
Expand Down
140 changes: 126 additions & 14 deletions cli/js/dom_types.ts
Expand Up @@ -248,7 +248,7 @@ export interface AddEventListenerOptions extends EventListenerOptions {
passive: boolean;
}

interface AbortSignal extends EventTarget {
export interface AbortSignal extends EventTarget {
readonly aborted: boolean;
onabort: ((this: AbortSignal, ev: ProgressEvent) => any) | null;
addEventListener<K extends keyof AbortSignalEventMap>(
Expand All @@ -273,19 +273,6 @@ interface AbortSignal extends EventTarget {
): void;
}

export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
tee(): [ReadableStream, ReadableStream];
}

export interface ReadableStreamReader {
cancel(): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}

export interface FormData extends DomIterable<string, FormDataEntryValue> {
append(name: string, value: string | Blob, fileName?: string): void;
delete(name: string): void;
Expand Down Expand Up @@ -343,6 +330,131 @@ export interface Body {
text(): Promise<string>;
}

export interface ReadableStream {
readonly locked: boolean;
cancel(reason?: any): Promise<void>;
getReader(): ReadableStreamReader;
tee(): ReadableStream[];
}

export interface UnderlyingSource<R = any> {
cancel?: ReadableStreamErrorCallback;
pull?: ReadableStreamDefaultControllerCallback<R>;
start?: ReadableStreamDefaultControllerCallback<R>;
type?: undefined;
}

export interface UnderlyingByteSource {
autoAllocateChunkSize?: number;
cancel?: ReadableStreamErrorCallback;
pull?: ReadableByteStreamControllerCallback;
start?: ReadableByteStreamControllerCallback;
type: "bytes";
}

export interface ReadableStreamReader {
cancel(reason?: any): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}

export interface ReadableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}

export interface ReadableByteStreamControllerCallback {
(controller: ReadableByteStreamController): void | PromiseLike<void>;
}

export interface ReadableStreamDefaultControllerCallback<R> {
(controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
}

export interface ReadableStreamDefaultController<R = any> {
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: R): void;
error(error?: any): void;
}

export interface ReadableByteStreamController {
readonly byobRequest: ReadableStreamBYOBRequest | undefined;
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: ArrayBufferView): void;
error(error?: any): void;
}

export interface ReadableStreamBYOBRequest {
readonly view: ArrayBufferView;
respond(bytesWritten: number): void;
respondWithNewView(view: ArrayBufferView): void;
}
/* TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
export interface WritableStream<W = any> {
readonly locked: boolean;
abort(reason?: any): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}

TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
export interface UnderlyingSink<W = any> {
abort?: WritableStreamErrorCallback;
close?: WritableStreamDefaultControllerCloseCallback;
start?: WritableStreamDefaultControllerStartCallback;
type?: undefined;
write?: WritableStreamDefaultControllerWriteCallback<W>;
}

export interface PipeOptions {
preventAbort?: boolean;
preventCancel?: boolean;
preventClose?: boolean;
signal?: AbortSignal;
}


export interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;
readonly ready: Promise<void>;
abort(reason?: any): Promise<void>;
close(): Promise<void>;
releaseLock(): void;
write(chunk: W): Promise<void>;
}

export interface WritableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerCloseCallback {
(): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerStartCallback {
(controller: WritableStreamDefaultController): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerWriteCallback<W> {
(chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
void
>;
}

export interface WritableStreamDefaultController {
error(error?: any): void;
}
*/
export interface QueuingStrategy<T = any> {
highWaterMark?: number;
size?: QueuingStrategySizeCallback<T>;
}

export interface QueuingStrategySizeCallback<T = any> {
(chunk: T): number;
}

export interface Headers extends DomIterable<string, string> {
/** Appends a new value onto an existing header inside a `Headers` object, or
* adds the header if it does not already exist.
Expand Down
5 changes: 4 additions & 1 deletion cli/js/errors.ts
Expand Up @@ -76,5 +76,8 @@ export enum ErrorKind {
TooManyRedirects = 48,
Diagnostic = 49,
JSError = 50,
TypeError = 51
TypeError = 51,

/** TODO this is a DomException type, and should be moved out of here when possible */
DataCloneError = 52
}
1 change: 0 additions & 1 deletion cli/js/globals.ts
Expand Up @@ -26,7 +26,6 @@ import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts";
import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts";

import * as request from "./request.ts";

// These imports are not exposed and therefore are fine to just import the
Expand Down
10 changes: 9 additions & 1 deletion cli/js/request.ts
Expand Up @@ -2,8 +2,10 @@
import * as headers from "./headers.ts";
import * as body from "./body.ts";
import * as domTypes from "./dom_types.ts";
import * as streams from "./streams/mod.ts";

const { Headers } = headers;
const { ReadableStream } = streams;

function byteUpperCase(s: string): string {
return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string {
Expand Down Expand Up @@ -138,7 +140,13 @@ export class Request extends body.Body implements domTypes.Request {
headersList.push(header);
}

const body2 = this._bodySource;
let body2 = this._bodySource;

if (this._bodySource instanceof ReadableStream) {
const tees = (this._bodySource as domTypes.ReadableStream).tee();
this._stream = this._bodySource = tees[0];
body2 = tees[1];
}

const cloned = new Request(this.url, {
body: body2,
Expand Down
34 changes: 33 additions & 1 deletion cli/js/request_test.ts
@@ -1,5 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { test, assertEquals } from "./test_util.ts";
import { test, assert, assertEquals } from "./test_util.ts";

test(function fromInit(): void {
const req = new Request("https://example.com", {
Expand All @@ -15,3 +15,35 @@ test(function fromInit(): void {
assertEquals(req.url, "https://example.com");
assertEquals(req.headers.get("test-header"), "value");
});

test(function fromRequest(): void {
const r = new Request("https://example.com");
// @ts-ignore
r._bodySource = "ahoyhoy";
r.headers.set("test-header", "value");

const req = new Request(r);

// @ts-ignore
assertEquals(req._bodySource, r._bodySource);
assertEquals(req.url, r.url);
assertEquals(req.headers.get("test-header"), r.headers.get("test-header"));
});

test(async function cloneRequestBodyStream(): Promise<void> {
// hack to get a stream
const stream = new Request("", { body: "a test body" }).body;
const r1 = new Request("https://example.com", {
body: stream
});

const r2 = r1.clone();

const b1 = await r1.text();
const b2 = await r2.text();

assertEquals(b1, b2);

// @ts-ignore
assert(r1._bodySource !== r2._bodySource);
});
20 changes: 20 additions & 0 deletions cli/js/streams/mod.ts
@@ -0,0 +1,20 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT

/**
* @stardazed/streams - implementation of the web streams standard
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/

export { SDReadableStream as ReadableStream } from "./readable-stream.ts";
/* TODO The following are currently unused so not exported for clarity.
export { WritableStream } from "./writable-stream.ts";

export { TransformStream } from "./transform-stream.ts";
export {
ByteLengthQueuingStrategy,
CountQueuingStrategy
} from "./strategies.ts";
*/