Skip to content

Commit

Permalink
make arrow2csv support streaming files from stdin, add rowsToString()…
Browse files Browse the repository at this point in the history
… method to RecordBatch
  • Loading branch information
trxcllnt committed May 15, 2018
1 parent 7924e67 commit df43bc5
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 47 deletions.
35 changes: 17 additions & 18 deletions js/src/bin/arrow2csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,38 @@

import * as fs from 'fs';
import { promisify } from 'util';
import * as Arrow from '../Arrow';
import { Table, readStream } from '../Arrow';

const readFile = promisify(fs.readFile);
const { parse } = require('json-bignum');
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
const files = [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);

(async () => {
if (!files.length) {
let buffers = [], totalLength = 0;
for await (let chunk of (process.stdin as any)) {
if (!Buffer.isBuffer(chunk)) {
chunk = Buffer.from(chunk, 'binary');
}
buffers.push(chunk);
totalLength += chunk.byteLength;
let hasRecords = false;
if (files.length > 0) {
hasRecords = true;
for (let input of files) {
printTable(await readFile(input));
}
if (buffers.length === 0) {
return print_usage();
} else {
let rowOffset = 0;
let maxColumnWidths: number[] = [];
for await (const recordBatch of readStream(process.stdin)) {
hasRecords = true;
recordBatch.rowsToString(' | ', rowOffset, maxColumnWidths).pipe(process.stdout);
rowOffset += recordBatch.length;
}
files.push(Buffer.concat(buffers, totalLength));
}
for (let input of files) {
printTable(typeof input === 'string' ? await readFile(input) : input);
}
return hasRecords ? null : print_usage();
})().catch((e) => { console.error(e); process.exit(1); });

function printTable(input: any) {
let table: Arrow.Table;
let table: Table;
try {
table = Arrow.Table.from(input);
table = Table.from(input);
} catch (e) {
table = Arrow.Table.from(parse(input + ''));
table = Table.from(parse(input + ''));
}
if (argv.schema && argv.schema.length) {
table = table.select(...argv.schema);
Expand Down
30 changes: 30 additions & 0 deletions js/src/recordbatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { Schema, Struct, DataType } from './type';
import { flatbuffers } from 'flatbuffers';
import { View, Vector, StructVector } from './vector';
import { Data, NestedData } from './data';
import { PipeIterator } from './util/node';
import { valueToString, leftPad } from './util/pretty';

import Long = flatbuffers.Long;

Expand Down Expand Up @@ -67,4 +69,32 @@ export class RecordBatch extends StructVector {
this.childData.filter((_, i) => namesToKeep[fields[i].name])
);
}
public rowsToString(separator = ' | ', rowOffset = 0, maxColumnWidths: number[] = []) {
return new PipeIterator(recordBatchRowsToString(this, separator, rowOffset, maxColumnWidths), 'utf8');
}
}

function* recordBatchRowsToString(recordBatch: RecordBatch, separator = ' | ', rowOffset = 0, maxColumnWidths: number[] = []) {
const fields = recordBatch.schema.fields;
const header = ['row_id', ...fields.map((f) => `${f}`)].map(valueToString);
header.forEach((x, i) => {
maxColumnWidths[i] = Math.max(maxColumnWidths[i] || 0, x.length);
});
// Pass one to convert to strings and count max column widths
for (let i = -1, n = recordBatch.length - 1; ++i < n;) {
let val, row = [rowOffset + i, ...recordBatch.get(i) as Struct['TValue']];
for (let j = -1, k = row.length; ++j < k; ) {
val = valueToString(row[j]);
maxColumnWidths[j] = Math.max(maxColumnWidths[j] || 0, val.length);
}
}
for (let i = -1; ++i < recordBatch.length;) {
if ((rowOffset + i) % 1000 === 0) {
yield header.map((x, j) => leftPad(x, ' ', maxColumnWidths[j])).join(separator);
}
yield [rowOffset + i, ...recordBatch.get(i) as Struct['TValue']]
.map((x) => valueToString(x))
.map((x, j) => leftPad(x, ' ', maxColumnWidths[j]))
.join(separator);
}
}
35 changes: 11 additions & 24 deletions js/src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,30 +297,17 @@ export class CountByResult extends Table implements DataFrame {
}

function* tableRowsToString(table: Table, separator = ' | ') {
const fields = table.schema.fields;
const header = ['row_id', ...fields.map((f) => `${f}`)].map(stringify);
const maxColumnWidths = header.map(x => x.length);
// Pass one to convert to strings and count max column widths
for (let i = -1, n = table.length - 1; ++i < n;) {
let val, row = [i, ...table.get(i)];
for (let j = -1, k = row.length; ++j < k; ) {
val = stringify(row[j]);
maxColumnWidths[j] = Math.max(maxColumnWidths[j], val.length);
}
let rowOffset = 0;
let maxColumnWidths: number[] = [];
let iterators: IterableIterator<string>[] = [];
// Gather all the `rowsToString` iterators into a list before iterating,
// so that `maxColumnWidths` is filled with the maxWidth for each column
// across all RecordBatches.
for (const batch of table.batches) {
iterators.push(batch.rowsToString(separator, rowOffset, maxColumnWidths));
rowOffset += batch.length;
}
yield header.map((x, j) => leftPad(x, ' ', maxColumnWidths[j])).join(separator);
for (let i = -1; ++i < table.length;) {
yield [i, ...table.get(i)]
.map((x) => stringify(x))
.map((x, j) => leftPad(x, ' ', maxColumnWidths[j]))
.join(separator);
for (const iterator of iterators) {
yield* iterator;
}
}

function leftPad(str: string, fill: string, n: number) {
return (new Array(n + 1).join(fill) + str).slice(-1 * n);
}

function stringify(x: any) {
return typeof x === 'string' ? `"${x}"` : ArrayBuffer.isView(x) ? `[${x}]` : JSON.stringify(x);
}
8 changes: 8 additions & 0 deletions js/src/util/pretty.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

export function leftPad(str: string, fill: string, n: number) {
return (new Array(n + 1).join(fill) + str).slice(-1 * n);
}

export function valueToString(x: any) {
return typeof x === 'string' ? `"${x}"` : ArrayBuffer.isView(x) ? `[${x}]` : JSON.stringify(x);
}
7 changes: 2 additions & 5 deletions js/src/vector/nested.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import { Data } from '../data';
import { View, Vector } from '../vector';
import { IterableArrayLike } from '../type';
import { valueToString } from '../util/pretty';
import { DataType, NestedType, DenseUnion, SparseUnion, Struct, Map_ } from '../type';

export abstract class NestedView<T extends NestedType> implements View<T> {
Expand Down Expand Up @@ -45,7 +46,7 @@ export abstract class NestedView<T extends NestedType> implements View<T> {
}
public toJSON(): any { return this.toArray(); }
public toString() {
return [...this].map((x) => stringify(x)).join(', ');
return [...this].map((x) => valueToString(x)).join(', ');
}
public get(index: number): T['TValue'] {
return this.getNested(this, index);
Expand Down Expand Up @@ -214,7 +215,3 @@ export class MapRowView extends RowView {
return child ? child.set(self.rowIndex, value) : null;
}
}

function stringify(x: any) {
return typeof x === 'string' ? `"${x}"` : Array.isArray(x) ? JSON.stringify(x) : ArrayBuffer.isView(x) ? `[${x}]` : `${x}`;
}

0 comments on commit df43bc5

Please sign in to comment.