Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions icebird/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"typecheck": "tsc"
},
"dependencies": {
"hightable": "0.17.0",
"hyperparam": "0.2.48",
"hightable": "0.18.1",
"hyperparam": "0.3.7",
"icebird": "0.3.0",
"react": "18.3.1",
"react-dom": "18.3.1"
Expand Down
146 changes: 137 additions & 9 deletions icebird/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { ReactNode } from 'react'
import Page, { PageProps } from './Page.js'
import Welcome from './Welcome.js'

import { DataFrame, asyncRows, rowCache, sortableDataFrame } from 'hightable'
import type { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame } from 'hightable'
import { createEventTarget, sortableDataFrame } from 'hightable'
import { icebergListVersions, icebergMetadata, icebergRead } from 'icebird'
import type { Snapshot, TableMetadata } from 'icebird/src/types.js'
import { useCallback, useEffect, useState } from 'react'
Expand All @@ -11,7 +12,10 @@ import Layout from './Layout.js'
const empty: DataFrame = {
header: [],
numRows: 0,
rows: () => [],
eventTarget: createEventTarget<DataFrameEvents>(),
getRowNumber: () => undefined,
getCell: () => undefined,
fetch: () => Promise.resolve(undefined),
}

export default function App(): ReactNode {
Expand Down Expand Up @@ -56,7 +60,7 @@ export default function App(): ReactNode {
const metadataFileName = `${version}.metadata.json`
icebergMetadata({ tableUrl: tableUrl, metadataFileName }).then((metadata: TableMetadata) => {
const df = icebergDataFrame(tableUrl, metadataFileName, metadata)
setPageProps({ df, metadata, versions, version, setVersion, setError })
setPageProps({ df, metadata, versions, version, setVersion, setError: setUnknownError })
}).catch(setUnknownError)
}, [tableUrl, versions, version, setUnknownError])

Expand Down Expand Up @@ -86,13 +90,137 @@ function icebergDataFrame(tableUrl: string, metadataFileName: string, metadata:
const schema = metadata.schemas.find(s => s['schema-id'] === currentSchemaId)
if (!schema) throw new Error('Current schema not found in metadata')
const header = schema.fields.map(f => f.name)
return sortableDataFrame(rowCache({
const eventTarget = createEventTarget<DataFrameEvents>()

type CachedValue<T> = {
kind: 'fetched'
value: ResolvedValue<T>
} | {
kind: 'fetching'
} | undefined

const rowNumberCache: CachedValue<number>[] = []
const cellCache = new Map<string, CachedValue<unknown>[]>()
header.forEach(column => cellCache.set(column, []))

function getRowNumber({ row }: {row: number}): ResolvedValue<number> | undefined {
validateRow({ row, data: { numRows } })
const cachedValue = rowNumberCache[row]
return cachedValue?.kind === 'fetched' ? cachedValue.value : undefined
}
function getCell({ row, column }: {row: number, column: string}): ResolvedValue<unknown> | undefined {
validateRow({ row, data: { numRows } })
validateColumn({ column, data: { header } })
const cachedValue = cellCache.get(column)?.[row]
return cachedValue?.kind === 'fetched' ? cachedValue.value : undefined
}
function isCachedOrFetching({ row, columns }: {row: number, columns?: string[]}): boolean {
return rowNumberCache[row] !== undefined && (!columns || columns.length === 0 || columns.every(column => cellCache.get(column)?.[row] !== undefined))
}

// TODO: fetch by row groups, to avoid fetching row by row when we scroll

const unsortableDataFrame: UnsortableDataFrame = {
header,
numRows,
rows({ start, end }) {
const rows = icebergRead({ tableUrl, metadataFileName, metadata, rowStart: start, rowEnd: end })
.then(rows => rows.map((cells, index) => ({ cells, index: start + index })))
return asyncRows(rows, end - start, header)
eventTarget,
getRowNumber,
getCell,
async fetch({ rowStart, rowEnd, columns, signal }) {
validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } })
checkSignal(signal)

const ranges = []
let currentRange: [number, number] | undefined = undefined
for (let row = rowStart; row < rowEnd; row++) {
if (isCachedOrFetching({ row, columns })) {
if (currentRange) {
ranges.push(currentRange)
currentRange = undefined
}
} else {
if (!currentRange) {
currentRange = [row, row + 1]
} else {
currentRange[1] = row + 1
}
}
}
if (currentRange) {
ranges.push(currentRange)
}
console.log(`Fetching rows ${rowStart} - ${rowEnd} (${ranges.length} ranges)`, { ranges, columns, cache: { rowNumberCache, cellCache } })

const promises = ranges.map(async ([start, end]) => {
for (let row = start; row < end; row++) {
rowNumberCache[row] = { kind: 'fetching' }
for (const column of columns ?? []) {
const array = cellCache.get(column)
if (!array) throw new Error(`Column ${column} not found in cache`)
array[row] = { kind: 'fetching' }
}
}

const rows = await icebergRead({
tableUrl,
rowStart: start,
rowEnd: end,
metadataFileName,
metadata,
})

const rowsEnd = rows.length + start

for (const [i, cells] of rows.entries()) {
const row = i + start
rowNumberCache[row] = { kind: 'fetched', value: { value: row } }
for (const column of columns ?? []) {
const array = cellCache.get(column)
if (!array) throw new Error(`Column ${column} not found in cache`)
array[row] = { kind: 'fetched', value: { value: cells[column] } }
}
}
// Not sure if it's the best way to handle the missing rows
for (let row = start + rowsEnd; row < end; row++) {
rowNumberCache[row] = { kind: 'fetched', value: { value: -1 } } // Indicating that the row is not available - totally not a perfect idea, but it works for now
for (const column of columns ?? []) {
const array = cellCache.get(column)
if (!array) throw new Error(`Column ${column} not found in cache`)
array[row] = { kind: 'fetched', value: { value: undefined } }
}
}

eventTarget.dispatchEvent(new CustomEvent('resolve'))
})

await Promise.all(promises)
},
}))

}
return sortableDataFrame(unsortableDataFrame)

}

function validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }: {rowStart: number, rowEnd: number, columns?: string[], data: Pick<DataFrame, 'numRows' | 'header'>}): void {
if (rowStart < 0 || rowEnd > numRows || !Number.isInteger(rowStart) || !Number.isInteger(rowEnd) || rowStart > rowEnd) {
throw new Error(`Invalid row range: ${rowStart} - ${rowEnd}, numRows: ${numRows}`)
}
if (columns?.some(column => !header.includes(column))) {
throw new Error(`Invalid columns: ${columns.join(', ')}. Available columns: ${header.join(', ')}`)
}
}
function validateRow({ row, data: { numRows } }: {row: number, data: Pick<DataFrame, 'numRows'>}): void {
if (row < 0 || row >= numRows || !Number.isInteger(row)) {
throw new Error(`Invalid row index: ${row}, numRows: ${numRows}`)
}
}
function validateColumn({ column, data: { header } }: {column: string, data: Pick<DataFrame, 'header'>}): void {
if (!header.includes(column)) {
throw new Error(`Invalid column: ${column}. Available columns: ${header.join(', ')}`)
}
}
function checkSignal(signal?: AbortSignal): void {
if (signal?.aborted) {
throw new DOMException('The operation was aborted.', 'AbortError')
}
}
2 changes: 1 addition & 1 deletion icebird/src/Page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export interface PageProps {
versions: string[]
version: string
setVersion: (version: string) => void
setError: (e: Error) => void
setError: (e: unknown) => void
}

/**
Expand Down