Skip to content

Commit

Permalink
Merge pull request #996 from PyTables/fix-blosc2-opt
Browse files Browse the repository at this point in the history
Use hsize_t for offsets (fixes #995)
  • Loading branch information
FrancescAlted committed Jan 25, 2023
2 parents 054b3cc + 313e65f commit 1d47756
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 25 deletions.
28 changes: 11 additions & 17 deletions src/H5TB-opt.c
Expand Up @@ -430,8 +430,8 @@ herr_t read_records_blosc2( char* filename,
blosc2_context *dctx = blosc2_create_dctx(dparams);

/* Gather data for the interesting part */
hsize_t nrecords_chunk = chunklen - start_chunk;
if (nrecords_chunk > nrecords - total_records) {
int nrecords_chunk = chunklen - start_chunk;
if (nrecords_chunk > (nrecords - total_records)) {
nrecords_chunk = nrecords - total_records;
}

Expand All @@ -445,7 +445,7 @@ herr_t read_records_blosc2( char* filename,
}
else {
/* Less than 1 chunk to read; use a getitem call */
rbytes = (int) blosc2_getitem_ctx(dctx, chunk, cbytes, start_chunk, (int) nrecords_chunk, data, chunksize);
rbytes = blosc2_getitem_ctx(dctx, chunk, cbytes, start_chunk, nrecords_chunk, data, chunksize);
if (rbytes != nrecords_chunk * typesize) {
BLOSC_TRACE_ERROR("Cannot get (all) items for lazychunk\n");
goto out;
Expand Down Expand Up @@ -758,15 +758,14 @@ herr_t write_records_blosc2( hid_t dataset_id,
goto out;
}
int typesize = cd_values[2];
hsize_t cshape[1];
H5Pget_chunk(dcpl, 1, cshape);
hsize_t chunklen;
H5Pget_chunk(dcpl, 1, &chunklen);
if (H5Pclose(dcpl) < 0)
goto out;
int chunklen = (int) cshape[0];
int cstart = (int) (start / chunklen);
int cstop = (int) (start + nrecords - 1) / chunklen + 1;
int data_offset = 0;
for (int ci = cstart; ci < cstop; ci ++) {
hsize_t cstart = start / chunklen;
hsize_t cstop = (start + nrecords - 1) / chunklen + 1;
for (hsize_t ci = cstart; ci < cstop; ci ++) {
hsize_t data_offset = chunklen - (start % chunklen) + (ci - cstart - 1) * chunklen;
if (ci == cstart) {
if ((start % chunklen == 0) && (nrecords >= chunklen)) {
if (insert_chunk_blosc2(dataset_id, ci * chunklen, chunklen, data) < 0)
Expand All @@ -791,7 +790,6 @@ herr_t write_records_blosc2( hid_t dataset_id,
goto out;
}
} else if (ci == cstop - 1) {
data_offset = chunklen - (start % chunklen) + (ci - cstart - 1) * chunklen;
count[0] = nrecords - data_offset;
if (count[0] == chunklen) {
if (insert_chunk_blosc2(dataset_id, ci * chunklen, count[0],
Expand All @@ -813,7 +811,6 @@ herr_t write_records_blosc2( hid_t dataset_id,
goto out;
}
} else {
data_offset = chunklen - (start % chunklen) + (ci - cstart - 1) * chunklen;
if (insert_chunk_blosc2(dataset_id, ci * chunklen, chunklen,
data2 + data_offset * typesize) < 0)
goto out;
Expand Down Expand Up @@ -871,7 +868,6 @@ herr_t insert_chunk_blosc2( hid_t dataset_id,
goto out;
}
int32_t typesize = cd_values[2];
int32_t chunksize = cd_values[3];
hsize_t chunklen;
H5Pget_chunk(dcpl, 1, &chunklen);
if (H5Pclose(dcpl) < 0)
Expand Down Expand Up @@ -904,17 +900,15 @@ herr_t insert_chunk_blosc2( hid_t dataset_id,
}
uint8_t* cframe;
bool needs_free2;
int cfsize = (int) blosc2_schunk_to_buffer(sc, &cframe, &needs_free2);
int64_t cfsize = blosc2_schunk_to_buffer(sc, &cframe, &needs_free2);
if (cfsize <= 0) {
BLOSC_TRACE_ERROR("Failed converting schunk to cframe");
goto out;
}

/* Write frame bypassing HDF5 filter pipeline */
unsigned flt_msk = 0;
haddr_t offset[8];
offset[0] = start;
if (H5Dwrite_chunk(dataset_id, H5P_DEFAULT, flt_msk, offset, cfsize, cframe) < 0) {
if (H5Dwrite_chunk(dataset_id, H5P_DEFAULT, flt_msk, &start, (size_t)cfsize, cframe) < 0) {
BLOSC_TRACE_ERROR("Failed HDF5 writing chunk");
goto out;
}
Expand Down
16 changes: 8 additions & 8 deletions tables/tableextension.pyx
Expand Up @@ -753,19 +753,19 @@ cdef class Row:

cdef npy_intp _stride
cdef long _row, _unsaved_nrows, _mod_nrows
cdef hsize_t start, absstep
cdef long long start, absstep
cdef long long stop, step, nextelement, _nrow, stopb # has to be long long, not hsize_t, for negative step sizes
cdef hsize_t nrowsinbuf, nrows, nrowsread
cdef hsize_t chunksize, nchunksinbuf, totalchunks
cdef hsize_t startb, lenbuf
cdef long long nrowsinbuf, nrows, nrowsread
cdef long long chunksize, nchunksinbuf, totalchunks
cdef long long startb, lenbuf
cdef long long indexchunk
cdef int bufcounter, counter
cdef int exist_enum_cols
cdef int _riterator, _rowsize, _write_to_seqcache
cdef int wherecond, indexed
cdef int ro_filemode, chunked
cdef int _bufferinfo_done, sss_on
cdef int iterseq_max_elements
cdef long long iterseq_max_elements
cdef ndarray bufcoords, indexvalid, indexvalues, chunkmap
cdef hsize_t *bufcoords_data
cdef hsize_t *index_values_data
Expand Down Expand Up @@ -870,7 +870,7 @@ cdef class Row:
self._rowsize = self.dtype.itemsize
self.nrows = table.nrows # This value may change

cdef _init_loop(self, hsize_t start, long long stop, long long step,
cdef _init_loop(self, long long start, long long stop, long long step,
object coords, object chunkmap):
"""Initialization for the __iter__ iterator"""
table = self.table
Expand Down Expand Up @@ -960,7 +960,7 @@ cdef class Row:
"""The version of next() for indexed columns and a chunkmap."""

cdef long recout, j, cs, vlen, rowsize
cdef hsize_t nchunksread
cdef long long nchunksread
cdef object tmp_range
cdef Table table
cdef ndarray iobuf
Expand Down Expand Up @@ -1246,7 +1246,7 @@ cdef class Row:
"""Read a field from a table on disk and put the result in result"""

cdef hsize_t startr, istartb
cdef hsize_t istart, inrowsinbuf, inextelement
cdef long long istart, inrowsinbuf, inextelement
cdef long long stopr, istopb, i, j, inrowsread
cdef long long istop, istep
cdef object fields
Expand Down
96 changes: 96 additions & 0 deletions tables/tests/test_large_tables.py
@@ -0,0 +1,96 @@
import sys
import numpy as np
import tables as tb
from tables.tests import common


class LargeTable(tb.IsDescription):
    """Row description for the large table: a single 32-bit int column."""
    time = tb.Int32Col()


class BasicTestCase(common.TempFileMixin, common.PyTablesTestCase):
    """Write more than 2**31 rows in day-sized appends and re-read them.

    Regression test for https://github.com/PyTables/PyTables/issues/995,
    where row offsets were truncated to 32-bit ints for very large tables.
    """

    # file = "test.h5"
    open_mode = "w"
    title = "This is the table title"
    dim1, dim2, dim3 = 24, 721, 1440
    nrows = dim1 * dim2 * dim3  # rows for a day
    chunkshape = nrows  # one chunk per day, so each append is chunk-aligned
    complib = "blosc2"  # default

    def setUp(self):
        super().setUp()

        # Create an instance of an HDF5 Table, then close it so that
        # test00_values exercises a fresh re-open of the on-disk file.
        self.populateFile()
        self.h5file.close()

    def populateFile(self):
        """Create the table and append self.ndays blocks of self.nrows rows."""
        group = self.h5file.root
        table = self.h5file.create_table(group, 'table', LargeTable, "Large table",
                                         tb.Filters(complevel=1, complib=self.complib),
                                         chunkshape=self.chunkshape)

        # Structured NumPy buffer for every day; kept on self so the
        # read-back test can compare against the exact written values.
        self.day_block = day_block = np.empty(self.nrows, dtype=table.dtype)
        day_block["time"] = np.arange(self.nrows)

        # Append groups of rows ("days") so that we have more than 2**31
        # (see https://github.com/PyTables/PyTables/issues/995)
        self.ndays = ndays = 90
        # assertGreater reports both operands on failure, unlike
        # assertTrue on a bare comparison.
        self.assertGreater(ndays * self.nrows, 2 ** 31)
        if common.verbose:
            print(f"Writing {ndays} days...")
        for day in range(ndays):
            table.append(day_block)
        table.flush()

    def test00_values(self):
        """Check that written values are correct."""

        self.h5file = tb.open_file(self.h5fname)
        table = self.h5file.root.table
        nrows = self.nrows
        day_block = self.day_block
        if common.verbose:
            print(f"Checking {self.ndays} days...")
        for nday in range(self.ndays):
            # Slice one day's worth of rows and count matching values.
            day_block2 = table[nday * nrows: (nday + 1) * nrows]
            self.assertEqual(np.sum(day_block2['time'] == day_block['time']), nrows,
                             f"Values differ in day {nday}")


@common.unittest.skipIf(not common.blosc_avail,
                        'BLOSC compression library not available')
class BloscTestCase(BasicTestCase):
    """Large-table round trip using the Blosc compressor."""

    title = "Blosc table"
    complib = "blosc"

@common.unittest.skipIf(not common.blosc2_avail,
                        'BLOSC2 compression library not available')
class Blosc2TestCase(BasicTestCase):
    """Large-table round trip using the Blosc2 compressor."""

    title = "Blosc2 table"
    complib = "blosc2"

class ZlibTestCase(BasicTestCase):
    """Large-table round trip using the zlib compressor."""

    title = "Zlib table"
    complib = "zlib"


def suite():
    """Return the unittest suite for the large-table regression tests."""
    theSuite = common.unittest.TestSuite()
    niter = 1
    # common.heavy = 1  # Uncomment this only for testing purposes

    # unittest.makeSuite() has been deprecated since Python 3.11 and was
    # removed in 3.13; TestLoader.loadTestsFromTestCase() is the supported
    # equivalent and exists on every Python version this file can run on.
    loader = common.unittest.TestLoader()
    for n in range(niter):
        theSuite.addTest(loader.loadTestsFromTestCase(BloscTestCase))
        theSuite.addTest(loader.loadTestsFromTestCase(Blosc2TestCase))
        if common.heavy:
            # NOTE(review): only the zlib variant is gated behind
            # common.heavy, although zlib is always available while the
            # Blosc libraries may not be — confirm the gating is intended.
            theSuite.addTest(loader.loadTestsFromTestCase(ZlibTestCase))

    return theSuite


if __name__ == '__main__':
    # parse_argv presumably strips PyTables-specific flags from sys.argv
    # before unittest.main() parses the remainder — verify against
    # tables.tests.common.
    common.parse_argv(sys.argv)
    common.print_versions()
    common.unittest.main(defaultTest='suite')
1 change: 1 addition & 0 deletions tables/tests/test_suite.py
Expand Up @@ -15,6 +15,7 @@ def suite():
'tables.tests.test_lists',
'tables.tests.test_tables',
'tables.tests.test_tablesMD',
'tables.tests.test_large_tables',
'tables.tests.test_array',
'tables.tests.test_earray',
'tables.tests.test_carray',
Expand Down

0 comments on commit 1d47756

Please sign in to comment.