Add new fields to metadata headers to support partial batches correctly
Integrate all updateOrdinal & flush logic inside pushRow; when a root row is pushed, flush is performed automatically
Bump rc version
guilledk committed May 21, 2024
1 parent d89d52c commit 8687caf
Showing 8 changed files with 119 additions and 47 deletions.
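
The caller-facing effect of folding updateOrdinal and flush into pushRow shows up in the src/tests/utils.ts change below: callers now only push rows, and the writer tracks the root ordinal and begins a flush at dumpSize boundaries on its own. A minimal sketch under those assumptions, using a 'block' root table as in the tests and assuming ArrowBatchWriter is re-exported from the package root:

```typescript
import { ArrowBatchWriter } from '@guilledk/arrowbatch-nodejs';

// Sketch only: `writer` is assumed to be an already-initialized ArrowBatchWriter
// whose root table is 'block', as in src/tests/utils.ts.
function writeBlockRange(writer: ArrowBatchWriter, blockRows: any[][], from: number, to: number) {
    for (let i = from; i <= to; i++) {
        // pushRow on the root table now updates the ordinal internally and, at a
        // dumpSize boundary ((ordinal + 1n) % dumpSize === 0n), begins a flush itself.
        writer.pushRow('block', blockRows[i]);
    }
    // No explicit writer.updateOrdinal(i) call is needed anymore.
}
```
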
5 changes: 3 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "@guilledk/arrowbatch-nodejs",
"version": "1.0.0-rc11",
"version": "1.0.0-rc12",
"description": "Arrow Batch Storage protocol",
"main": "./build/index.js",
"type": "module",
@@ -12,7 +12,8 @@
"scripts": {
"bootstrap": "yarn",
"build": "yarn run bootstrap && tsc",
"test-all": "c8 mocha build/tests/test*.spec.js"
"test-all": "mocha build/tests/test*.spec.js",
"coverage": "c8 mocha build/tests/test*.spec.js"
},
"repository": {
"type": "git",
30 changes: 16 additions & 14 deletions src/cache.ts
@@ -7,7 +7,7 @@ import moment from "moment";

export interface ArrowMetaCacheEntry {
ts: number,
meta: ArrowBatchFileMetadata, startRow: any[]
meta: ArrowBatchFileMetadata
}

export interface ArrowCachedTables {
@@ -55,12 +55,7 @@ export class ArrowBatchCache {
this.metadataCache.delete(cacheKey);
}

const firstTable = await ArrowBatchProtocol.readArrowBatchTable(
filePath, meta, 0);

const startRow = firstTable.get(0).toArray();

const result = { ts: moment.now(), meta, startRow };
const result = { ts: moment.now(), meta };
this.metadataCache.set(cacheKey, result);
return [result, true];
}
@@ -86,26 +81,33 @@
];
}

async getTablesFor(ordinal: bigint) {
async getTablesFor(ordinal: bigint): Promise<[ArrowCachedTables, number]> {
const adjustedOrdinal = this.ctx.getOrdinal(ordinal);

// metadata about the bucket we are going to get tables for, mostly need to
// figure out start ordinal for math to make sense in case non-aligned bucket
// boundary start
const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal, 'root');

// index relative to bucket boundary
const relativeIndex = ordinal - bucketMetadata.startRow[0];
// ensure bucket contains ordinal
const bucketOrdStart = bucketMetadata.meta.batches[0].batch.startOrdinal;
const bucketOrdLast = bucketMetadata.meta.batches[
bucketMetadata.meta.batches.length - 1].batch.lastOrdinal;

// get batch table index, assuming config.dumpSize table size is respected
const batchIndex = Number(relativeIndex / BigInt(this.ctx.config.dumpSize));
if (ordinal < bucketOrdStart || ordinal > bucketOrdLast)
throw new Error(`Bucket does not contain ${ordinal}`);

let batchIndex = 0;
while (ordinal > bucketMetadata.meta.batches[batchIndex].batch.lastOrdinal) {
batchIndex++;
}

const cacheKey = `${adjustedOrdinal}-${batchIndex}`;

if (this.tableCache.has(cacheKey)) {
// we have this tables cached, but only return if metadata wasnt invalidated
if (!metadataUpdated)
return this.tableCache.get(cacheKey);
return [this.tableCache.get(cacheKey), batchIndex];

// delete stale cache
this.tableCache.delete(cacheKey)
@@ -136,7 +138,7 @@ export class ArrowBatchCache {
this.tableCache.delete(oldest);
}

return tables;
return [tables, batchIndex];
}

get size() : number {
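
getTablesFor now resolves the batch by walking each batch's lastOrdinal (taken from the new headers) instead of assuming dumpSize-aligned batches, and it returns the batch index alongside the tables. A hedged consumer sketch of the new return shape, assuming ArrowBatchCache and isCachedTables are re-exported from the package root:

```typescript
import { ArrowBatchCache, isCachedTables } from '@guilledk/arrowbatch-nodejs';

// Sketch of the new [tables, batchIndex] return shape of getTablesFor.
async function loadTablesFor(cache: ArrowBatchCache, ordinal: bigint) {
    const [tables, batchIndex] = await cache.getTablesFor(ordinal);

    if (!isCachedTables(tables))
        throw new Error(`Tables for ordinal ${ordinal} not found`);

    // batchIndex identifies the Arrow batch inside the bucket that holds the
    // ordinal; the cache also keys loaded tables as `${adjustedOrdinal}-${batchIndex}`.
    return { tables, batchIndex };
}
```
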
48 changes: 39 additions & 9 deletions src/protocol.ts
@@ -4,7 +4,7 @@ import {tableFromIPC} from "apache-arrow";
import {ZSTDDecompress} from 'simple-zstd';
import RLP from "rlp";

import {bigintToUint8Array} from "./utils.js";
import {bigintToUint8Array, numberToUint8Array} from "./utils.js";

export enum ArrowBatchCompression {
UNCOMPRESSED = 0,
@@ -13,12 +13,15 @@

export interface ArrowBatchGlobalHeader {
versionConstant: string;
batchIndexStart: number;
}

export interface ArrowBatchHeader {
headerConstant: string;
batchByteSize: bigint;
compression: ArrowBatchCompression;
startOrdinal: bigint;
lastOrdinal: bigint;
}

export interface ArrowBatchFileMetadata {
@@ -72,27 +75,50 @@ export class ArrowBatchProtocol {
* queries that only affect that small batch.
*/
static readonly ARROW_BATCH_VERSION_CONSTANT = 'ARROW-BATCH1';
static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length;
static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length + 4;

static readonly ARROW_BATCH_HEADER_CONSTANT = 'ARROW-BATCH-TABLE';
static readonly BATCH_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT.length + 8 + 1;
static readonly BATCH_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT.length + 8 + 1 + 8 + 8;

static newGlobalHeader(): Uint8Array {
return new TextEncoder().encode(
static newGlobalHeader(batchIndexStart: number = 0): Uint8Array {
const strBytes = new TextEncoder().encode(
ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT);

const batchIndexBytes = numberToUint8Array(batchIndexStart);

const buffer = new Uint8Array(
strBytes.length + 4
);

buffer.set(strBytes, 0);
buffer.set(batchIndexBytes, strBytes.length);

return buffer;
}

static newBatchHeader(byteSize: bigint, compression: ArrowBatchCompression) {
static newBatchHeader(
byteSize: bigint,
compression: ArrowBatchCompression,
startOrdinal: bigint,
lastOrdinal: bigint
) {
const strBytes = new TextEncoder().encode(
ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT);

const batchSizeBytes = bigintToUint8Array(byteSize);
const compressionByte = new Uint8Array([compression]);
const startOrdinalBytes = bigintToUint8Array(startOrdinal);
const lastOrdinalBytes = bigintToUint8Array(lastOrdinal);

const buffer = new Uint8Array(
strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length + lastOrdinalBytes.length
);

const buffer = new Uint8Array(strBytes.length + batchSizeBytes.length + 1);
buffer.set(strBytes, 0);
buffer.set(batchSizeBytes, strBytes.length);
buffer.set(compressionByte, strBytes.length + batchSizeBytes.length);
buffer.set(startOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length);
buffer.set(lastOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length);

return buffer;
}
@@ -102,7 +128,9 @@
const versionConstantBytes = buffer.subarray(0, versionConstantLength);
const versionConstant = new TextDecoder("utf-8").decode(versionConstantBytes);

return { versionConstant };
const batchIndexStart = buffer.readUInt32LE(versionConstantLength);

return { versionConstant, batchIndexStart };
}

static readBatchHeader(buffer: Buffer): ArrowBatchHeader {
@@ -113,8 +141,10 @@
const sizeStart = headerConstantLength;
const batchByteSize = buffer.readBigUInt64LE(sizeStart);
const compression = buffer.readUint8(sizeStart + 8);
const startOrdinal = buffer.readBigInt64LE(sizeStart + 8 + 1);
const lastOrdinal = buffer.readBigInt64LE(sizeStart + 8 + 1 + 8);

return { headerConstant, batchByteSize, compression };
return { headerConstant, batchByteSize, compression, startOrdinal, lastOrdinal };
}

static async readFileMetadata(filePath: string): Promise<ArrowBatchFileMetadata> {
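
After this change the fixed-size batch header is the 17-byte 'ARROW-BATCH-TABLE' constant followed by an 8-byte batch size, a 1-byte compression flag, and the new 8-byte startOrdinal and lastOrdinal fields, while the global header gains a 4-byte batchIndexStart after the version constant. A standalone little-endian sketch of that batch-header layout using Node's Buffer, written from the sizes above rather than calling the library itself:

```typescript
import { Buffer } from 'node:buffer';

// Standalone sketch of the extended batch header layout (all integers little-endian):
// 'ARROW-BATCH-TABLE' (17 bytes) | batchByteSize u64 | compression u8 | startOrdinal u64 | lastOrdinal u64
const BATCH_HEADER_CONSTANT = 'ARROW-BATCH-TABLE';

function encodeBatchHeader(
    byteSize: bigint, compression: number, startOrdinal: bigint, lastOrdinal: bigint
): Buffer {
    const buf = Buffer.alloc(BATCH_HEADER_CONSTANT.length + 8 + 1 + 8 + 8);  // 42 bytes
    buf.write(BATCH_HEADER_CONSTANT, 0, 'utf-8');
    let offset: number = BATCH_HEADER_CONSTANT.length;
    offset = buf.writeBigUInt64LE(byteSize, offset);
    offset = buf.writeUInt8(compression, offset);
    offset = buf.writeBigUInt64LE(startOrdinal, offset);
    buf.writeBigUInt64LE(lastOrdinal, offset);
    return buf;
}

function decodeBatchHeader(buf: Buffer) {
    const headerConstant = buf.toString('utf-8', 0, BATCH_HEADER_CONSTANT.length);
    let offset = BATCH_HEADER_CONSTANT.length;
    const batchByteSize = buf.readBigUInt64LE(offset); offset += 8;
    const compression = buf.readUInt8(offset); offset += 1;
    const startOrdinal = buf.readBigUInt64LE(offset); offset += 8;
    const lastOrdinal = buf.readBigUInt64LE(offset);
    return { headerConstant, batchByteSize, compression, startOrdinal, lastOrdinal };
}

// Round trip: a compressed 1 KiB batch covering ordinals 100..199
const header = decodeBatchHeader(encodeBatchHeader(1024n, 1, 100n, 199n));
```
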
35 changes: 28 additions & 7 deletions src/reader/index.ts
@@ -8,7 +8,7 @@ import {
RowWithRefs,
TableBufferInfo
} from "../context.js";
import {ArrowBatchCache, ArrowCachedTables, isCachedTables} from "../cache.js";
import {ArrowBatchCache, ArrowCachedTables, ArrowMetaCacheEntry, isCachedTables} from "../cache.js";


export class ArrowBatchReader extends ArrowBatchContext {
@@ -168,7 +168,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
this._firstOrdinal = this._lastOrdinal;

// maybe start flush
if (this.intermediateSize === Number(this.config.dumpSize))
if ((ordinal + 1n) % this.config.dumpSize === 0n)
this.beginFlush();
}

@@ -189,6 +189,10 @@
return this.getColumn('root', this.definition.root.ordinal).length;
}

get intermediateFirstOrdinal(): bigint {
return this.getColumn('root', this.definition.root.ordinal)[0];
}

get intermediateLastOrdinal(): bigint {
return this.getColumn('root', this.definition.root.ordinal)[this.intermediateSize - 1];
}
@@ -412,6 +416,24 @@
return mappings.map(
m => tableBuff.columns.get(m.name)[index]);
}

getRelativeTableIndex(ordinal: bigint, metadata: ArrowMetaCacheEntry): [number, bigint] {
// ensure bucket contains ordinal
const bucketOrdStart = metadata.meta.batches[0].batch.startOrdinal;
const bucketOrdLast = metadata.meta.batches[
metadata.meta.batches.length - 1].batch.lastOrdinal;

if (ordinal < bucketOrdStart || ordinal > bucketOrdLast)
throw new Error(`Bucket does not contain ${ordinal}`);

let batchIndex = 0;
while (ordinal > metadata.meta.batches[batchIndex].batch.lastOrdinal) {
batchIndex++;
}

return [batchIndex, ordinal - metadata.meta.batches[batchIndex].batch.startOrdinal];
}

async getRow(ordinal: bigint): Promise<RowWithRefs> {
const ordinalField = this.definition.root.ordinal;

@@ -440,20 +462,19 @@
}

// is row on disk?
const tables = await this.cache.getTablesFor(ordinal);
const [tables, batchIndex] = await this.cache.getTablesFor(ordinal);

if (!(isCachedTables(tables)))
throw new Error(`Tables for ordinal ${ordinal} not found`);

// fetch requested row from root table
const adjustedOrdinal = this.getOrdinal(ordinal);
const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
const relativeIndex = ordinal - bucketMetadata.startRow[0];
const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
const structRow = tables.root.get(tableIndex);
const [__, relativeIndex] = this.getRelativeTableIndex(ordinal, bucketMetadata);
const structRow = tables.root.get(Number(relativeIndex));

if (!structRow)
throw new Error(`Could not find row ${tableIndex}!`);
throw new Error(`Could not find row root-${adjustedOrdinal}-${batchIndex}-${relativeIndex}!`);

const row = structRow.toArray();
this.tableMappings.get('root').map.forEach((m, i) => {
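
getRelativeTableIndex replaces the old dumpSize arithmetic: rather than assuming every batch holds exactly dumpSize rows, it walks the per-batch startOrdinal/lastOrdinal ranges now carried in the headers, which is what makes partial batches addressable. A standalone worked sketch of the same lookup over mock batch ranges (the shapes are illustrative, not the library's metadata types):

```typescript
// Standalone sketch: locate [batchIndex, relativeIndex] for an ordinal given the
// per-batch ordinal ranges that the new headers expose. Mock data, illustrative only.
interface BatchRange { startOrdinal: bigint, lastOrdinal: bigint }

function relativeTableIndex(ordinal: bigint, batches: BatchRange[]): [number, bigint] {
    const first = batches[0].startOrdinal;
    const last = batches[batches.length - 1].lastOrdinal;
    if (ordinal < first || ordinal > last)
        throw new Error(`Bucket does not contain ${ordinal}`);

    let batchIndex = 0;
    while (ordinal > batches[batchIndex].lastOrdinal)
        batchIndex++;

    return [batchIndex, ordinal - batches[batchIndex].startOrdinal];
}

// Example: a finished batch of 100 rows followed by a partial batch of 30 rows.
const batches: BatchRange[] = [
    { startOrdinal: 0n, lastOrdinal: 99n },
    { startOrdinal: 100n, lastOrdinal: 129n },
];
// ordinal 120 lives in batch 1 at row offset 20
const [batchIndex, relativeIndex] = relativeTableIndex(120n, batches);
```
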
2 changes: 0 additions & 2 deletions src/tests/utils.ts
@@ -162,8 +162,6 @@ export class TestChainGenerator {
for (let i = from; i <= to; i++) {
const block = blockRows[i];
writer.pushRow('block', block);

writer.updateOrdinal(i);
}
};

10 changes: 10 additions & 0 deletions src/utils.ts
@@ -8,6 +8,7 @@ import {format, LogEntry, loggers, transports} from "winston";
import Transport from "winston-transport";
import {ZSTDCompress} from 'simple-zstd';
import EventEmitter from "node:events";
import {number} from "zod";


// currentDir == build/ dir
@@ -25,6 +26,15 @@ export function bigintToUint8Array (big: bigint): Uint8Array {
}
return byteArray;
}

export function numberToUint8Array(int: number): Uint8Array {
const byteArray = new Uint8Array(4);
for (let i = 0; i < byteArray.length; i++) {
byteArray[i] = Math.floor(int / Math.pow(256, i)) % 256;
}
return byteArray;
}

export async function compressUint8Array(input: Uint8Array, compressionLevel = 3) {
// Convert Uint8Array to a Buffer since Node.js streams work with Buffers
const inputBuffer = Buffer.from(input);
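
numberToUint8Array mirrors bigintToUint8Array for the new 4-byte batchIndexStart field: it emits a little-endian uint32, which is why readGlobalHeader can recover the value with Buffer.readUInt32LE. A quick round-trip sketch; the decoder here is a hypothetical helper, not part of the library:

```typescript
import { Buffer } from 'node:buffer';

// Little-endian uint32 encoder, as added in src/utils.ts.
function numberToUint8Array(int: number): Uint8Array {
    const byteArray = new Uint8Array(4);
    for (let i = 0; i < byteArray.length; i++)
        byteArray[i] = Math.floor(int / Math.pow(256, i)) % 256;
    return byteArray;
}

// Hypothetical decoder for the round-trip check; the library itself reads the
// value back with Buffer.readUInt32LE in readGlobalHeader.
function uint8ArrayToNumber(bytes: Uint8Array): number {
    return Buffer.from(bytes).readUInt32LE(0);
}

// 0x01020304 encodes as [0x04, 0x03, 0x02, 0x01] and decodes back unchanged.
const roundTrip = uint8ArrayToNumber(numberToUint8Array(0x01020304));
```
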
25 changes: 15 additions & 10 deletions src/writer/index.ts
@@ -7,14 +7,14 @@ import {format, Logger, loggers, transports} from "winston";
import {ArrowBatchReader} from "../reader/index.js";
import {ArrowBatchConfig} from "../types";
import {ArrowBatchContextDef, RowWithRefs} from "../context.js";
import {isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
import {extendedStringify, isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
import {WriterControlRequest, WriterControlResponse} from "./worker.js";
import {ArrowTableMapping, DEFAULT_STREAM_BUF_MEM} from "../protocol.js";
import ArrowBatchBroadcaster from "./broadcast.js";
import bytes from "bytes";

export const DEFAULT_BROADCAST_HOST = '127.0.0.1';
export const DEFAULT_BROADCAST_PORT = 4200;
export const DEFAULT_BROADCAST_PORT = 4201;

export class ArrowBatchWriter extends ArrowBatchReader {

@@ -113,6 +113,8 @@

workerInfo.tid++;
workerInfo.worker.postMessage(msg);

// this.logger.debug(`sent ${extendedStringify(msg)} to worker ${name}`);
}

private writersMessageHandler(msg: WriterControlResponse | WorkerLogMessage) {
@@ -234,6 +236,8 @@
fs.mkdirSync(this.wipBucketPath, {recursive: true});

const isUnfinished = this.intermediateSize < this.config.dumpSize;
const startOrdinal = this.intermediateFirstOrdinal;
const lastOrdinal = this.intermediateLastOrdinal;

// push intermediate to auxiliary and clear it
this._initIntermediate();
@@ -244,7 +248,8 @@
method: 'flush',
params: {
writeDir: this.wipBucketPath,
unfinished: isUnfinished
unfinished: isUnfinished,
startOrdinal, lastOrdinal
}
})
});
@@ -278,8 +283,10 @@

this.addRow(tableName, row.row);

if (tableName === 'root')
if (tableName === 'root') {
this.broadcaster.broadcastRow(row);
this.updateOrdinal(row.row[0]);
}
}

private async trimOnBuffers(ordinal: bigint) {
Expand Down Expand Up @@ -338,28 +345,26 @@ export class ArrowBatchWriter extends ArrowBatchReader {
const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');

// trim idx relative to bucket start
const relativeIndex = ordinal - bucketMetadata.startRow[0];
const [batchIndex, relativeIndex] = this.getRelativeTableIndex(ordinal, bucketMetadata);

// table index might need to be loaded into buffers & be partially edited
// everything after table index can be deleted
const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));

if (tableIndex >= bucketMetadata.meta.batches.length)
if (batchIndex >= bucketMetadata.meta.batches.length)
return;

// truncate files from next table onwards
await Promise.all(
[...this.tableMappings.keys()]
.map(table => this.cache.getMetadataFor(adjustedOrdinal, table).then(
([meta, _]) => {
const tableIndexEnd = meta.meta.batches[tableIndex].end;
const tableIndexEnd = meta.meta.batches[batchIndex].end;
const fileName = this.tableFileMap.get(adjustedOrdinal).get(table);
return pfs.truncate(fileName, tableIndexEnd + 1);
})));

// unwrap adjustedOrdinal:tableIndex table into fresh intermediate
this._intermediateBuffers = this._createBuffers();
const tables = await this.cache.getTablesFor(ordinal);
const [tables, __] = await this.cache.getTablesFor(ordinal);

Object.entries(
{root: tables.root, ...tables.others}
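
The flush request that beginFlush posts to each table's writer worker now carries the ordinal bounds of the batch being flushed, so the worker can stamp startOrdinal and lastOrdinal into the batch header even for a partial (unfinished) batch. A hedged sketch of the payload; the field names follow the diff, while the surrounding WriterControlRequest envelope and the path are assumptions:

```typescript
// Sketch of the flush message parameters, based on beginFlush in src/writer/index.ts.
// The exact WriterControlRequest envelope (tid, table name, etc.) is assumed.
interface FlushParams {
    writeDir: string;        // wip bucket directory the worker writes into
    unfinished: boolean;     // true when intermediateSize < config.dumpSize (partial batch)
    startOrdinal: bigint;    // first root ordinal in the intermediate buffers
    lastOrdinal: bigint;     // last root ordinal in the intermediate buffers
}

const flushRequest: { method: 'flush'; params: FlushParams } = {
    method: 'flush',
    params: {
        writeDir: '/data/arrow/blocks.wip',  // illustrative path
        unfinished: true,
        startOrdinal: 100n,
        lastOrdinal: 129n,
    },
};
```
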