ARROW-2116: [JS] implement IPC writers #2035
Conversation
```js
const in_ = process.argv.length < 3
    ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
const out = process.argv.length < 4
    ? process.stdout : fs.createWriteStream(path.resolve(process.argv[3]));
new PipeIterator(serializeStream(await Table.fromAsync(fromReadableStream(in_))), encoding).pipe(out);
```
Would it be possible to wrap this logic into a method on `Table`? Something like:

```ts
public serializeTo(out) {
    new PipeIterator(serializeStream(this, 'binary')).pipe(out)
}
```
@TheNeuralBit yeah, that's partly what the comment above is about. Ideally we don't have to aggregate the input into a table, and instead just stream the `fromReadableStream()` values through a hypothetical `writeStreamAsync` method:

```ts
writeStreamAsync: (input: AsyncIterable<Schema | RecordBatch>) => AsyncIterable<Uint8Array>;
```

That said, the table should also have a `serialize()` method that returns a `PipeIterator` instead of a concatenated buffer, like we do with `rowsToString()`.
> That said, the table should also have a serialize() method that returns a PipeIterator instead of a concatenated buffer, like we do with rowsToString()

Yeah, that's all I was getting at.
```ts
testTableToBuffersIntegration('json', 'file')(json, arrowBuffer);
testTableToBuffersIntegration('binary', 'file')(json, arrowBuffer);
testTableToBuffersIntegration('json', 'stream')(json, arrowBuffer);
testTableToBuffersIntegration('binary', 'stream')(json, arrowBuffer);
```
Are all of these different validations run when we run via `integration_test.py` as well?

I think all of these are valuable for making sure we get coverage of the IPC reader and writer in the automated tests, but they're overkill for the actual integration tests. How would you feel about adding a flag to turn these on and off? It doesn't need to happen in this PR; we could just make a ticket to follow up on this if you agree.
Yes, having them here means they're run by `integration_test.py` as well. I did it for expediency while testing locally, but I'll split them out into separate tests here in a few minutes.
👍 LGTM! A couple of API tweaks would be nice, but we can make follow-up JIRAs for them. I'll try to have a final look through this today so we can get it merged soon!
I left various comments, but aside from my TypeScript ignorance this looks good. In lieu of more unit tests (vs. relying on the integration tests to catch issues), is JS coverage hooked up yet? It would be helpful to somehow get a read on which code paths aren't being executed anywhere. Thanks for working on this!
@@ -101,193 +99,3 @@ Once generated, the flatbuffers format code needs to be adjusted for our TS and

1. Add `/* tslint:disable:class-name */` to the top of `Schema.ts`
1. Execute `npm run lint` to fix all the linting errors

## JavaScript (for Google Closure Compiler builds)
Is this fixed now in upstream Flatbuffers?
@wesm I was able to update the version of `google-closure-compiler` we use in commit 2df1a4a, which allowed me to remove a number of hacks we had in the build to create the ES5/UMD bundle. One of those hacks was that we had to generate two versions of the FlatBuffers code: one in legacy ES5 JS that closure could use, and one in TypeScript used everywhere else. With the update, closure-compiler doesn't choke on the compiled TS output anymore.
```ts
public bodyLength: number;
constructor(public metaDataLength: number, bodyLength: Long | number, offset: Long | number) {
    this.offset = typeof offset === 'number' ? offset : offset.low;
    this.bodyLength = typeof bodyLength === 'number' ? bodyLength : bodyLength.low;
```
Are files over 2GB disallowed someplace?
Yeah, 2GB is the current JS limit for ArrayBuffers:

