Add trimming & scrolling
guilledk committed Apr 22, 2024
1 parent f978f2a commit ce2093b
Showing 9 changed files with 250 additions and 53 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
 {
   "name": "@guilledk/arrowbatch-nodejs",
-  "version": "1.0.0-rc1",
+  "version": "1.0.0-rc2",
   "description": "Arrow Batch Storage protocol",
   "main": "./build/index.js",
   "type": "module",
34 changes: 17 additions & 17 deletions src/cache.ts
@@ -5,11 +5,9 @@ import {ArrowBatchContext} from "./context.js";
 import moment from "moment";
 
 
-export type CacheKey = string; // `${adjustedOrd}-${batchIdx}`
-
 export interface ArrowMetaCacheEntry {
     ts: number,
-    meta: ArrowBatchFileMetadata, startOrdinal: bigint
+    meta: ArrowBatchFileMetadata, startRow: any[]
 }
 
 export interface ArrowCachedTables {
@@ -30,38 +28,40 @@ export class ArrowBatchCache {
 
     private ctx: ArrowBatchContext;
 
-    private tableCache = new Map<CacheKey, ArrowCachedTables>();
-    private cacheOrder: CacheKey[] = [];
+    private tableCache = new Map<string, ArrowCachedTables>();
+    private cacheOrder: string[] = [];
 
-    private metadataCache = new Map<number, ArrowMetaCacheEntry>();
+    private metadataCache = new Map<string, ArrowMetaCacheEntry>();
 
     readonly dataDir: string;
 
     constructor(ctx: ArrowBatchContext) {
         this.ctx = ctx;
     }
 
-    async getMetadataFor(adjustedOrdinal: number): Promise<[ArrowMetaCacheEntry, boolean]> {
-        const filePath = this.ctx.tableFileMap.get(adjustedOrdinal).get('root');
+    async getMetadataFor(adjustedOrdinal: number, tableName: string): Promise<[ArrowMetaCacheEntry, boolean]> {
+        const filePath = this.ctx.tableFileMap.get(adjustedOrdinal).get(tableName);
         const meta = await ArrowBatchProtocol.readFileMetadata(filePath);
 
-        if (this.metadataCache.has(adjustedOrdinal)) {
+        const cacheKey = `${adjustedOrdinal}-${tableName}`;
+
+        if (this.metadataCache.has(cacheKey)) {
             // we might need to invalidate our metadata if file size changed
-            const cachedMeta = this.metadataCache.get(adjustedOrdinal);
+            const cachedMeta = this.metadataCache.get(cacheKey);
             if (cachedMeta.meta.size === meta.size)
                 return [cachedMeta, false]; // size hasn't changed, return cache
 
             // invalidate and re-calculate
-            this.metadataCache.delete(adjustedOrdinal);
+            this.metadataCache.delete(cacheKey);
         }
 
         const firstTable = await ArrowBatchProtocol.readArrowBatchTable(
             filePath, meta, 0);
 
-        const startOrdinal: bigint = firstTable.get(0).toArray()[0];
+        const startRow = firstTable.get(0).toArray();
 
-        const result = { ts: moment.now(), meta, startOrdinal };
-        this.metadataCache.set(adjustedOrdinal, result);
+        const result = { ts: moment.now(), meta, startRow };
+        this.metadataCache.set(cacheKey, result);
         return [result, true];
     }
 
@@ -89,15 +89,15 @@ export class ArrowBatchCache {
         // metadata about the bucket we are going to get tables for; mostly needed
         // to figure out the start ordinal so the math makes sense in case of a
         // non-aligned bucket boundary start
-        const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal);
+        const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal, 'root');
 
         // index relative to bucket boundary
-        const relativeIndex = ordinal - bucketMetadata.startOrdinal;
+        const relativeIndex = ordinal - bucketMetadata.startRow[0];
 
         // get batch table index, assuming config.dumpSize table size is respected
         const batchIndex = Number(relativeIndex / BigInt(this.ctx.config.dumpSize));
 
-        const cacheKey: CacheKey = `${adjustedOrdinal}-${batchIndex}`;
+        const cacheKey = `${adjustedOrdinal}-${batchIndex}`;
 
         if (this.tableCache.has(cacheKey)) {
             // we have these tables cached, but only return if metadata wasn't invalidated
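
With the `CacheKey` alias gone, both caches now key on plain template strings: batch tables as `${adjustedOrdinal}-${batchIndex}` and per-table metadata as `${adjustedOrdinal}-${tableName}`. A minimal sketch of the lookup math under this scheme; the `dumpSize`, bucket number, and ordinals below are made-up example values, not taken from the repo:

// Sketch: mapping a row ordinal to the new string cache keys.
// All concrete values here are illustrative assumptions.
const dumpSize = 100_000n;       // config.dumpSize as a bigint
const adjustedOrdinal = 1;       // bucket the ordinal falls into
const startOrdinal = 50_000n;    // bucket's first ordinal, i.e. startRow[0]
const ordinal = 175_000n;        // row being looked up

const relativeIndex = ordinal - startOrdinal;         // 125_000n
const batchIndex = Number(relativeIndex / dumpSize);  // 1

const tableKey = `${adjustedOrdinal}-${batchIndex}`;  // '1-1'    -> tableCache
const metaKey  = `${adjustedOrdinal}-root`;           // '1-root' -> metadataCache

Keying metadata per table (rather than per bucket number alone) means the 'root' entry and any other table's entry for the same bucket no longer collide.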
71 changes: 57 additions & 14 deletions src/reader/index.ts
@@ -21,7 +21,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
     protected _auxiliaryBuffers: RowBuffers = new Map<string, TableBufferInfo>();
 
     private isFirstUpdate: boolean = true;
-    private cache: ArrowBatchCache;
+    protected cache: ArrowBatchCache;
 
     constructor(
         config: ArrowBatchConfig,
@@ -40,7 +40,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
 
         this.cache = new ArrowBatchCache(this);
 
-        this._intermediateBuffers = this._initBuffer();
+        this._intermediateBuffers = this._createBuffers();
         this._initIntermediate();
     }
 
@@ -52,22 +52,23 @@ export class ArrowBatchReader extends ArrowBatchContext {
         return this._lastOrdinal;
     }
 
-    protected _initBuffer() {
-        const buffers = new Map<string, TableBufferInfo>();
-        for (const [tableName, tableMapping] of this.tableMappings.entries()) {
-            buffers.set(
-                tableName, {columns: new Map<string, any[]>()});
-
-            const tableBuffers = buffers.get(tableName);
-            for (const mapping of tableMapping)
-                tableBuffers.columns.set(mapping.name, []);
-        }
+    protected _createBuffer(tableName: string) {
+        const buffers = {columns: new Map<string, any[]>()};
+        for (const mapping of this.tableMappings.get(tableName))
+            buffers.columns.set(mapping.name, []);
+        return buffers;
+    }
+
+    protected _createBuffers() {
+        const buffers = new Map<string, TableBufferInfo>();
+        for (const tableName of this.tableMappings.keys())
+            buffers.set(tableName, this._createBuffer(tableName));
         return buffers;
     }
 
     protected _initIntermediate() {
         this._auxiliaryBuffers = this._intermediateBuffers;
-        this._intermediateBuffers = this._initBuffer();
+        this._intermediateBuffers = this._createBuffers();
         this.logger.debug(`initialized buffers for ${[...this._intermediateBuffers.keys()]}`);
     }

@@ -420,8 +421,8 @@ export class ArrowBatchReader extends ArrowBatchContext {
 
         // fetch requested row from root table
         const adjustedOrdinal = this.getOrdinal(ordinal);
-        const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal);
-        const relativeIndex = ordinal - bucketMetadata.startOrdinal;
+        const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
+        const relativeIndex = ordinal - bucketMetadata.startRow[0];
         const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
         const structRow = tables.root.get(tableIndex);
 
@@ -436,4 +437,46 @@ export class ArrowBatchReader extends ArrowBatchContext {
         return this.genRowWithRefsFromTables('root', row, tables);
     }
 
+    iter(params: {from: bigint, to: bigint}): RowScroller {
+        return new RowScroller(this, params);
+    }
 }
+
+export class RowScroller {
+
+    private _isDone: boolean;
+    readonly from: bigint;  // will push rows with ord >= `from`
+    readonly to: bigint;    // will stop pushing rows when the row with ord `to` is reached
+
+    protected reader: ArrowBatchReader;
+
+    private _lastYielded: bigint;
+
+    constructor(
+        reader: ArrowBatchReader,
+        params: {
+            from: bigint,
+            to: bigint
+        }
+    ) {
+        this.reader = reader;
+        this.from = params.from;
+        this.to = params.to;
+        this._lastYielded = this.from - 1n;
+    }
+
+    async nextResult(): Promise<RowWithRefs> {
+        const nextBlock = this._lastYielded + 1n;
+        const row = await this.reader.getRow(nextBlock);
+        this._lastYielded = nextBlock;
+        this._isDone = this._lastYielded == this.to;
+        return row;
+    }
+
+    async *[Symbol.asyncIterator](): AsyncIterableIterator<RowWithRefs> {
+        do {
+            const row = await this.nextResult();
+            yield row;
+        } while (!this._isDone);
+    }
+}
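
For callers, `iter` turns the reader into an async iterable, which is exactly how the updated `testRW.spec.ts` below consumes it. A usage sketch, assuming `reader` is an already-initialized `ArrowBatchReader` whose data covers ordinals 0n through 99n (the bounds are illustrative):

// Sketch: scrolling a row range with the new API.
const rows: RowWithRefs[] = [];
for await (const row of reader.iter({from: 0n, to: 99n}))
    rows.push(row);
// rows now holds the 100 rows in ordinal order; iteration stops after `to`

Note the scroller pulls one `getRow` call per step, so every yielded row still goes through the table/metadata cache layer above.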
8 changes: 4 additions & 4 deletions src/tests/testCache.spec.ts
@@ -7,9 +7,9 @@ import {
     ArrowBatchConfig,
     ArrowBatchReader,
     ArrowBatchWriter,
-    createLogger
+    createLogger, waitEvent
 } from '../index.js';
-import {randomHexString, TestChainGenerator, testDataContext, waitEvent} from "./utils.js";
+import {randomHexString, TestChainGenerator, testDataContext} from "./utils.js";
 import {expect} from "chai";
 
 describe('reader table cache', () => {
@@ -99,7 +99,7 @@ describe('reader table cache', () => {
 
         expect(reader.cacheSize).to.be.equal(1);
         // @ts-ignore
-        const firstMetaTs = reader.cache.metadataCache.get(1).ts;
+        const firstMetaTs = reader.cache.metadataCache.get('1-root').ts;
 
         // add a byte at end of table file to trigger meta update
         fs.appendFileSync(
@@ -112,7 +112,7 @@ describe('reader table cache', () => {
 
         expect(reader.cacheSize).to.be.equal(1);
         // @ts-ignore
-        const secondMetaTs = reader.cache.metadataCache.get(1).ts;
+        const secondMetaTs = reader.cache.metadataCache.get('1-root').ts;
 
         expect(secondMetaTs).to.be.greaterThan(firstMetaTs);
     });
10 changes: 5 additions & 5 deletions src/tests/testRW.spec.ts
@@ -7,9 +7,9 @@ import {
     ArrowBatchConfig,
     ArrowBatchReader,
     ArrowBatchWriter,
-    createLogger, RowWithRefs
+    createLogger, RowWithRefs, waitEvent
 } from '../index.js';
-import {randomHexString, TestChainGenerator, testDataContext, waitEvent} from "./utils.js";
+import {randomHexString, TestChainGenerator, testDataContext} from "./utils.js";
 import {expect} from "chai";
 
 describe('read/write', () => {
@@ -47,10 +47,10 @@ describe('read/write', () => {
 
         const blocks = [];
 
-        for (let i = BigInt(from); i <= BigInt(to); i++) {
-            const row = await reader.getRow(i);
+        for await (const row of reader.iter({
+            from: BigInt(from), to: BigInt(to)
+        }))
             blocks.push(row);
-        }
 
         return blocks;
     };
7 changes: 0 additions & 7 deletions src/tests/utils.ts
@@ -1,12 +1,9 @@
 import crypto from 'node:crypto';
-import EventEmitter from 'node:events';
 
 import moment from "moment";
 
 import {ArrowBatchContextDef, RowWithRefs} from "../context.js";
-import {ArrowBatchWriter} from "../writer";
-import {ArrowBatchConfig} from "../types";
 import {Logger} from "winston";
 
 export function randomBytes(length: number): Buffer {
     return crypto.randomBytes(length);
@@ -20,10 +17,6 @@ export function randomInteger(min: number, max: number): number {
     return Math.floor(Math.random() * (max - min + 1)) + min;
 }
 
-export async function waitEvent(emitter: EventEmitter, event: string): Promise<void> {
-    return new Promise(resolve => emitter.once(event, resolve));
-}
-
 export const sleep = (ms: number) => new Promise(res => setTimeout(res, ms));
5 changes: 5 additions & 0 deletions src/utils.ts
@@ -7,6 +7,7 @@ import fs from 'node:fs';
 import {format, LogEntry, loggers, transports} from "winston";
 import Transport from "winston-transport";
 import {ZSTDCompress} from 'simple-zstd';
+import EventEmitter from "node:events";
 
 
 // currentDir == build/ dir
@@ -139,4 +140,8 @@ export function createLogger(name: string, logLevel: string) {
         ]
     }
     return loggers.add(name, loggingOptions);
 }
+
+export async function waitEvent(emitter: EventEmitter, event: string): Promise<void> {
+    return new Promise(resolve => emitter.once(event, resolve));
+}
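
Moving `waitEvent` from the test utilities into `src/utils.ts` is what lets the specs import it from the package index. It simply wraps `EventEmitter.once` in a promise; a small usage sketch, where the emitter and the 'flush' event name are hypothetical stand-ins:

// Sketch: awaiting a one-shot event with waitEvent.
import EventEmitter from 'node:events';

const emitter = new EventEmitter();
setTimeout(() => emitter.emit('flush'), 10);
await waitEvent(emitter, 'flush');  // resolves the first time 'flush' fires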
