From 812cd9663702197f0ccceeaf7d906411cf78ef6c Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Thu, 31 Jul 2025 02:12:28 -0400 Subject: [PATCH] update hightable and implement dataframe v2 --- icebird/package.json | 4 +- icebird/src/App.tsx | 146 ++++++++++++++++++++++++++++++++++++++++--- icebird/src/Page.tsx | 2 +- 3 files changed, 140 insertions(+), 12 deletions(-) diff --git a/icebird/package.json b/icebird/package.json index 8ea7454..2661ef1 100644 --- a/icebird/package.json +++ b/icebird/package.json @@ -14,8 +14,8 @@ "typecheck": "tsc" }, "dependencies": { - "hightable": "0.17.0", - "hyperparam": "0.2.48", + "hightable": "0.18.1", + "hyperparam": "0.3.7", "icebird": "0.3.0", "react": "18.3.1", "react-dom": "18.3.1" diff --git a/icebird/src/App.tsx b/icebird/src/App.tsx index a4f2a62..0087fcb 100644 --- a/icebird/src/App.tsx +++ b/icebird/src/App.tsx @@ -2,7 +2,8 @@ import { ReactNode } from 'react' import Page, { PageProps } from './Page.js' import Welcome from './Welcome.js' -import { DataFrame, asyncRows, rowCache, sortableDataFrame } from 'hightable' +import type { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame } from 'hightable' +import { createEventTarget, sortableDataFrame } from 'hightable' import { icebergListVersions, icebergMetadata, icebergRead } from 'icebird' import type { Snapshot, TableMetadata } from 'icebird/src/types.js' import { useCallback, useEffect, useState } from 'react' @@ -11,7 +12,10 @@ import Layout from './Layout.js' const empty: DataFrame = { header: [], numRows: 0, - rows: () => [], + eventTarget: createEventTarget(), + getRowNumber: () => undefined, + getCell: () => undefined, + fetch: () => Promise.resolve(undefined), } export default function App(): ReactNode { @@ -56,7 +60,7 @@ export default function App(): ReactNode { const metadataFileName = `${version}.metadata.json` icebergMetadata({ tableUrl: tableUrl, metadataFileName }).then((metadata: TableMetadata) => { const df = icebergDataFrame(tableUrl, metadataFileName, metadata) - setPageProps({ df, metadata, versions, version, setVersion, setError }) + setPageProps({ df, metadata, versions, version, setVersion, setError: setUnknownError }) }).catch(setUnknownError) }, [tableUrl, versions, version, setUnknownError]) @@ -86,13 +90,137 @@ function icebergDataFrame(tableUrl: string, metadataFileName: string, metadata: const schema = metadata.schemas.find(s => s['schema-id'] === currentSchemaId) if (!schema) throw new Error('Current schema not found in metadata') const header = schema.fields.map(f => f.name) - return sortableDataFrame(rowCache({ + const eventTarget = createEventTarget() + + type CachedValue = { + kind: 'fetched' + value: ResolvedValue + } | { + kind: 'fetching' + } | undefined + + const rowNumberCache: CachedValue[] = [] + const cellCache = new Map[]>() + header.forEach(column => cellCache.set(column, [])) + + function getRowNumber({ row }: {row: number}): ResolvedValue | undefined { + validateRow({ row, data: { numRows } }) + const cachedValue = rowNumberCache[row] + return cachedValue?.kind === 'fetched' ? cachedValue.value : undefined + } + function getCell({ row, column }: {row: number, column: string}): ResolvedValue | undefined { + validateRow({ row, data: { numRows } }) + validateColumn({ column, data: { header } }) + const cachedValue = cellCache.get(column)?.[row] + return cachedValue?.kind === 'fetched' ? cachedValue.value : undefined + } + function isCachedOrFetching({ row, columns }: {row: number, columns?: string[]}): boolean { + return rowNumberCache[row] !== undefined && (!columns || columns.length === 0 || columns.every(column => cellCache.get(column)?.[row] !== undefined)) + } + + // TODO: fetch by row groups, to avoid fetching row by row when we scroll + + const unsortableDataFrame: UnsortableDataFrame = { header, numRows, - rows({ start, end }) { - const rows = icebergRead({ tableUrl, metadataFileName, metadata, rowStart: start, rowEnd: end }) - .then(rows => rows.map((cells, index) => ({ cells, index: start + index }))) - return asyncRows(rows, end - start, header) + eventTarget, + getRowNumber, + getCell, + async fetch({ rowStart, rowEnd, columns, signal }) { + validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }) + checkSignal(signal) + + const ranges = [] + let currentRange: [number, number] | undefined = undefined + for (let row = rowStart; row < rowEnd; row++) { + if (isCachedOrFetching({ row, columns })) { + if (currentRange) { + ranges.push(currentRange) + currentRange = undefined + } + } else { + if (!currentRange) { + currentRange = [row, row + 1] + } else { + currentRange[1] = row + 1 + } + } + } + if (currentRange) { + ranges.push(currentRange) + } + console.log(`Fetching rows ${rowStart} - ${rowEnd} (${ranges.length} ranges)`, { ranges, columns, cache: { rowNumberCache, cellCache } }) + + const promises = ranges.map(async ([start, end]) => { + for (let row = start; row < end; row++) { + rowNumberCache[row] = { kind: 'fetching' } + for (const column of columns ?? []) { + const array = cellCache.get(column) + if (!array) throw new Error(`Column ${column} not found in cache`) + array[row] = { kind: 'fetching' } + } + } + + const rows = await icebergRead({ + tableUrl, + rowStart: start, + rowEnd: end, + metadataFileName, + metadata, + }) + + const rowsEnd = rows.length + start + + for (const [i, cells] of rows.entries()) { + const row = i + start + rowNumberCache[row] = { kind: 'fetched', value: { value: row } } + for (const column of columns ?? []) { + const array = cellCache.get(column) + if (!array) throw new Error(`Column ${column} not found in cache`) + array[row] = { kind: 'fetched', value: { value: cells[column] } } + } + } + // Not sure if it's the best way to handle the missing rows + for (let row = start + rowsEnd; row < end; row++) { + rowNumberCache[row] = { kind: 'fetched', value: { value: -1 } } // Indicating that the row is not available - totally not a perfect idea, but it works for now + for (const column of columns ?? []) { + const array = cellCache.get(column) + if (!array) throw new Error(`Column ${column} not found in cache`) + array[row] = { kind: 'fetched', value: { value: undefined } } + } + } + + eventTarget.dispatchEvent(new CustomEvent('resolve')) + }) + + await Promise.all(promises) }, - })) + + } + return sortableDataFrame(unsortableDataFrame) + +} + +function validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }: {rowStart: number, rowEnd: number, columns?: string[], data: Pick}): void { + if (rowStart < 0 || rowEnd > numRows || !Number.isInteger(rowStart) || !Number.isInteger(rowEnd) || rowStart > rowEnd) { + throw new Error(`Invalid row range: ${rowStart} - ${rowEnd}, numRows: ${numRows}`) + } + if (columns?.some(column => !header.includes(column))) { + throw new Error(`Invalid columns: ${columns.join(', ')}. Available columns: ${header.join(', ')}`) + } +} +function validateRow({ row, data: { numRows } }: {row: number, data: Pick}): void { + if (row < 0 || row >= numRows || !Number.isInteger(row)) { + throw new Error(`Invalid row index: ${row}, numRows: ${numRows}`) + } +} +function validateColumn({ column, data: { header } }: {column: string, data: Pick}): void { + if (!header.includes(column)) { + throw new Error(`Invalid column: ${column}. Available columns: ${header.join(', ')}`) + } +} +function checkSignal(signal?: AbortSignal): void { + if (signal?.aborted) { + throw new DOMException('The operation was aborted.', 'AbortError') + } } diff --git a/icebird/src/Page.tsx b/icebird/src/Page.tsx index a19ce2d..99fe32b 100644 --- a/icebird/src/Page.tsx +++ b/icebird/src/Page.tsx @@ -9,7 +9,7 @@ export interface PageProps { versions: string[] version: string setVersion: (version: string) => void - setError: (e: Error) => void + setError: (e: unknown) => void } /**