From fb545a5d8147eb111eded0d5ac11eda03c574134 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 22 Dec 2023 14:31:39 -0500 Subject: [PATCH] Rewrite delta bitpack reader (#912) Allows for int64 output --- fastparquet/cencoding.pyx | 56 ++++++++++++++++++++++++------- fastparquet/core.py | 5 +-- fastparquet/test/test_encoding.py | 4 +-- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/fastparquet/cencoding.pyx b/fastparquet/cencoding.pyx index 90ba15db..64f54174 100644 --- a/fastparquet/cencoding.pyx +++ b/fastparquet/cencoding.pyx @@ -214,22 +214,30 @@ cpdef void read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t le cdef void delta_read_bitpacked(NumpyIO file_obj, uint8_t bitwidth, - NumpyIO o, uint64_t count, uint8_t itemsize=4): + NumpyIO o, uint64_t count, uint8_t longval=0): cdef: uint64_t data = 0 - int8_t stop = -bitwidth + int8_t left = 0 + int8_t right = 0 uint64_t mask = 0XFFFFFFFFFFFFFFFF >> (64 - bitwidth) while count > 0: - if stop < 0: - data = ((data & 0X00FFFFFFFFFFFFFF) << 8) | file_obj.read_byte() - stop += 8 + if (left - right) < bitwidth: + data = data | (file_obj.read_byte() << left) + left += 8 + elif right > 8: + data >>= 8 + left -= 8 + right -= 8 else: - o.write_int((data >> stop) & mask) - stop -= bitwidth + if longval: + o.write_long((data >> right) & mask) + else: + o.write_int((data >> right) & mask) + right += bitwidth count -= 1 -cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o): +cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o, uint8_t longval=0): cdef: uint64_t block_size = read_unsigned_var_int(file_obj) uint64_t miniblock_per_block = read_unsigned_var_int(file_obj) @@ -248,19 +256,27 @@ cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o): temp = o.loc if count > 1: # no more diffs if on last value - delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, count) + delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, longval) o.loc = temp for j in range(values_per_miniblock): - temp = o.read_int() - o.loc -= 4 - o.write_int(value) + if longval: + temp = o.read_long() + o.loc -= 8 + o.write_long(value) + else: + temp = o.read_int() + o.loc -= 4 + o.write_int(value) value += min_delta + temp count -= 1 if count <= 0: return else: for j in range(values_per_miniblock): - o.write_int(value) + if longval: + o.write_long(value) + else: + o.write_int(value) value += min_delta count -= 1 if count <= 0: @@ -372,6 +388,20 @@ cdef class NumpyIO(object): ( self.get_pointer())[0] = i self.loc += 4 + cdef void write_long(self, int64_t i): + if self.nbytes - self.loc < 8: + return + ( self.get_pointer())[0] = i + self.loc += 8 + + cdef int64_t read_long(self): + cdef int64_t i + if self.nbytes - self.loc < 8: + return 0 + i = ( self.get_pointer())[0] + self.loc += 8 + return i + cdef void write_many(self, char b, int32_t count): cdef int32_t i for i in range(count): diff --git a/fastparquet/core.py b/fastparquet/core.py index 5e11983b..3facf893 100644 --- a/fastparquet/core.py +++ b/fastparquet/core.py @@ -164,9 +164,10 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False, else: values = np.zeros(nval, dtype=np.int8) elif daph.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED: - values = np.empty(daph.num_values - num_nulls, dtype=np.int32) + values = np.empty(daph.num_values - num_nulls, + dtype=np.int64 if metadata.type == 2 else np.int32) o = encoding.NumpyIO(values.view('uint8')) - encoding.delta_binary_unpack(io_obj, o) + encoding.delta_binary_unpack(io_obj, o, longval=metadata.type == 2) else: raise NotImplementedError('Encoding %s' % daph.encoding) return definition_levels, repetition_levels, values[:nval] diff --git a/fastparquet/test/test_encoding.py b/fastparquet/test/test_encoding.py index 48528b0f..bfbd7afb 100644 --- a/fastparquet/test/test_encoding.py +++ b/fastparquet/test/test_encoding.py @@ -165,8 +165,8 @@ def test_delta_from_def_2(): # one and only miniblock cencoding.encode_unsigned_varint(zigzag(-2), o) # minimum delta (zigzag) o.write_byte(2) # bit-width list (only one) - o.write_byte(0b00000011) # [0, 0, 0, 3] - o.write_byte(0b11111100) # [3, 3, 3, pad] + o.write_byte(0b11000000) # rev([0, 0, 0, 3]) + o.write_byte(0b00111111) # rev([3, 3, 3, pad]) o.seek(0)