[JS] Dictionary encoded values repeating between record batches #41683

Open
vivek1729 opened this issue May 16, 2024 · 7 comments · May be fixed by #41965

@vivek1729

Describe the bug, including details regarding any error messages, version, and platform.

We are trying to use the AsyncRecordBatchStreamReader to read several record batches from an HTTP response stream. The record batches are dictionary encoded, and we noticed that the values in these records start repeating after the first record batch is read.

For a minimal repro, we can see that the RecordBatchStreamReader starts repeating values for record batches with regular dictionary encoding as well. I've added a sample txt file that contains the Arrow-serialized results for 2 record batches. Both record batches are dictionary encoded (not delta); the first contains 200 records and the second contains just one. ArrowDebugging-WrongRecordBatch.txt

I'm simply trying to retrieve the value of the first column in these 2 batches, and the two values are expected to be different.
Here's a snippet to repro this behavior:

import { readFileSync } from 'node:fs';
import * as arrow from 'apache-arrow';

function readFileAsStream(fileName: string) {
    // read the contents of the text file as a string
    const base64String = readFileSync(fileName, 'utf-8');
    // Decode the base64 string
    const binaryString = atob(base64String);

    // Convert the binary string to a Uint8Array
    const bytes = new Uint8Array(binaryString.length);
    for (let i = 0; i < binaryString.length; i++) {
        bytes[i] = binaryString.charCodeAt(i);
    }

    const reader = arrow.RecordBatchStreamReader.from(bytes);

    // Read the record batches
    let batch;
    while (batch = reader.next()) {
        if (!batch || batch?.done) {
            break;
        }
        if (batch.value) {
            // Get the value of the first column
            console.log(batch.value.data.children[0].dictionary?.get(0));
        }
    }
}

Observed result:

'2013042345'
'2013042345'

Expected result (notice the second value is different from the first one):

'2013042345'
'2012020145'

Since the record batches are not delta dictionary encoded, I'd expect the dictionary associated with the first record batch to be replaced by a separate dictionary when reading the second batch. I was looking at related issues and I wonder if this might be related to #23572.

Additionally, I'd like to understand the recommended way to read multiple dictionary-encoded record batches from an HTTP stream. I imagine we can use the reader.next() iterator pattern to keep reading record batches from a stream, but I'd like to confirm my understanding.

Component(s)

JavaScript

@mirdaki

mirdaki commented May 16, 2024

To add to this, we think the issue stems from the reader's internal dictionary state not being reset after it finishes a record batch.

@felipecrv felipecrv changed the title Dictionary encoded values repeating between record batches [JS] Dictionary encoded values repeating between record batches May 17, 2024
@pohuan

pohuan commented Jun 4, 2024

@trxcllnt @domoritz I looked through the JS commit history and you two seem to be the main contributors. Could you help take a look?

@trxcllnt
Contributor

trxcllnt commented Jun 4, 2024

IIRC Dictionaries were initially spec'd as immutable. I do recall implementing delta dictionaries, but it seems like around that time the spec was updated such that later DictionaryMessages should replace existing DictionaryMessages, and I just didn't catch it.

Specifically this section:

Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID.

I think this should be a straightforward fix, I'll try to file a PR soon.
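
For illustration, here's a minimal sketch of that distinction; the names (dictionaries, onDictionaryBatch) are hypothetical and not the actual Arrow JS reader internals:

import { Vector } from 'apache-arrow';

// Hypothetical handler for an incoming DictionaryBatch message.
// `dictionaries` maps dictionary IDs to the vectors decoded so far.
function onDictionaryBatch(
  dictionaries: Map<number, Vector>,
  id: number,
  incoming: Vector,
  isDelta: boolean
) {
  const existing = dictionaries.get(id);
  if (isDelta && existing) {
    // delta dictionary: append the new values to the existing dictionary
    dictionaries.set(id, existing.concat(incoming));
  } else {
    // non-delta dictionary: replace any existing dictionary for this ID
    // (the behavior this issue is about)
    dictionaries.set(id, incoming);
  }
}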

@pohuan

pohuan commented Jun 4, 2024

pohuan@1187cd8

Thank you, Paul. This is the change that Keshuang suggested earlier within our company.
If it's OK, I could file a PR as well.

@trxcllnt
Contributor

trxcllnt commented Jun 4, 2024

@vivek1729 As for asynchronously reading batches from a stream, you have the right idea. I recommend either using AsyncIterables (the for await(const batch of reader) { ... } syntax) or the node/WhatWG streaming APIs.

I have an old codepen example of different strategies here. Lines 29-39 are most relevant to your situation. I put this together to show a friend how to wrap the Arrow IPC format in a custom binary protocol, which is why it includes some intermediate transform steps in between the Arrow IPC writer and reader.

Here's a shorter example:

import * as Arrow from 'apache-arrow';
for await (const batch of await Arrow.RecordBatchReader.from(await fetch('/get-a-table'))) {
  console.log(batch.toArray());
}

One neat thing the JS implementation supports is reading/writing multiple tables on the same stream. This can be handy if you need to send data under latency-sensitive conditions, where opening/closing the underlying transport incurs unnecessary overhead (think HTTP response streams):

import * as Arrow from 'apache-arrow';
let tableCount = 0;
for await (const reader of Arrow.RecordBatchReader.readAll(await fetch('/get-many-tables'))) {
  switch(tableCount++) {
    case 0: doThingWithTheFirstTable(reader); break;
    case 1: doThingWithTheSecondTable(reader); break;
    // ...
  }
}

function doThingWithTheFirstTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ strs: Arrow.Utf8 }>
) { /*...*/ }

function doThingWithTheSecondTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ ints: Arrow.Int32 }>
) { /*...*/ }

And this is what that might look like on the server:

import * as https from 'node:https';
import * as Arrow from 'apache-arrow';

// `options` holds the TLS options (key/cert) for the HTTPS server
https.createServer(options, async (req, res) => {
  res.writeHead(200);
  // set autoDestroy: false so the underlying response isn't closed after sending the first table
  // initially call `.reset(res)` so the writer has a sink and immediately flushes each batch
  const writer = new Arrow.RecordBatchStreamWriter({ autoDestroy: false }).reset(res);
  // alternatively, you can write each table as individual batches if producing asynchronously

  // sleep then write the first table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    strs: Arrow.vectorFromArray(['a', 'b', 'c'], new Arrow.Utf8)
  }));

  // sleep then write the second table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    ints: Arrow.vectorFromArray([0, 1, 2, 3, 4, 5], new Arrow.Int32)
  }));

  // must explicitly close when done (this will call `res.end()` on the response)
  writer.close();
}).listen(8000); 

@vivek1729
Author

vivek1729 commented Jun 7, 2024

Thanks a lot for taking a look at this issue @trxcllnt. Not sure if we can use the readAll abstraction, as we don't expect Arrow tables from our HTTP response. Instead, we expect many sequences of record batches, with each sequence sharing a common schema. Specifically, our data could look something like this:

<recordBatch1_1Schema1><recordBatch1_2Schema1>|<recordBatch2_1Schema2>|...

Yes, we are using async iterables in our code as you suggested. Here's what the high-level code looks like for our case. Notice how we create a single instance of the reader and continue reading even after we get an empty record batch, because that could signify the end of a result set for us and there could be more record batches in the HTTP response stream.

import { AsyncRecordBatchStreamReader, Table } from 'apache-arrow';

const resultTables = [];
const arrowReader = await AsyncRecordBatchStreamReader.from(responseStream);
await arrowReader.open({ autoDestroy: false });
while (true) {
    let batches = [];/*RecordBatch<any>[]*/
    let batch;/*IteratorResult<RecordBatch<any>, any> */
    while (batch = await arrowReader.next()) {
        // End of result set
        if (!batch || batch?.done)
        {
            break;
        }
        if (batch.value) {
            batches.push(batch.value);
        }
    }
    // End of stream
    if (batches.length === 0) {
        break;
    }
    resultTables.push(new Table(batches));
}

Specifically, I noticed that we had to do await arrowReader.open({ autoDestroy: false }); otherwise the reader would auto close after reading the first record batch.

Does our approach sound sensible?

@trxcllnt
Contributor

trxcllnt commented Jun 7, 2024

@vivek1729 It looks like you've re-implemented readAll 🙂.

The autoDestroy: false option exists to prevent the RecordBatchReader from closing the underlying transport after reading the EOS bytes from the IPC stream.

Notice how we create a single instance of the reader

The readAll() implementation does this as well, though that's more for convenience than anything else. Yielding a new RecordBatchReader instance for each IPC stream would not impact performance in any way (as long as the underlying ReadableStream was not closed after reading each Arrow IPC stream).

Here's a more concise version of your example, using readAll():

import { RecordBatchReader, Table } from 'apache-arrow';

const resultTables = [];

for await (const reader of RecordBatchReader.readAll(responseStream)) {
  resultTables.push(new Table(await reader.readAll()));
}

You must exhaust the reader in this loop, e.g. by using readAll() to collect all the RecordBatches into an Array, or by using an inner for await (const recordBatch of reader) { ... } loop to read each batch as it arrives.
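
For example, the inner-loop variant could look like this (using the same responseStream as above):

import { RecordBatchReader, Table } from 'apache-arrow';

const resultTables = [];

for await (const reader of RecordBatchReader.readAll(responseStream)) {
  const batches = [];
  // read each RecordBatch as it arrives on the wire
  for await (const recordBatch of reader) {
    batches.push(recordBatch);
  }
  resultTables.push(new Table(batches));
}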
