Skip to content

Commit

Permalink
arrow2csv: support reading arrow streams from stdin
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed May 13, 2018
1 parent e75da13 commit 78cba38
Showing 1 changed file with 53 additions and 32 deletions.
85 changes: 53 additions & 32 deletions js/src/bin/arrow2csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,63 @@
/* tslint:disable */

import * as fs from 'fs';
import { promisify } from 'util';
import * as Arrow from '../Arrow';

const readFile = promisify(fs.readFile);
const { parse } = require('json-bignum');
const optionList = [
{
type: String,
name: 'schema', alias: 's',
optional: true, multiple: true,
typeLabel: '[underline]{columns}',
description: 'A space-delimited list of column names'
},
{
type: String,
name: 'file', alias: 'f',
optional: false, multiple: true,
description: 'The Arrow file to read'
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
const files = [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);

(async () => {
if (!files.length) {
let buffer = Buffer.from([]);
for await (let chunk of require('stream-to-iterator')(process.stdin as any)) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, 'binary');
buffer = Buffer.concat([buffer, chunk], buffer.length + chunk.length);
}
if (buffer.length === 0) {
return print_usage();
}
files.push(buffer);
}
for (let input of files) {
printTable(typeof input === 'string' ? await readFile(input) : input);
}
];
})().catch((e) => { console.error(e); process.exit(1); });

const argv = require(`command-line-args`)(optionList, { partial: true });
const files = [...argv.file, ...(argv._unknown || [])].filter(Boolean);
function printTable(input: any) {
let table: Arrow.Table;
try {
table = Arrow.Table.from(input);
} catch (e) {
table = Arrow.Table.from(parse(input + ''));
}
if (argv.schema && argv.schema.length) {
table = table.select(...argv.schema);
}
table.rowsToString().pipe(process.stdout);
}

if (!files.length) {
function cliOpts() {
return [
{
type: String,
name: 'schema', alias: 's',
optional: true, multiple: true,
typeLabel: '[underline]{columns}',
description: 'A space-delimited list of column names'
},
{
type: String,
name: 'file', alias: 'f',
optional: false, multiple: true,
description: 'The Arrow file to read'
}
];
}

function print_usage() {
console.log(require('command-line-usage')([
{
header: 'arrow2csv',
Expand All @@ -60,7 +94,7 @@ if (!files.length) {
{
header: 'Options',
optionList: [
...optionList,
...cliOpts(),
{
name: 'help',
description: 'Print this usage guide.'
Expand All @@ -81,17 +115,4 @@ if (!files.length) {
}
]));
process.exit(1);
}

files.forEach((source) => {
let table: Arrow.Table, input = fs.readFileSync(source);
try {
table = Arrow.Table.from(input);
} catch (e) {
table = Arrow.Table.from(parse(input + ''));
}
if (argv.schema && argv.schema.length) {
table = table.select(...argv.schema);
}
table.rowsToString().pipe(process.stdout);
});
}

0 comments on commit 78cba38

Please sign in to comment.