Delta binary packed encoding

platypii committed May 12, 2024
1 parent d39f499 commit 82db6a8
Showing 7 changed files with 2,652 additions and 65 deletions.
README.md: 2 changes (1 addition & 1 deletion)
@@ -6,7 +6,7 @@
 [![workflow status](https://github.com/hyparam/hyparquet/actions/workflows/ci.yml/badge.svg)](https://github.com/hyparam/hyparquet/actions)
 [![mit license](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
 [![dependencies](https://img.shields.io/badge/Dependencies-0-blueviolet)](https://www.npmjs.com/package/hyparquet?activeTab=dependencies)
-![coverage](https://img.shields.io/badge/Coverage-94-darkred)
+![coverage](https://img.shields.io/badge/Coverage-95-darkred)
 
 Dependency free since 2023!
 
src/datapage.js: 13 changes (4 additions & 9 deletions)
@@ -22,10 +22,8 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
   /** @type {DecodedArray} */
   let dataPage = []
 
-  // repetition levels
+  // repetition and definition levels
   const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath)
-
-  // definition levels
   const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath)
 
   // read values based on encoding
@@ -38,11 +36,9 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
     daph.encoding === 'RLE'
   ) {
     // bit width is stored as single byte
-    let bitWidth
+    let bitWidth = 1
     // TODO: RLE encoding uses bitWidth = schemaElement.type_length
-    if (columnMetadata.type === 'BOOLEAN') {
-      bitWidth = 1
-    } else {
+    if (columnMetadata.type !== 'BOOLEAN') {
       bitWidth = view.getUint8(reader.offset)
       reader.offset++
     }
@@ -63,9 +59,8 @@
 /**
  * Read a page containing dictionary data.
  *
- * @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
  * @param {Uint8Array} bytes raw page data
- * @param {DictionaryPageHeader} diph dictionary page header
+ * @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header
  * @param {ColumnMetaData} columnMetadata
  * @returns {ArrayLike<any>} array of values
  */
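A note on the RLE branch above: parquet's RLE/bit-packed hybrid format (what readRleBitPackedHybrid in src/encoding.js consumes) prefixes each run with a varint header whose low bit picks the mode, 0 for an RLE run (count = header >> 1, then the repeated value in ceil(bitWidth / 8) bytes) and 1 for bit-packed groups. A minimal sketch of the RLE-run case only, using made-up test bytes rather than code from this commit:

// Decode a single RLE run from the RLE/bit-packed hybrid format.
function decodeRleRun(bytes, bitWidth) {
  const header = bytes[0] // varint header, assumed < 0x80 here for brevity
  if (header & 1) throw new Error('bit-packed run, not handled in this sketch')
  const count = header >>> 1
  const width = Math.ceil(bitWidth / 8)
  let value = 0
  for (let i = 0; i < width; i++) {
    value |= bytes[1 + i] << (8 * i) // repeated value, little-endian
  }
  return new Array(count).fill(value)
}

console.log(decodeRleRun(new Uint8Array([0x06, 0x01]), 1)) // [1, 1, 1]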
src/datapageV2.js: 87 changes (41 additions & 46 deletions)
@@ -2,7 +2,7 @@ import { decompressPage } from './column.js'
 import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
 import { readPlain } from './plain.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
-import { readVarInt, readZigZag } from './thrift.js'
+import { readVarInt, readZigZagBigInt } from './thrift.js'
 
 /**
  * Read a data page from the given Uint8Array.
@@ -22,42 +22,35 @@ import { readVarInt, readZigZag } from './thrift.js'
 export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
   const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
   const reader = { view, offset: 0 }
-  /** @type {any} */
-  let dataPage = []
 
   const daph2 = ph.data_page_header_v2
   if (!daph2) throw new Error('parquet data page header v2 is undefined')
 
   // repetition levels
   const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
-
-  if (reader.offset !== daph2.repetition_levels_byte_length) {
-    throw new Error(`parquet repetition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length}`)
-  }
+  // assert(reader.offset === daph2.repetition_levels_byte_length)
 
   // definition levels
   const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
   const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
-
-  if (reader.offset !== daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length) {
-    throw new Error(`parquet definition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length}`)
-  }
+  // assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)
 
   const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
 
+  let page = compressedBytes.subarray(reader.offset)
+  if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') {
+    page = decompressPage(page, uncompressedPageSize, columnMetadata.codec, compressors)
+  }
+  const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
+
   // read values based on encoding
+  /** @type {import('./types.d.ts').DecodedArray} */
+  let dataPage = []
   const nValues = daph2.num_values - daph2.num_nulls
   if (daph2.encoding === 'PLAIN') {
-    let page = compressedBytes.slice(reader.offset)
-    if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') {
-      page = decompressPage(page, uncompressedPageSize, columnMetadata.codec, compressors)
-    }
-    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const pageReader = { view: pageView, offset: 0 }
     dataPage = readPlain(pageReader, columnMetadata.type, nValues)
   } else if (daph2.encoding === 'RLE') {
-    const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
-    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const bitWidth = 1
     if (daph2.num_nulls) {
       throw new Error('parquet RLE encoding with nulls not supported')
@@ -70,17 +63,13 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     daph2.encoding === 'PLAIN_DICTIONARY' ||
     daph2.encoding === 'RLE_DICTIONARY'
   ) {
-    compressedBytes = compressedBytes.subarray(reader.offset)
-    const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
-    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const bitWidth = pageView.getUint8(0)
     const pageReader = { view: pageView, offset: 1 }
     dataPage = new Array(nValues)
     readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
   } else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
-    if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
-    const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
-    const page = decompressPage(compressedBytes, uncompressedPageSize, codec, compressors)
+    const int32 = columnMetadata.type === 'INT32'
+    dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues)
     deltaBinaryUnpack(page, nValues, dataPage)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
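The hoisting above works because of the data page V2 layout: repetition and definition levels are always stored uncompressed at the front of the page, and only the values region after them is subject to the page codec, so it can be sliced off at reader.offset and decompressed once for all encoding branches. A small sketch of the offset arithmetic (the valuesRegion helper is hypothetical, not part of hyparquet):

// Where the values region of a data page V2 begins, and its uncompressed size.
function valuesRegion(ph) {
  const daph2 = ph.data_page_header_v2
  const offset = daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length
  return { offset, uncompressedSize: ph.uncompressed_page_size - offset }
}

// e.g. 2 + 3 bytes of levels in a 100-byte page:
const ph = {
  uncompressed_page_size: 100,
  data_page_header_v2: { repetition_levels_byte_length: 2, definition_levels_byte_length: 3 },
}
console.log(valuesRegion(ph)) // { offset: 5, uncompressedSize: 95 }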
@@ -134,48 +123,54 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
  *
  * @param {Uint8Array} page page data
  * @param {number} nValues number of values to read
- * @param {any[]} values array to write to
+ * @param {Int32Array | BigInt64Array} values array to write to
  */
 function deltaBinaryUnpack(page, nValues, values) {
+  const int32 = values instanceof Int32Array
   const view = new DataView(page.buffer, page.byteOffset, page.byteLength)
   const reader = { view, offset: 0 }
   const blockSize = readVarInt(reader)
   const miniblockPerBlock = readVarInt(reader)
-  const count = readVarInt(reader)
-  let value = readZigZag(reader)
+  let count = readVarInt(reader)
+  let value = readZigZagBigInt(reader) // first value
+  let valueIndex = 0
+  values[valueIndex++] = int32 ? Number(value) : value
 
   const valuesPerMiniblock = blockSize / miniblockPerBlock
 
-  for (let valueIndex = 0; valueIndex < nValues;) {
-    const minDelta = readZigZag(reader)
+  while (valueIndex < nValues) {
+    const minDelta = readZigZagBigInt(reader)
     const bitWidths = new Uint8Array(miniblockPerBlock)
-    for (let i = 0; i < miniblockPerBlock; i++, reader.offset++) {
-      bitWidths[i] = page[reader.offset]
+    for (let i = 0; i < miniblockPerBlock; i++) {
+      bitWidths[i] = page[reader.offset++]
     }
 
     for (let i = 0; i < miniblockPerBlock; i++) {
-      const bitWidth = bitWidths[i]
+      let miniblockCount = Math.min(count, valuesPerMiniblock)
+      const bitWidth = BigInt(bitWidths[i])
       if (bitWidth) {
-        if (count > 1) {
-          // no more diffs if on last value, delta read bitpacked
-          let data = 0
-          let stop = -bitWidth
-          // only works for bitWidth < 31
-          const mask = (1 << bitWidth) - 1
-          while (count) {
-            if (stop < 0) {
-              // fails when data gets too large
-              data = (data << 8) | view.getUint8(reader.offset++)
-              stop += 8
-            } else {
-              values.push((data >> stop) & mask)
+        const mask = (1n << bitWidth) - 1n
+        let bitpackPos = 0n
+        while (count && miniblockCount) {
+          let bits = (BigInt(view.getUint8(reader.offset)) >> bitpackPos) & mask // TODO: don't re-read value every time
+          bitpackPos += bitWidth
+          while (bitpackPos >= 8) {
+            bitpackPos -= 8n
+            reader.offset++
+            bits |= (BigInt(view.getUint8(reader.offset)) << bitWidth - bitpackPos) & mask
           }
+          const delta = minDelta + bits
+          value += delta
+          values[valueIndex++] = int32 ? Number(value) : value
           count--
+          miniblockCount--
         }
-      }
       } else {
-        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++, valueIndex++) {
-          values[valueIndex] = value
+        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) {
           value += minDelta
+          values[valueIndex++] = int32 ? Number(value) : value
         }
       }
     }
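For reference, the DELTA_BINARY_PACKED layout consumed above is, per the parquet spec: a header of <block size in values> <miniblocks per block> <total value count> <first value as zigzag varint>, then for each block a zigzag varint min delta, one bit-width byte per miniblock, and the per-value deltas minus min delta bit-packed least-significant-bit first. Below is a self-contained toy decoder (plain JS numbers, single block, whole-byte miniblocks assumed) tracing the spec's example sequence 7, 5, 3, 1, 2, 3, 4, 5; it is a sketch, not the deltaBinaryUnpack above:

// header: blockSize=8, miniblocksPerBlock=1, totalCount=8, firstValue=zigzag(7)
// block: minDelta=zigzag(-2), bitWidths=[2], deltas 0,0,0,3,3,3,3 bit-packed
const bytes = new Uint8Array([0x08, 0x01, 0x08, 0x0e, 0x03, 0x02, 0xc0, 0x3f])

function readVarInt(state) {
  let result = 0
  let shift = 0
  while (true) {
    const byte = state.bytes[state.offset++]
    result |= (byte & 0x7f) << shift
    if (!(byte & 0x80)) return result
    shift += 7
  }
}

function readZigZag(state) {
  const zigzag = readVarInt(state)
  return (zigzag >>> 1) ^ -(zigzag & 1)
}

function decodeDeltaBinaryPacked(bytes, nValues) {
  const state = { bytes, offset: 0 }
  const blockSize = readVarInt(state)          // 8
  const miniblocksPerBlock = readVarInt(state) // 1
  readVarInt(state)                            // total value count (8), nValues used instead
  let value = readZigZag(state)                // first value: 7
  const out = [value]
  const valuesPerMiniblock = blockSize / miniblocksPerBlock
  while (out.length < nValues) {
    const minDelta = readZigZag(state)         // -2
    const bitWidths = []
    for (let i = 0; i < miniblocksPerBlock; i++) bitWidths.push(bytes[state.offset++])
    for (const bitWidth of bitWidths) {
      let bitPos = 0
      for (let i = 0; i < valuesPerMiniblock; i++) {
        // read bitWidth bits, least significant bit first
        let delta = 0
        for (let b = 0; b < bitWidth; b++, bitPos++) {
          delta |= ((bytes[state.offset + (bitPos >> 3)] >> (bitPos & 7)) & 1) << b
        }
        // miniblocks are padded to full size; drop values past nValues
        if (out.length < nValues) out.push(value += minDelta + delta)
      }
      state.offset += valuesPerMiniblock * bitWidth / 8
    }
  }
  return out
}

console.log(decodeDeltaBinaryPacked(bytes, 8)) // [7, 5, 3, 1, 2, 3, 4, 5]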
src/thrift.js: 18 changes (9 additions & 9 deletions)
@@ -129,7 +129,7 @@ export function readVarInt(reader) {
   while (true) {
     const byte = reader.view.getUint8(reader.offset++)
     result |= (byte & 0x7f) << shift
-    if ((byte & 0x80) === 0) {
+    if (!(byte & 0x80)) {
       return result
     }
     shift += 7
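Usage sketch for the function above (assuming readVarInt is imported from src/thrift.js): each byte contributes seven low-order bits, least significant group first, with the high bit as a continuation flag.

import { readVarInt } from './thrift.js'

// 0xac = 0b10101100: continuation bit set, low bits 0101100 (44)
// 0x02 contributes 2 << 7 = 256, so the value is 44 + 256 = 300
const view = new DataView(new Uint8Array([0xac, 0x02]).buffer)
console.log(readVarInt({ view, offset: 0 })) // 300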
@@ -143,15 +143,15 @@
  * @returns {bigint} value
  */
 function readVarBigInt(reader) {
-  let result = BigInt(0)
-  let shift = BigInt(0)
+  let result = 0n
+  let shift = 0n
   while (true) {
-    const byte = BigInt(reader.view.getUint8(reader.offset++))
-    result |= (byte & BigInt(0x7f)) << shift
-    if ((byte & BigInt(0x80)) === BigInt(0)) {
+    const byte = reader.view.getUint8(reader.offset++)
+    result |= BigInt(byte & 0x7f) << shift
+    if (!(byte & 0x80)) {
       return result
     }
-    shift += BigInt(7)
+    shift += 7n
   }
 }

@@ -162,7 +162,7 @@ function readVarBigInt(reader) {
  * @param {DataReader} reader
  * @returns {number} value
  */
-export function readZigZag(reader) {
+function readZigZag(reader) {
   const zigzag = readVarInt(reader)
   // convert zigzag to int
   return (zigzag >>> 1) ^ -(zigzag & 1)
@@ -175,7 +175,7 @@ export function readZigZag(reader) {
  * @param {DataReader} reader
  * @returns {bigint} value
  */
-function readZigZagBigInt(reader) {
+export function readZigZagBigInt(reader) {
   const zigzag = readVarBigInt(reader)
   // convert zigzag to int
   return (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1))
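And a usage sketch of the zigzag side (assuming readZigZagBigInt is imported from src/thrift.js): zigzag maps 0, -1, 1, -2, 2 to 0, 1, 2, 3, 4 so that small magnitudes encode to short varints.

import { readZigZagBigInt } from './thrift.js'

// varint 0x03 = 3, and (3n >> 1n) ^ -(3n & 1n) = 1n ^ -1n = -2n
const view = new DataView(new Uint8Array([0x03]).buffer)
console.log(readZigZagBigInt({ view, offset: 0 })) // -2n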
