Remove unnecessary batchIndexStart field from global header
Make the batch-dump new-table condition configurable
Optimize reader's init method to take advantage of new batch table metadata fields
Add debug logging on writer-worker flush messaging
Bump rc version
guilledk committed May 21, 2024
1 parent 8687caf commit 691d2f4
Showing 4 changed files with 50 additions and 50 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
 {
   "name": "@guilledk/arrowbatch-nodejs",
-  "version": "1.0.0-rc12",
+  "version": "1.0.0-rc13",
   "description": "Arrow Batch Storage protocol",
   "main": "./build/index.js",
   "type": "module",
43 changes: 20 additions & 23 deletions src/protocol.ts
@@ -4,7 +4,8 @@ import {tableFromIPC} from "apache-arrow";
 import {ZSTDDecompress} from 'simple-zstd';
 import RLP from "rlp";
 
-import {bigintToUint8Array, numberToUint8Array} from "./utils.js";
+import {bigintToUint8Array} from "./utils.js";
+import {ArrowBatchConfig} from "./types.js";
 
 export enum ArrowBatchCompression {
     UNCOMPRESSED = 0,
@@ -13,7 +14,6 @@ export enum ArrowBatchCompression {
 
 export interface ArrowBatchGlobalHeader {
     versionConstant: string;
-    batchIndexStart: number;
 }
 
 export interface ArrowBatchHeader {
@@ -33,6 +33,10 @@ export interface ArrowBatchFileMetadata {
 export const DEFAULT_BUCKET_SIZE = BigInt(1e7);
 export const DEFAULT_DUMP_SIZE = BigInt(1e5);
 
+export const DUMP_CONDITION = (ordinal: bigint, config: ArrowBatchConfig): boolean => {
+    return (ordinal + 1n) % config.dumpSize === 0n;
+}
+
 export const DEFAULT_STREAM_BUF_MEM = 32 * 1024 * 1024;
 
 export class ArrowBatchProtocol {
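
The new DUMP_CONDITION helper centralizes the batch-boundary check that reader and writer previously spelled out inline. A minimal usage sketch (the config literal is illustrative; only the bigint dumpSize field matters here):

import {DUMP_CONDITION, DEFAULT_DUMP_SIZE} from './protocol.js';
import {ArrowBatchConfig} from './types.js';

// illustrative config: dumpSize defaults to 100,000 rows per batch
const config = {dumpSize: DEFAULT_DUMP_SIZE} as ArrowBatchConfig;

DUMP_CONDITION(99_999n, config);  // true:  ordinal 99,999 closes the first batch
DUMP_CONDITION(100_000n, config); // false: first ordinal of the next batch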
@@ -75,25 +79,14 @@ export class ArrowBatchProtocol {
      * queries that only affect that small batch.
      */
     static readonly ARROW_BATCH_VERSION_CONSTANT = 'ARROW-BATCH1';
-    static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length + 4;
+    static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length;
 
     static readonly ARROW_BATCH_HEADER_CONSTANT = 'ARROW-BATCH-TABLE';
     static readonly BATCH_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT.length + 8 + 1 + 8 + 8;
 
-    static newGlobalHeader(batchIndexStart: number = 0): Uint8Array {
-        const strBytes = new TextEncoder().encode(
+    static newGlobalHeader(): Uint8Array {
+        return new TextEncoder().encode(
             ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT);
-
-        const batchIndexBytes = numberToUint8Array(batchIndexStart);
-
-        const buffer = new Uint8Array(
-            strBytes.length + 4
-        );
-
-        buffer.set(strBytes, 0);
-        buffer.set(batchIndexBytes, strBytes.length);
-
-        return buffer;
     }
 
     static newBatchHeader(
@@ -114,11 +107,17 @@
             strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length + lastOrdinalBytes.length
         );
 
-        buffer.set(strBytes, 0);
-        buffer.set(batchSizeBytes, strBytes.length);
-        buffer.set(compressionByte, strBytes.length + batchSizeBytes.length);
-        buffer.set(startOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length);
-        buffer.set(lastOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length);
+        let offset = 0;
+        const appendBuff = (buf) => {
+            buffer.set(buf, offset);
+            offset += buf.length;
+        }
+
+        appendBuff(strBytes);
+        appendBuff(batchSizeBytes);
+        appendBuff(compressionByte);
+        appendBuff(startOrdinalBytes);
+        appendBuff(lastOrdinalBytes);
 
         return buffer;
     }
@@ -128,9 +127,7 @@
         const versionConstantBytes = buffer.subarray(0, versionConstantLength);
         const versionConstant = new TextDecoder("utf-8").decode(versionConstantBytes);
 
-        const batchIndexStart = buffer.readUInt32LE(versionConstantLength);
-
-        return { versionConstant, batchIndexStart };
+        return { versionConstant };
     }
 
     static readBatchHeader(buffer: Buffer): ArrowBatchHeader {
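With the four-byte batchIndexStart gone, the global header is nothing but the version constant, which is exactly what the slimmer GLOBAL_HEADER_SIZE now says. A quick round-trip sketch against the functions in this file:

import {ArrowBatchProtocol} from './protocol.js';

// 'ARROW-BATCH1'.length === 12; no trailing uint32 any more
console.log(ArrowBatchProtocol.GLOBAL_HEADER_SIZE);  // 12

// 'ARROW-BATCH-TABLE'.length (17) + 8 + 1 + 8 + 8
console.log(ArrowBatchProtocol.BATCH_HEADER_SIZE);   // 42

const header = ArrowBatchProtocol.newGlobalHeader();
const {versionConstant} = ArrowBatchProtocol.readGlobalHeader(Buffer.from(header));
console.log(versionConstant);  // 'ARROW-BATCH1'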
42 changes: 21 additions & 21 deletions src/reader/index.ts
@@ -1,6 +1,6 @@
 import {Logger} from "winston";
 
-import {ArrowBatchProtocol, ArrowTableMapping, decodeRowValue} from "../protocol.js";
+import {ArrowBatchProtocol, ArrowTableMapping, decodeRowValue, DUMP_CONDITION} from "../protocol.js";
 import {ArrowBatchConfig} from "../types.js";
 import {
     ArrowBatchContext, ArrowBatchContextDef, generateMappingsFromDefs, genereateReferenceMappings,
@@ -88,18 +88,25 @@ export class ArrowBatchReader extends ArrowBatchContext {
         // wip files found, load unfinished table into buffers and init partially
         if (this.wipFilesMap.size > 0) {
             for (const [tableName, tablePath] of this.wipFilesMap.entries()) {
-                const metadata = await ArrowBatchProtocol.readFileMetadata(tablePath);
+                const fileMeta = await ArrowBatchProtocol.readFileMetadata(tablePath);
 
-                if (metadata.batches.length > 1)
+                if (fileMeta.batches.length > 1)
                     throw new Error(`Expected on-disk wip table to have only one batch!`);
 
+                const metadata = fileMeta.batches[0];
+
                 const wipTable = await ArrowBatchProtocol.readArrowBatchTable(
-                    tablePath, metadata, 0);
+                    tablePath, fileMeta, 0);
 
                 // if its root load lastOrdinal from there
                 if (tableName === 'root' && wipTable.numRows > 0) {
                     const lastRow = wipTable.get(wipTable.numRows - 1).toArray();
                     this._lastOrdinal = lastRow[0];
+
+                    // sanity check
+                    if (this._lastOrdinal !== metadata.batch.lastOrdinal)
+                        throw new Error(
+                            `Mismatch between table lastOrdinal (${this._lastOrdinal.toLocaleString()}) and metadata\'s (${metadata.batch.lastOrdinal.toLocaleString()})`)
                 }
 
                 // load all rows using helper
@@ -118,27 +125,20 @@
                 .reverse()[0];
 
             const rootFirstPath = this.tableFileMap.get(lowestBucket).get('root');
-            const firstMetadata = await ArrowBatchProtocol.readFileMetadata(rootFirstPath);
-            const firstTable = await ArrowBatchProtocol.readArrowBatchTable(
-                rootFirstPath, firstMetadata, 0
-            );
-            if (firstTable.numRows > 0) {
-                const firstRow = firstTable.get(0).toArray();
-                this._firstOrdinal = firstRow[0];
-            }
+            const firstMetadata = (await ArrowBatchProtocol.readFileMetadata(rootFirstPath)).batches[0];
+            const firstTableSize = firstMetadata.batch.lastOrdinal - firstMetadata.batch.startOrdinal;
+            if (firstTableSize > 0n)
+                this._firstOrdinal = firstMetadata.batch.startOrdinal;
 
             // only load if lastOrdinal isnt set, (will be set if loaded wip)
             if (typeof this._lastOrdinal === 'undefined') {
                 const rootLastPath = this.tableFileMap.get(highestBucket).get('root');
-                const lastMetadata = await ArrowBatchProtocol.readFileMetadata(rootLastPath);
-                const lastTable = await ArrowBatchProtocol.readArrowBatchTable(
-                    rootLastPath, lastMetadata, lastMetadata.batches.length - 1
-                );
+                const lastFileMetadata = await ArrowBatchProtocol.readFileMetadata(rootLastPath);
+                const lastMetadata = lastFileMetadata.batches[lastFileMetadata.batches.length - 1];
+                const lastTableSize = lastMetadata.batch.lastOrdinal - lastMetadata.batch.startOrdinal;
 
-                if (lastTable.numRows > 0) {
-                    const lastRow = lastTable.get(lastTable.numRows - 1).toArray();
-                    this._lastOrdinal = lastRow[0];
-                }
+                if (lastTableSize > 0)
+                    this._lastOrdinal = lastMetadata.batch.lastOrdinal;
             }
         }
     }
@@ -168,7 +168,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
             this._firstOrdinal = this._lastOrdinal;
 
         // maybe start flush
-        if ((ordinal + 1n) % this.config.dumpSize === 0n)
+        if (DUMP_CONDITION(ordinal, this.config))
            this.beginFlush();
    }
 
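The init path above no longer decompresses whole root tables just to learn their ordinal range: the startOrdinal/lastOrdinal fields in each batch header carry it directly. A sketch of the fast path, assuming the metadata shape from protocol.ts (the file path is hypothetical):

import {ArrowBatchProtocol} from './protocol.js';

const rootPath = '/data/arrow/0000000000/root';  // hypothetical root table file
const fileMeta = await ArrowBatchProtocol.readFileMetadata(rootPath);
const firstBatch = fileMeta.batches[0];

// ordinal range straight from the batch header, no table decode needed
const size = firstBatch.batch.lastOrdinal - firstBatch.batch.startOrdinal;
if (size > 0n)
    console.log(`table starts at ordinal ${firstBatch.batch.startOrdinal}`);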
13 changes: 8 additions & 5 deletions src/writer/index.ts
@@ -9,7 +9,7 @@ import {ArrowBatchConfig} from "../types";
 import {ArrowBatchContextDef, RowWithRefs} from "../context.js";
 import {extendedStringify, isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
 import {WriterControlRequest, WriterControlResponse} from "./worker.js";
-import {ArrowTableMapping, DEFAULT_STREAM_BUF_MEM} from "../protocol.js";
+import {ArrowTableMapping, DEFAULT_STREAM_BUF_MEM, DUMP_CONDITION} from "../protocol.js";
 import ArrowBatchBroadcaster from "./broadcast.js";
 import bytes from "bytes";
 
@@ -114,7 +114,8 @@ export class ArrowBatchWriter extends ArrowBatchReader {
         workerInfo.tid++;
         workerInfo.worker.postMessage(msg);
 
-        // this.logger.debug(`sent ${extendedStringify(msg)} to worker ${name}`);
+        if (msg.method === 'flush')
+            this.logger.debug(`sent ${extendedStringify(msg)} to worker ${name}`);
     }
 
     private writersMessageHandler(msg: WriterControlResponse | WorkerLogMessage) {
@@ -171,6 +172,9 @@
 
         workerInfo.tasks.delete(msg.tid);
 
+        if (msg.method === 'flush')
+            this.logger.debug(`worker replied to ${msg.method} with id ${msg.tid}`);
+
         const allWorkersReady = [...this.writeWorkers.values()]
             .every(w => w.ackTid == w.tid - 1);
 
@@ -235,9 +239,9 @@
         if (!fs.existsSync(this.wipBucketPath))
             fs.mkdirSync(this.wipBucketPath, {recursive: true});
 
-        const isUnfinished = this.intermediateSize < this.config.dumpSize;
         const startOrdinal = this.intermediateFirstOrdinal;
         const lastOrdinal = this.intermediateLastOrdinal;
+        const unfinished = !DUMP_CONDITION(lastOrdinal, this.config);
 
         // push intermediate to auxiliary and clear it
         this._initIntermediate();
@@ -248,8 +252,7 @@
             method: 'flush',
             params: {
                 writeDir: this.wipBucketPath,
-                unfinished: isUnfinished,
-                startOrdinal, lastOrdinal
+                unfinished, startOrdinal, lastOrdinal
             }
         })
    });
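Since unfinished is now derived as !DUMP_CONDITION(lastOrdinal, config), a wip table is marked finished exactly when its last ordinal closes a dump window, so writer and reader agree on batch boundaries. An illustrative flush request under the default dumpSize (ordinals and path are made up):

// what a flush ends up posting to a write worker, roughly
const flushRequest = {
    method: 'flush',
    params: {
        writeDir: '/data/arrow/.wip/0000000000',  // hypothetical wip bucket dir
        unfinished: false,   // DUMP_CONDITION(99_999n, config) === true
        startOrdinal: 0n,
        lastOrdinal: 99_999n,
    },
};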