```
> new Uint8Array(2 * 1024 * 1024 * 1024)
RangeError: Invalid typed array length: 2147483648
```
```diff
@@ -126,6 +126,6 @@ export abstract class TypeDataLoader extends TypeVisitor {
     protected visitUnionType(type: DenseUnion | SparseUnion, { length, nullCount }: FieldMetadata = this.getFieldMetadata()) {
         return type.mode === UnionMode.Sparse ?
             new SparseUnionData(type as SparseUnion, length, this.readNullBitmap(type, nullCount), this.readTypeIds(type), this.visitFields(type.children), 0, nullCount) :
-            new DenseUnionData(type as DenseUnion, length, this.readNullBitmap(type, nullCount), this.readOffsets(type), this.readTypeIds(type), this.visitFields(type.children), 0, nullCount);
+            new DenseUnionData(type as DenseUnion, length, this.readNullBitmap(type, nullCount), this.readTypeIds(type), this.readOffsets(type), this.visitFields(type.children), 0, nullCount);
```
Aside: now that we have 2 implementations of unions as described in spec (i.e. I think what is in JS is the same as C++ now), we should move along the discussion on the ML about the slippage vs. Java. Otherwise it's going to be a huge pain to get everything reconciled. cc @jacques-n @icexelloss
I am +1 on this for sure.
```ts
let buffers = [], byteLength = 0;

for (const message of messages) {
    buffers.push(message);
```
Just so I understand: `messages` is an async iterable, so that's why it must be accumulated here?
Ah, in this case `messages` is a regular synchronous Iterable, but I'm concatenating all the messages into a single Buffer. Adding an alternative method that doesn't concat into a single buffer is one of the follow-on PRs I'd like to add soon. I'm doing it this way here purely for convenience, since it's called by the `writeTableBinary` method.
```ts
public visit<T extends DataType>(vector: Vector<T>) {
    if (!DataType.isDictionary(vector.type)) {
        const { data, length, nullCount } = vector;
        if (length > 2147483647) {
```
Is `length` a `Long` here?
`length` is a 32-bit integer here. Since JS has the platform limitation that ArrayBuffers can't be larger than 2^32-1 bytes, the reader discards the high bits from the `BufferMetadata` when it creates the ArrayData instances. @TheNeuralBit and I discussed this at one point, and reasoned JS would throw and/or overflow before we started reading, so it wasn't worth the headache of using a BigInt library. We can revisit this when more engines start shipping with BigInt support.
```ts
            throw new RangeError('Cannot write arrays larger than 2^31 - 1 in length');
        }
        this.fieldNodes.push(new FieldMetadata(length, nullCount));
        this.addBuffer(nullCount <= 0
```
Do negative null counts occur somehow in JS? We use `-1` in C++ to indicate that the null count has not been computed (i.e. after an array slice has occurred).
We technically do the same thing as C++ in the base ArrayData when someone slices a Vector, and re-compute the `nullCount` the first time the getter is called after the slice. So here it'll be the real `nullCount` after any slices.

The kicker is that the public `nullCount` is almost certainly accessed immediately after the slice in the base Vector constructor, so the optimization is ultimately moot. We've left it in so we have a starting point to optimize from in the future.
```ts
        // We can treat this like all other Nested types
        return this.visitNestedVector(vector);
    } else {
        // A sliced Dense Union is an unpleasant case. Because the offsets are different for
```
Indeed... I recall that writing the unit tests to exercise this code path was a pain. Is this being tested now?
Heh, I lifted most of the Union serialization from the C++ implementation (comments and all). I haven't added tests for them yet, but I meant to generate some test data from #987. I'll try that here shortly.
```ts
const buffer = new ArrayBuffer(2);
new DataView(buffer).setInt16(0, 256, true /* littleEndian */);
// Int16Array uses the platform's endianness.
return new Int16Array(buffer)[0] === 256;
```
This is an interesting tidbit, is there a reference on this?
Oh, I forgot I dropped that in there -- it should probably be put into one of our util functions. This little helper is from the MDN example for the JS DataView class.
```ts
import { DataType, IterableArrayLike } from '../type';
import { getBool, setBool, iterateBits } from '../util/bit';

export class ValidityView<T extends DataType> implements View<T> {
```
Not completely related, but just for my own knowledge, what does this library do if the null count is 0 and the validity buffer is length-0? In Java, it allocates a buffer of all set bits. In C++ it doesn't allocate anything
The Reader will allocate a 0-length ArrayBuffer if the `nullCount` is zero, and likewise if an ArrayData instance is constructed manually with no nullBitmap.
I will leave it to you all to merge this, but thanks for waiting for me to review!

Thanks everyone
Since the [JS] implementation of IPC writers was merged in 2018, more than 2 years ago, why is the JS doc at https://arrow.apache.org/docs/js/classes/recordbatchwriter.html still left mostly empty? When can we expect some JS examples of using RecordBatchWriter/RecordBatchFileWriter/RecordBatchStreamWriter to convert other data formats (csv, json, avro, parq, ...) to arrow files? And since the other implementations have 1.0.0 and 1.0.1 already, why is the JS package on NPM still at 0.17.0? https://www.npmjs.com/package/apache-arrow

So far I can only see the arrow2csv under apache-arrow-0.17.0 from NPM:

```
➸ ls ./node_modules/apache-arrow/bin/
arrow2csv.d.ts  arrow2csv.js  arrow2csv.js.map  arrow2csv.mjs  arrow2csv.mjs.map
```

I have tried to run the master copy of https://github.com/apache/arrow/blob/master/js/bin/json-to-arrow.js#L86-L95, but I'm not sure what a JSON Arrow file is. Can this script be used to convert any JSON file (to a binary Arrow file)?

