add support for reading streaming format via node streams
trxcllnt committed May 13, 2018
1 parent 4e80851 commit e75da13
Showing 7 changed files with 180 additions and 30 deletions.
1 change: 1 addition & 0 deletions js/package.json
@@ -93,6 +93,7 @@
"rxjs": "5.5.6",
"shx": "0.2.2",
"source-map-loader": "0.2.3",
"stream-to-iterator": "3.0.1",
"trash": "4.2.1",
"ts-jest": "22.0.1",
"tslint": "5.9.1",
8 changes: 6 additions & 2 deletions js/src/Arrow.externs.js
@@ -124,9 +124,13 @@ Predicate.prototype.not;
Predicate.prototype.ands;
var Literal = function() {};

var TableToStringIterator = function() {};
var PipeIterator = function() {};
/** @type {?} */
TableToStringIterator.prototype.pipe;
PipeIterator.prototype.pipe;

var AsyncPipeIterator = function() {};
/** @type {?} */
AsyncPipeIterator.prototype.pipe;

var RecordBatch = function() {};
/** @type {?} */
16 changes: 14 additions & 2 deletions js/src/Arrow.ts
@@ -20,14 +20,17 @@ import * as data_ from './data';
import * as vector_ from './vector';
import * as util_int_ from './util/int';
import * as util_bit_ from './util/bit';
import * as util_node from './util/node';
import * as visitor_ from './visitor';
import * as view_ from './vector/view';
import * as predicate_ from './predicate';
import { Vector } from './vector';
import { RecordBatch } from './recordbatch';
import { Schema, Field, Type } from './type';
import { Table, DataFrame, NextFunc, BindFunc, CountByResult } from './table';
import { read, readAsync } from './ipc/reader/arrow';
import { fromNodeStream } from './ipc/reader/node';
import { read, readAsync, readNodeStream } from './ipc/reader/arrow';
import { serializeFile, serializeStream } from './ipc/writer/binary';

export import View = vector_.View;
export import VectorLike = vector_.VectorLike;
@@ -36,7 +39,9 @@ export import IntBitWidth = type_.IntBitWidth;
export import TimeBitWidth = type_.TimeBitWidth;
export import TypedArrayConstructor = type_.TypedArrayConstructor;

export { read, readAsync };
export { fromNodeStream };
export { read, readAsync, readNodeStream };
export { serializeFile, serializeStream };
export { Table, DataFrame, NextFunc, BindFunc, CountByResult };
export { Field, Schema, RecordBatch, Vector, Type };

@@ -45,6 +50,8 @@ export namespace util {
    export import Int64 = util_int_.Int64;
    export import Int128 = util_int_.Int128;
    export import packBools = util_bit_.packBools;
    export import PipeIterator = util_node.PipeIterator;
    export import AsyncPipeIterator = util_node.AsyncPipeIterator;
}

export namespace data {
@@ -202,6 +209,11 @@ try {

Arrow['read'] = read;
Arrow['readAsync'] = readAsync;
Arrow['readNodeStream'] = readNodeStream;
Arrow['fromNodeStream'] = fromNodeStream;

Arrow['serializeFile'] = serializeFile;
Arrow['serializeStream'] = serializeStream;

Arrow['Type'] = Type;
Arrow['Field'] = Field;
7 changes: 7 additions & 0 deletions js/src/ipc/reader/arrow.ts
@@ -16,6 +16,7 @@
// under the License.

import { readJSON } from './json';
import { fromNodeStream } from './node';
import { RecordBatch } from '../../recordbatch';
import { readBuffers, readBuffersAsync } from './binary';
import { readRecordBatches, readRecordBatchesAsync, TypeDataLoader } from './vector';
@@ -46,3 +47,9 @@ export async function* readAsync(sources: AsyncIterable<Uint8Array | Buffer | st
        yield recordBatch;
    }
}

export async function* readNodeStream(stream: NodeJS.ReadableStream) {
    for await (const recordBatch of readAsync(fromNodeStream(stream))) {
        yield recordBatch as RecordBatch;
    }
}
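For context, a usage sketch (not part of this commit) of the new readNodeStream export, fed by a Node file stream; the import path and the file name are assumptions for illustration:

import * as fs from 'fs';
import { readNodeStream } from './Arrow';

async function logRecordBatches(path: string) {
    // fs.createReadStream emits Buffer chunks; readNodeStream frames them into
    // Arrow IPC messages and yields decoded RecordBatches.
    for await (const recordBatch of readNodeStream(fs.createReadStream(path))) {
        console.log(`record batch with ${recordBatch.length} rows`);
    }
}

logRecordBatches('simple-stream.arrow').catch((e) => console.error(e));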
65 changes: 65 additions & 0 deletions js/src/ipc/reader/node.ts
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import { PADDING } from '../magic';
import { flatbuffers } from 'flatbuffers';
import * as Message_ from '../../fb/Message';
import ByteBuffer = flatbuffers.ByteBuffer;
import _Message = Message_.org.apache.arrow.flatbuf.Message;

export async function* fromNodeStream(stream: NodeJS.ReadableStream) {

    let bb: ByteBuffer;
    let bytesRead = 0, bytes = new Uint8Array(0);
    let messageLength = 0, message: _Message | null = null;

    for await (let chunk of (stream as any as AsyncIterable<Uint8Array | Buffer | string>)) {

        const grown = new Uint8Array(bytes.length + chunk.length);

        if (typeof chunk !== 'string') {
            grown.set(bytes, 0) || grown.set(chunk, bytes.length);
        } else {
            for (let i = -1, j = bytes.length, n = chunk.length; ++i < n;) {
                grown[i + j] = chunk.charCodeAt(i);
            }
        }

        bytes = grown;

        if (messageLength <= 0) {
            messageLength = new DataView(bytes.buffer).getInt32(0, true);
        }

        while (messageLength < bytes.length) {
            if (!message) {
                (bb = new ByteBuffer(bytes)).setPosition(4);
                if (message = _Message.getRootAsMessage(bb)) {
                    messageLength += message.bodyLength().low;
                    continue;
                }
                throw new Error(`Invalid message at position ${bytesRead}`);
            }
            bytesRead += messageLength + PADDING;
            yield bytes.subarray(0, messageLength);
            bytes = bytes.subarray(messageLength + PADDING);
            messageLength = bytes.length <= 0 ? 0 :
                new DataView(bytes.buffer).getInt32(bytes.byteOffset, true);
            message = null;
        }
    }
}
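The generator above accumulates the stream's chunks until at least one complete length-prefixed Arrow IPC message (flatbuffer metadata plus body) is buffered, yields that slice, and carries the remaining bytes over to the next chunk. A rough sketch (not part of this commit) of using it directly to inspect the framing; the relative import path and file name are assumptions:

import * as fs from 'fs';
import { fromNodeStream } from './ipc/reader/node';

(async () => {
    // Each yielded Uint8Array is one message-sized slice that readAsync /
    // readBuffersAsync can decode downstream.
    for await (const message of fromNodeStream(fs.createReadStream('stream.arrow'))) {
        console.log(`framed message: ${message.byteLength} bytes`);
    }
})();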
29 changes: 3 additions & 26 deletions js/src/table.ts
@@ -20,6 +20,7 @@ import { Col, Predicate } from './predicate';
import { Schema, Field, Struct } from './type';
import { read, readAsync } from './ipc/reader/arrow';
import { writeTableBinary } from './ipc/writer/arrow';
import { PipeIterator } from './util/node';
import { isPromise, isAsyncIterable } from './util/compat';
import { Vector, DictionaryVector, IntVector, StructVector } from './vector';
import { ChunkedView } from './vector/chunked';
@@ -184,8 +185,8 @@ export class Table implements DataFrame {
    public serialize(encoding = 'binary', stream = true) {
        return writeTableBinary(this, stream);
    }
    public rowsToString(separator = ' | '): TableToStringIterator {
        return new TableToStringIterator(tableRowsToString(this, separator));
    public rowsToString(separator = ' | ') {
        return new PipeIterator(tableRowsToString(this, separator), 'utf8');
    }
}
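A usage sketch (not part of this commit) of the updated rowsToString(): since it now returns a PipeIterator, the formatted rows can be piped straight to a writable stream or iterated as before; the import path is an assumption:

import { Table } from './Arrow';

// Pretty-print a table to stdout via the new backpressure-aware pipe().
function printTable(table: Table) {
    table.rowsToString().pipe(process.stdout);
}

// Plain iteration still works, since PipeIterator implements IterableIterator<string>.
function collectRows(table: Table): string[] {
    return [...table.rowsToString()];
}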

@@ -295,30 +296,6 @@ export class CountByResult extends Table implements DataFrame {
}
}

export class TableToStringIterator implements IterableIterator<string> {
    constructor(private iterator: IterableIterator<string>) {}
    [Symbol.iterator]() { return this.iterator; }
    next(value?: any) { return this.iterator.next(value); }
    throw(error?: any) { return this.iterator.throw && this.iterator.throw(error) || { done: true, value: '' }; }
    return(value?: any) { return this.iterator.return && this.iterator.return(value) || { done: true, value: '' }; }
    pipe(stream: NodeJS.WritableStream) {
        let res: IteratorResult<string>;
        let write = () => {
            if (stream['writable']) {
                do {
                    if ((res = this.next()).done) { break; }
                } while (stream['write'](res.value + '\n', 'utf8'));
            }
            if (!res || !res.done) {
                stream['once']('drain', write);
            } else if (!(stream as any)['isTTY']) {
                stream['end']('\n');
            }
        };
        write();
    }
}

function* tableRowsToString(table: Table, separator = ' | ') {
    const fields = table.schema.fields;
    const header = ['row_id', ...fields.map((f) => `${f}`)].map(stringify);
84 changes: 84 additions & 0 deletions js/src/util/node.ts
@@ -0,0 +1,84 @@

export class PipeIterator<T> implements IterableIterator<T> {
    constructor(protected iterator: IterableIterator<T>, protected encoding?: any) {}
    [Symbol.iterator]() { return this.iterator; }
    next(value?: any) { return this.iterator.next(value); }
    throw(error?: any) {
        if (typeof this.iterator.throw === 'function') {
            return this.iterator.throw(error);
        }
        return { done: true, value: null as any };
    }
    return(value?: any) {
        if (typeof this.iterator.return === 'function') {
            return this.iterator.return(value);
        }
        return { done: true, value: null as any };
    }
    pipe(stream: NodeJS.WritableStream) {
        let { encoding } = this;
        let res: IteratorResult<T>;
        let write = (err?: any) => {
            stream['removeListener']('error', write);
            stream['removeListener']('drain', write);
            if (err) return this.throw(err);
            if (stream['writable']) {
                do {
                    if ((res = this.next()).done) break;
                } while (emit(stream, encoding, res.value));
            }
            return wait(stream, encoding, res && res.done, write);
        };
        write();
        return stream;
    }
}

export class AsyncPipeIterator<T> implements AsyncIterableIterator<T> {
    constructor(protected iterator: AsyncIterableIterator<T>, protected encoding?: any) {}
    [Symbol.asyncIterator]() { return this.iterator; }
    next(value?: any) { return this.iterator.next(value); }
    async throw(error?: any) {
        if (typeof this.iterator.throw === 'function') {
            return this.iterator.throw(error);
        }
        return { done: true, value: null as any };
    }
    async return(value?: any) {
        if (typeof this.iterator.return === 'function') {
            return this.iterator.return(value);
        }
        return { done: true, value: null as any };
    }
    pipe(stream: NodeJS.WritableStream) {
        let { encoding } = this;
        let res: IteratorResult<T>;
        let write = async (err?: any) => {
            stream['removeListener']('error', write);
            stream['removeListener']('drain', write);
            if (err) return this.throw(err);
            if (stream['writable']) {
                do {
                    if ((res = await this.next()).done) break;
                } while (emit(stream, encoding, res.value));
            }
            return wait(stream, encoding, res && res.done, write);
        };
        write();
        return stream;
    }
}

function emit(stream: NodeJS.WritableStream, encoding: string, value: any) {
    return stream['write']((encoding === 'utf8' ? value + '\n' : value) as any, encoding);
}

function wait(stream: NodeJS.WritableStream, encoding: string, done: boolean, write: (x?: any) => void) {
    const p = eval('process'); // defeat closure compiler
    if (!done) {
        stream['once']('error', write);
        stream['once']('drain', write);
    } else if (!(!p || stream === p.stdout) && !(stream as any)['isTTY']) {
        stream['end'](<any> (encoding === 'utf8' ? '\n' : new Uint8Array(0)));
    }
}
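A usage sketch (not part of this commit) of PipeIterator on its own: it wraps any synchronous iterator and adds a backpressure-aware pipe() for Node writable streams, with AsyncPipeIterator doing the same for async iterators; the generator and output file name are hypothetical:

import * as fs from 'fs';
import { PipeIterator } from './util/node';

function* lines() {
    for (let i = 0; i < 3; ++i) { yield `line ${i}`; }
}

// With the 'utf8' encoding, pipe() appends '\n' after each value, listens for
// 'drain' when write() signals backpressure, and ends the stream once the
// iterator is exhausted.
new PipeIterator(lines(), 'utf8').pipe(fs.createWriteStream('out.txt'));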