```js
    {
        header: 'json-to-arrow',
        content: 'Script for converting a JSON Arrow file to a binary Arrow file'
    },
    {
        header: 'Synopsis',
        content: [
            '$ json-to-arrow.js -j in.json -a out.arrow -f stream'
        ]
    },
```
@t829702 no, the Arrow JSON IPC representation is only used to validate integration tests between the different Arrow implementations. It is not an optimized or ergonomic way to interact with Arrow.

There are a few strategies to convert arbitrary JavaScript types into Arrow tables, and the strategy you pick depends on your needs. They all use the Builder classes under the hood and generally follow the same pattern.

See the comment on the Builder constructor for a basic example; I also have a higher-level example that uses the …
Consider that the other implementations have a CSV Reader and a JSON Reader that can read arbitrary JSON and infer the schema. Why not keep that?

So, what is the better way to generate Arrow files? What's the better way to convert the many existing datasets in csv/json or other formats?
Thanks @trxcllnt for all that information. Now I am able to write a small script pipelining the building blocks. It looks like this, and I still have a few questions:

```js
const userNameDict = new arrow.Dictionary(new arrow.Utf8(), new arrow.Int32());
// const senderNameDict = new arrow.Dictionary(new arrow.Utf8(), new arrow.Int32());
const lineObjStruct = new arrow.Struct([
    arrow.Field.new({ name: 'date', type: new arrow.DateMillisecond() }),
    arrow.Field.new({ name: 'articleId', type: new arrow.Uint32() }),
    arrow.Field.new({ name: 'title', type: new arrow.Utf8() }),
    // Q1: other columns here are also user names; can different columns share a single
    // Dictionary? I tried to create the one userNameDict above and use it everywhere,
    // but this ends up a messy arrow file with wrong names
    arrow.Field.new({ name: 'userName', type: userNameDict }),
    arrow.Field.new({ name: 'userId', type: new arrow.Uint32() }),
    arrow.Field.new({ name: 'wordCount', type: new arrow.Uint32() }),
    arrow.Field.new({ name: 'url', type: new arrow.Utf8() }),
    arrow.Field.new({
        name: 'tags',
        type: new arrow.List({ type: new arrow.Dictionary(new arrow.Utf8(), new arrow.Int32()) }),
    }),
    // ...
]);

async function main() {
    await pipeline(
        fs.createReadStream(inputFile, 'utf-8'),
        async function* (source) {
            // ... split each chunk to lines, and yield* lines
        },
        async function* (source) {
            for await (const line of source) {
                const obj = JSON.parse(line);
                // Q2: the obj has a 'date' field in JavaScript Date's JSON format, like
                // "2020-09-18T21:42:30.324Z", but arrow.DateMillisecond does not recognize
                // that; I got all 0's if I do not pre-parse it. The parseDate here is just
                // (obj) => { obj.date = Date.parse(obj.date); }. Is there a better arrow
                // data type for JavaScript Date's JSON string format?
                parseDate(obj); obj.appreciationsReceived.forEach(parseDate);
                yield obj;
                ++count;
            }
        },
        arrow.Builder.throughAsyncIterable({
            type: lineObjStruct,
            queueingStrategy: 'bytes', highWaterMark: 1 << 20,
        }),
        async function* (source) {
            for await (const chunk of source) {
                // Q3: is there a better way to create a RecordBatch than the static method
                // arrow.RecordBatch.new? I expected something like arrow.RecordBatch.from(chunk),
                // or a new operator to call its constructor directly, but had no success
                const records = arrow.RecordBatch.new(chunk.data.childData, chunk.type.children);
                yield records;
                ++batches;
                allchunkbytes += records.byteLength;
                console.log(new Date, `yield RecordBatch ${batches} with ${chunk.length} rows ${records.byteLength} bytes (accu ${allchunkbytes})`);
            }
        },
        arrow.RecordBatchStreamWriter.throughNode(),
        fs.createWriteStream(outputFile),
    );
    console.log(end, `written ${count} objs in ${batches} batches, ${allchunkbytes} bytes in ${+end - started}ms`);
}
```

It runs in 2.3s converting a 50MB line-json file to a 21MB arrow file, which is not too bad for each single file. Most of my dataset source files are less than 100MB, but there is a huge number of them. What's the better (optimized and ergonomic) way to convert them?

```
2020-09-18T21:49:20.995Z yield RecordBatch 1 with 3872 rows 1522432 bytes (accu 1522432)
2020-09-18T21:49:21.165Z yield RecordBatch 2 with 3782 rows 1449216 bytes (accu 2971648)
2020-09-18T21:49:21.290Z yield RecordBatch 3 with 3765 rows 1413312 bytes (accu 4384960)
2020-09-18T21:49:21.411Z yield RecordBatch 4 with 3784 rows 1453952 bytes (accu 5838912)
2020-09-18T21:49:21.559Z yield RecordBatch 5 with 3801 rows 1505856 bytes (accu 7344768)
2020-09-18T21:49:21.728Z yield RecordBatch 6 with 3807 rows 1484672 bytes (accu 8829440)
2020-09-18T21:49:21.877Z yield RecordBatch 7 with 3806 rows 1513280 bytes (accu 10342720)
2020-09-18T21:49:22.013Z yield RecordBatch 8 with 3814 rows 1472640 bytes (accu 11815360)
2020-09-18T21:49:22.151Z yield RecordBatch 9 with 3848 rows 1467712 bytes (accu 13283072)
2020-09-18T21:49:22.270Z yield RecordBatch 10 with 3827 rows 1442624 bytes (accu 14725696)
2020-09-18T21:49:22.415Z yield RecordBatch 11 with 3984 rows 1554560 bytes (accu 16280256)
2020-09-18T21:49:22.560Z yield RecordBatch 12 with 4069 rows 1523200 bytes (accu 17803456)
2020-09-18T21:49:22.697Z yield RecordBatch 13 with 3986 rows 1398272 bytes (accu 19201728)
2020-09-18T21:49:22.821Z yield RecordBatch 14 with 3878 rows 1348096 bytes (accu 20549824)
2020-09-18T21:49:22.938Z yield RecordBatch 15 with 3118 rows 1091200 bytes (accu 21641024)
2020-09-18T21:49:22.975Z written 57141 objs in 15 batches, 21641024 bytes in 2186ms

real 0m2.334s
user 0m2.853s
sys 0m0.225s
```
If JS is not the way to interact with Arrow, then what is the purpose of the JS implementation? Is the JS implementation supposed to be for read-only use? What are other good use cases for the JS implementation?
My comment was about the Arrow JSON IPC representation. The Arrow JS library is a valid way to interact with Arrow data and streams. We have a special JSON IPC representation to run integration tests between the different Arrow language implementations. The …

> can different columns share a single Dictionary?

This isn't prohibited in the Arrow IPC format, but it is not supported by the current …

> is there a better arrow data type for JavaScript Date's JSON string format?

No, that's the correct API for the Date data types. Providing a separate utility in Arrow to parse dates would simply be duplicating the (relatively well-optimized) Date implementations provided by the JS VM. If you want to parse the string directly into a numeric timestamp without using Date, you could use one of the Arrow …

> is there a better way to create RecordBatch?

No, this is the recommended way to construct a RecordBatch zero-copy.

You may have success using a newline-delimited JSON parser like ndjson rather than JSON.parse. You can also pass a custom … The easiest approach to speeding this up is to run your script in parallel on as many cores as you can afford, or to use a different language implementation. If you have a GPU available, you can use cuDF to read newline-delimited JSON as a cuDF DataFrame, then serialize the DataFrame to disk as an Arrow table.
I meant: could RecordBatch provide another … As the … I don't see the doc for …

I didn't mean to duplicate JS parsing code, but rather a way for the user to provide a special parser function to the constructor, something like … Would it be an advanced usage if each DataType could have an optional parser? It could be really flexible, since anyone could leverage 3rd-party date/time parser code, either …

I have tried the Python implementation's json package; it has somewhat automatic type inference, but it recognizes only the … The Rust implementation's json package looks like it has a nicer … When I said a …

I have read your csv-to-arrow; thanks again for all the questions answered. But one more: why not ship it inside the apache-arrow NPM package? All the other implementations have packaged one or more helper utils, which is really helpful when working with binary arrow files on the command line, and even shell scripts can do a lot of parallel work in cron jobs...

Would be nice if the …
https://issues.apache.org/jira/browse/ARROW-2116
https://issues.apache.org/jira/browse/ARROW-2115
This PR represents a first pass at implementing the IPC writers for binary stream and file formats in JS.
I've also added scripts to do the `json-to-arrow`, `file-to-stream`, and `stream-to-file` steps of the integration tests. These scripts rely on a new feature in Node 10 (the next LTS version), so please update. My attempts to use a library to remain backwards-compatible with Node 9 were unsuccessful.

I've only done the APIs to serialize a preexisting Table to stream or file formats so far. We will want to refactor this soon to support end-to-end streaming.
Edit: Figured out why the integration tests weren't passing, fixed now 🥇