From 4f56c45669b32fd016f7a2c54170753bde27fe30 Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 12:04:57 +0200 Subject: [PATCH 01/10] remove C API --- quasardb/quasardb/_table.pyi | 45 -------- quasardb/table.cpp | 211 ----------------------------------- quasardb/table.hpp | 59 +--------- 3 files changed, 1 insertion(+), 314 deletions(-) diff --git a/quasardb/quasardb/_table.pyi b/quasardb/quasardb/_table.pyi index 00c792c4..53b4adce 100644 --- a/quasardb/quasardb/_table.pyi +++ b/quasardb/quasardb/_table.pyi @@ -68,15 +68,6 @@ class IndexedColumnInfo: class Table(Entry): def __repr__(self) -> str: ... - def blob_get_ranges( - self, column: str, ranges: Optional[RangeSet] = None - ) -> tuple[NDArrayTime, MaskedArrayAny]: ... - def blob_insert( - self, - column: str, - timestamps: NDArrayTime, - values: Union[MaskedArrayAny, NDArrayAny], - ) -> None: ... def column_id_by_index(self, index: int) -> str: ... def column_index_by_id(self, alias: str) -> int: ... def column_info_by_index(self, index: int) -> ColumnInfo: ... @@ -88,29 +79,11 @@ class Table(Entry): shard_size: datetime.timedelta = ..., ttl: datetime.timedelta = ..., ) -> None: ... - def double_get_ranges( - self, column: str, ranges: Optional[RangeSet] = None - ) -> tuple[NDArrayTime, MaskedArrayAny]: ... - def double_insert( - self, - column: str, - timestamps: NDArrayTime, - values: Union[MaskedArrayAny, NDArrayAny], - ) -> None: ... def erase_ranges(self, column: str, ranges: RangeSet) -> int: ... def get_shard_size(self) -> datetime.timedelta: ... def get_ttl(self) -> datetime.timedelta: ... def has_ttl(self) -> bool: ... def insert_columns(self, columns: list[ColumnInfo]) -> None: ... - def int64_get_ranges( - self, column: str, ranges: Optional[RangeSet] = None - ) -> tuple[NDArrayTime, MaskedArrayAny]: ... - def int64_insert( - self, - column: str, - timestamps: NDArrayTime, - values: Union[MaskedArrayAny, NDArrayAny], - ) -> None: ... def list_columns(self) -> list[ColumnInfo]: ... def reader( self, @@ -119,22 +92,4 @@ class Table(Entry): ranges: RangeSet = [], ) -> Reader: ... def retrieve_metadata(self) -> None: ... - def string_get_ranges( - self, column: str, ranges: Optional[RangeSet] = None - ) -> tuple[NDArrayTime, MaskedArrayAny]: ... - def string_insert( - self, - column: str, - timestamps: NDArrayTime, - values: Union[MaskedArrayAny, NDArrayAny], - ) -> None: ... def subscribe(self, conn: Any) -> Any: ... - def timestamp_get_ranges( - self, column: str, ranges: Optional[RangeSet] = None - ) -> tuple[NDArrayTime, MaskedArrayAny]: ... - def timestamp_insert( - self, - column: str, - timestamps: NDArrayTime, - values: Union[MaskedArrayAny, NDArrayAny], - ) -> None: ... diff --git a/quasardb/table.cpp b/quasardb/table.cpp index 818b920a..ccbf5aad 100644 --- a/quasardb/table.cpp +++ b/quasardb/table.cpp @@ -1,10 +1,6 @@ #include "table.hpp" -#include "dispatch.hpp" #include "metrics.hpp" -#include "object_tracker.hpp" #include "reader.hpp" -#include "traits.hpp" -#include "convert/point.hpp" #include // for make_unique namespace qdb @@ -12,64 +8,6 @@ namespace qdb namespace py = pybind11; -namespace detail -{ - -template -using point_type = typename traits::qdb_value::point_type; - -template -struct column_inserter; - -#define COLUMN_INSERTER_DECL(CTYPE, DTYPE, VALUE_TYPE, FN) \ - template <> \ - struct column_inserter \ - { \ - inline void operator()(handle_ptr handle, \ - std::string const & table, \ - std::string const & column, \ - pybind11::array const & timestamps, \ - qdb::masked_array const & values) \ - { \ - handle->check_open(); \ - \ - qdb::object_tracker::scoped_repository ctx{}; \ - qdb::object_tracker::scoped_capture capture{ctx}; \ - numpy::array::ensure(timestamps); \ - auto xs = convert::point_array(timestamps, values); \ - \ - qdb::qdb_throw_if_error( \ - *handle, FN(*handle, table.c_str(), column.c_str(), xs.data(), xs.size())); \ - }; \ - }; - -COLUMN_INSERTER_DECL(qdb_ts_column_int64, traits::int64_dtype, qdb_int_t, qdb_ts_int64_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_int64, traits::int32_dtype, qdb_int_t, qdb_ts_int64_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_int64, traits::int16_dtype, qdb_int_t, qdb_ts_int64_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_double, traits::float64_dtype, double, qdb_ts_double_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_double, traits::float32_dtype, double, qdb_ts_double_insert); - -COLUMN_INSERTER_DECL( - qdb_ts_column_timestamp, traits::datetime64_ns_dtype, qdb_timespec_t, qdb_ts_timestamp_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_string, traits::unicode_dtype, qdb_string_t, qdb_ts_string_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_blob, traits::pyobject_dtype, qdb_blob_t, qdb_ts_blob_insert); -COLUMN_INSERTER_DECL(qdb_ts_column_blob, traits::bytestring_dtype, qdb_blob_t, qdb_ts_blob_insert); - -#undef COLUMN_INSERTER_DECL - -template -inline void insert_column_dispatch(handle_ptr handle, - std::string const & table, - std::string const & column, - pybind11::array const & timestamps, - qdb::masked_array const & values) -{ - dispatch::by_dtype( - values.dtype(), handle, table, column, timestamps, values); -}; - -}; // namespace detail - void table::_cache_metadata() const { _handle->check_open(); @@ -105,155 +43,6 @@ void table::_cache_metadata() const _shard_size = std::chrono::milliseconds{metadata->shard_size}; } -qdb_uint_t table::erase_ranges(const std::string & column, py::object ranges) -{ - _handle->check_open(); - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb_uint_t erased_count = 0; - - qdb::qdb_throw_if_error(*_handle, qdb_ts_erase_ranges(*_handle, _alias.c_str(), column.c_str(), - ranges_.data(), ranges_.size(), &erased_count)); - - return erased_count; -} - -// insert_ranges -void table::blob_insert( - const std::string & column, const pybind11::array & timestamps, const qdb::masked_array & values) -{ - detail::insert_column_dispatch(_handle, _alias, column, timestamps, values); -} - -void table::string_insert( - const std::string & column, const pybind11::array & timestamps, qdb::masked_array const & values) -{ - detail::insert_column_dispatch(_handle, _alias, column, timestamps, values); -} - -void table::double_insert( - const std::string & column, const pybind11::array & timestamps, qdb::masked_array const & values) -{ - detail::insert_column_dispatch(_handle, _alias, column, timestamps, values); -} - -void table::int64_insert( - const std::string & column, const pybind11::array & timestamps, qdb::masked_array const & values) -{ - detail::insert_column_dispatch(_handle, _alias, column, timestamps, values); -} - -void table::timestamp_insert( - const std::string & column, const pybind11::array & timestamps, const qdb::masked_array & values) -{ - detail::insert_column_dispatch( - _handle, _alias, column, timestamps, values); -} - -// get_ranges - -std::pair table::blob_get_ranges( - const std::string & column, py::object ranges) -{ - _handle->check_open(); - - qdb_ts_blob_point * points = nullptr; - qdb_size_t count = 0; - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb::qdb_throw_if_error(*_handle, qdb_ts_blob_get_ranges(*_handle, _alias.c_str(), column.c_str(), - ranges_.data(), ranges_.size(), &points, &count)); - - auto ret = convert::point_array(points, count); - - qdb_release(*_handle, points); - - return ret; -} - -std::pair table::string_get_ranges( - const std::string & column, py::object ranges) -{ - _handle->check_open(); - - qdb_ts_string_point * points = nullptr; - qdb_size_t count = 0; - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb::qdb_throw_if_error(*_handle, qdb_ts_string_get_ranges(*_handle, _alias.c_str(), column.c_str(), - ranges_.data(), ranges_.size(), &points, &count)); - - auto ret = convert::point_array(points, count); - - qdb_release(*_handle, points); - - return ret; -} - -std::pair table::double_get_ranges( - const std::string & column, py::object ranges) -{ - _handle->check_open(); - - qdb_ts_double_point * points = nullptr; - qdb_size_t count = 0; - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb::qdb_throw_if_error(*_handle, qdb_ts_double_get_ranges(*_handle, _alias.c_str(), column.c_str(), - ranges_.data(), ranges_.size(), &points, &count)); - - auto ret = convert::point_array(points, count); - - qdb_release(*_handle, points); - - return ret; -} - -std::pair table::int64_get_ranges( - const std::string & column, py::object ranges) -{ - _handle->check_open(); - - qdb_ts_int64_point * points = nullptr; - qdb_size_t count = 0; - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb::qdb_throw_if_error(*_handle, qdb_ts_int64_get_ranges(*_handle, _alias.c_str(), column.c_str(), - ranges_.data(), ranges_.size(), &points, &count)); - - auto ret = convert::point_array(points, count); - - qdb_release(*_handle, points); - - return ret; -} - -std::pair table::timestamp_get_ranges( - const std::string & column, py::object ranges) -{ - _handle->check_open(); - - qdb_ts_timestamp_point * points = nullptr; - qdb_size_t count = 0; - - auto ranges_ = qdb::convert_ranges(ranges); - - qdb::qdb_throw_if_error( - *_handle, qdb_ts_timestamp_get_ranges(*_handle, _alias.c_str(), column.c_str(), ranges_.data(), - ranges_.size(), &points, &count)); - - auto ret = convert::point_array(points, count); - - qdb_release(*_handle, points); - - return ret; -} - qdb::reader_ptr table::reader( // std::vector const & column_names, // std::size_t batch_size, // diff --git a/quasardb/table.hpp b/quasardb/table.hpp index 5e3541be..d9dba68b 100644 --- a/quasardb/table.hpp +++ b/quasardb/table.hpp @@ -253,41 +253,6 @@ class table : public entry } public: - qdb_uint_t erase_ranges(const std::string & column, py::object ranges); - -public: - void blob_insert(const std::string & column, - const pybind11::array & timestamps, - const qdb::masked_array & values); - void string_insert(const std::string & column, - const pybind11::array & timestamps, - qdb::masked_array const & values); - void double_insert(const std::string & column, - const pybind11::array & timestamps, - qdb::masked_array const & values); - void int64_insert(const std::string & column, - const pybind11::array & timestamps, - qdb::masked_array const & values); - void timestamp_insert(const std::string & column, - const pybind11::array & timestamps, - const qdb::masked_array & values); - -public: - std::pair blob_get_ranges( - const std::string & column, py::object ranges); - - std::pair string_get_ranges( - const std::string & column, py::object ranges); - - std::pair double_get_ranges( - const std::string & column, py::object ranges); - - std::pair int64_get_ranges( - const std::string & column, py::object ranges); - - std::pair timestamp_get_ranges( - const std::string & column, py::object ranges); - py::object subscribe(py::object conn) { auto firehose = py::module::import("quasardb.firehose"); @@ -351,29 +316,7 @@ static inline void register_table(Module & m) py::arg("batch_size") = std::size_t{0}, // py::arg("ranges") = std::vector{}) - .def("subscribe", &qdb::table::subscribe) - .def("erase_ranges", &qdb::table::erase_ranges) - .def("blob_insert", &qdb::table::blob_insert) - .def("string_insert", &qdb::table::string_insert) - .def("double_insert", &qdb::table::double_insert) - .def("int64_insert", &qdb::table::int64_insert) - .def("timestamp_insert", &qdb::table::timestamp_insert) - - .def("blob_get_ranges", &qdb::table::blob_get_ranges, // - py::arg("column"), // - py::arg("ranges") = py::none{}) - .def("string_get_ranges", &qdb::table::string_get_ranges, // - py::arg("column"), // - py::arg("ranges") = py::none{}) - .def("double_get_ranges", &qdb::table::double_get_ranges, // - py::arg("column"), // - py::arg("ranges") = py::none{}) - .def("int64_get_ranges", &qdb::table::int64_get_ranges, // - py::arg("column"), // - py::arg("ranges") = py::none{}) - .def("timestamp_get_ranges", &qdb::table::timestamp_get_ranges, // - py::arg("column"), // - py::arg("ranges") = py::none{}); + .def("subscribe", &qdb::table::subscribe); } } // namespace qdb From 49db90b20983541969247a598550fe6e38335ee4 Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 12:26:04 +0200 Subject: [PATCH 02/10] numpy tests --- quasardb/numpy/__init__.py | 123 ------------------------------------- tests/test_numpy.py | 75 ++++++++++++++++------ 2 files changed, 55 insertions(+), 143 deletions(-) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index bb1f6f89..f712db63 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -597,129 +597,6 @@ def ensure_ma( return _ensure_ma(xs, dtype) -def read_array( - table: Optional[Table] = None, column: Optional[str] = None, ranges: Any = None -) -> Tuple[NDArrayTime, MaskedArrayAny]: - if table is None: - raise RuntimeError("A table is required.") - - if column is None: - raise RuntimeError("A column is required.") - - kwargs: Dict[str, Any] = {"column": column} - - if ranges is not None: - kwargs["ranges"] = ranges - - read_with = { - quasardb.ColumnType.Double: table.double_get_ranges, - quasardb.ColumnType.Blob: table.blob_get_ranges, - quasardb.ColumnType.String: table.string_get_ranges, - quasardb.ColumnType.Symbol: table.string_get_ranges, - quasardb.ColumnType.Int64: table.int64_get_ranges, - quasardb.ColumnType.Timestamp: table.timestamp_get_ranges, - } - - ctype = table.column_type_by_id(column) - - fn = read_with[ctype] - return fn(**kwargs) - - -def write_array( - data: Any = None, - index: Optional[NDArrayTime] = None, - table: Optional[Table] = None, - column: Optional[str] = None, - dtype: Optional[DType] = None, - infer_types: bool = True, -) -> None: - """ - Write a Numpy array to a single column. - - Parameters: - ----------- - - data: np.array - Numpy array with a dtype that is compatible with the column's type. - - index: np.array - Numpy array with a datetime64[ns] dtype that will be used as the - $timestamp axis for the data to be stored. - - dtype: optional np.dtype - If provided, ensures the data array is converted to this dtype before - insertion. - - infer_types: optional bool - If true, when necessary will attempt to convert the data and index array - to the best type for the column. For example, if you provide float64 data - while the column's type is int64, it will automatically convert the data. - - Defaults to True. For production use cases where you want to avoid implicit - conversions, we recommend always setting this to False. - - """ - - if table is None: - raise RuntimeError("A table is required.") - - if column is None: - raise RuntimeError("A column is required.") - - if data is None: - raise RuntimeError("A data numpy array is required.") - - if index is None: - raise RuntimeError("An index numpy timestamp array is required.") - - data = ensure_ma(data, dtype=dtype) - ctype = table.column_type_by_id(column) - - # We try to reuse some of the other functions, which assume array-like - # shapes for column info and data. It's a bit hackish, but actually works - # well. - # - # We should probably generalize this block of code with the same found in - # write_arrays(). - - cinfos = [(column, ctype)] - dtype_: List[Optional[DType]] = [dtype] - - dtype_ = _coerce_dtype(dtype_, cinfos) - - if infer_types is True: - dtype_ = _add_desired_dtypes(dtype_, cinfos) - - # data_ = an array of [data] - data_ = [data] - data_ = _coerce_data(data_, dtype_) - _validate_dtypes(data_, cinfos) - - # No functions that assume array-of-data anymore, let's put it back - data = data_[0] - - # Dispatch to the correct function - write_with = { - quasardb.ColumnType.Double: table.double_insert, - quasardb.ColumnType.Blob: table.blob_insert, - quasardb.ColumnType.String: table.string_insert, - quasardb.ColumnType.Symbol: table.string_insert, - quasardb.ColumnType.Int64: table.int64_insert, - quasardb.ColumnType.Timestamp: table.timestamp_insert, - } - - logger.info( - "Writing array (%d rows of dtype %s) to columns %s.%s (type %s)", - len(data), - data.dtype, - table.get_name(), - column, - ctype, - ) - write_with[ctype](column, index, data) - - def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny: if len(xs) == 0: return ma.masked_array(np.array([])) diff --git a/tests/test_numpy.py b/tests/test_numpy.py index e816f3e0..f2ae99cc 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -28,48 +28,83 @@ def _unicode_to_object_array(xs): return ma.masked_array(data=data, mask=mask) +def _read_single_array(conn, table, column, *, ranges=None): + idx, xs = qdbnp.read_arrays( + conn, + [table], + column_names=[column], + ranges=ranges, + ) + return idx, xs[column] + + +def _write_single_array( + conn, table, column, data, index, *, dtype=None, infer_types=True, **kwargs +): + payload = {column: data} + if dtype is not None: + dtype = {column: dtype} + + return qdbnp.write_arrays( + payload, + conn, + table, + index=index, + dtype=dtype, + infer_types=infer_types, + **kwargs, + ) + + ###### # # Array tests # ### # -# The 'array' functions operate on just a single array. They use the column-oriented -# APIs, `qdb_ts_*_insert` under the hood. +# Single-column read/write tests now go through the batched array API. # ###### @conftest.override_cdtypes("native") -def test_array_read_write_native_dtypes(array_with_index_and_table): +def test_array_read_write_native_dtypes(array_with_index_and_table, qdbd_connection): """ - * qdbnp.write_array() - * => qdb_ts_*_insert + * qdbnp.write_arrays() * => no conversion """ (ctype, dtype, data, index, table) = array_with_index_and_table col = table.column_id_by_index(0) - qdbnp.write_array(data, index, table, column=col, dtype=dtype, infer_types=False) + _write_single_array( + qdbd_connection, + table, + col, + data, + index, + dtype=dtype, + infer_types=False, + ) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @conftest.override_cdtypes("inferrable") -def test_array_read_write_inferrable_dtypes(array_with_index_and_table): +def test_array_read_write_inferrable_dtypes( + array_with_index_and_table, qdbd_connection +): """ - * qdbnp.write_array() - * => qdb_ts_*_insert + * qdbnp.write_arrays() * => conversion in python """ (ctype, dtype, data, index, table) = array_with_index_and_table col = table.column_id_by_index(0) - qdbnp.write_array(data, index, table, column=col, infer_types=True) + _write_single_array(qdbd_connection, table, col, data, index, infer_types=True) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -93,7 +128,7 @@ def test_arrays_read_write_native_dtypes(array_with_index_and_table, qdbd_connec push_mode=quasardb.WriterPushMode.Truncate, ) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -113,7 +148,7 @@ def test_arrays_read_write_inferrable_dtypes( col = table.column_id_by_index(0) qdbnp.write_arrays([data], qdbd_connection, table, index=index, infer_types=True) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -137,7 +172,7 @@ def test_arrays_read_write_data_as_dict(array_with_index_and_table, qdbd_connect push_mode=quasardb.WriterPushMode.Truncate, ) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -162,7 +197,7 @@ def test_provide_index_as_dict(array_with_index_and_table, qdbd_connection): truncate=True, ) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -260,7 +295,7 @@ def _do_write_read(xs): dtype=dtype, infer_types=False, ) - return qdbnp.read_array(table, col) + return _read_single_array(qdbd_connection, table, col) res1 = _do_write_read(data1) assert_indexed_arrays_equal((index, data1), res1) @@ -311,7 +346,7 @@ def test_string_array_returns_unicode(array_with_index_and_table, qdbd_connectio col = table.column_id_by_index(0) qdbnp.write_arrays([data], qdbd_connection, table, index=index) - (idx, xs) = qdbnp.read_array(table, col) + (idx, xs) = _read_single_array(qdbd_connection, table, col) assert qdbnp.dtypes_equal(xs.dtype, np.dtype("unicode")) @@ -734,7 +769,7 @@ def test_regression_sc11333(qdbd_connection, table_name, start_date, row_count): for i in range(len(cols)): col = cols[i] - res = qdbnp.read_array(t, col) + res = _read_single_array(qdbd_connection, t, col) assert_indexed_arrays_equal((idx, data[i]), res) @@ -746,7 +781,7 @@ def test_write_through_flag(arrays_with_index_and_table, qdbd_connection): [data[0]], qdbd_connection, table, index=index, write_through=True ) - res = qdbnp.read_array(table, col) + res = _read_single_array(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data[0]), res) From a5957c9d63779078413cda9bbc92c35f3e12784f Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 12:46:42 +0200 Subject: [PATCH 03/10] tests 1 --- tests/test_batch_inserter.py | 109 +++++++++++++++-------------- tests/test_continuous.py | 32 +++++---- tests/test_query.py | 130 ++++++++++++++++++++++++----------- tests/test_writer.py | 54 ++++++++------- 4 files changed, 200 insertions(+), 125 deletions(-) diff --git a/tests/test_batch_inserter.py b/tests/test_batch_inserter.py index b5eabe32..268c1829 100644 --- a/tests/test_batch_inserter.py +++ b/tests/test_batch_inserter.py @@ -8,6 +8,7 @@ import pytest import quasardb import numpy as np +import quasardb.numpy as qdbnp def _row_insertion_method( @@ -80,37 +81,46 @@ def _set_batch_inserter_data(inserter, intervals, data, start=0): inserter.set_string(5, symbols[i]) -def _assert_results(table, intervals, data): - (doubles, integers, blobs, strings, timestamps, symbols) = data - - whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) +def _read_column(conn, table, column, *, ranges=None): + idx, xs = qdbnp.read_arrays( + conn, + [table], + column_names=[column], + ranges=ranges, + ) + return idx, xs[column] - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], doubles) - results = table.blob_get_ranges(tslib._blob_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], blobs) +def _read_all_columns(conn, table, *, ranges=None): + column_names = [ + tslib._double_col_name(table), + tslib._blob_col_name(table), + tslib._string_col_name(table), + tslib._int64_col_name(table), + tslib._ts_col_name(table), + tslib._symbol_col_name(table), + ] + return qdbnp.read_arrays(conn, [table], column_names=column_names, ranges=ranges) - results = table.string_get_ranges(tslib._string_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], strings) - results = table.int64_get_ranges(tslib._int64_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], integers) +def _assert_results(conn, table, intervals, data): + (doubles, integers, blobs, strings, timestamps, symbols) = data - results = table.timestamp_get_ranges(tslib._ts_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], timestamps) + whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) + idx, xs = _read_all_columns(conn, table, ranges=[whole_range]) - results = table.string_get_ranges(tslib._symbol_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], symbols) + np.testing.assert_array_equal(idx, intervals) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) + np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) + np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers) + np.testing.assert_array_equal(xs[tslib._ts_col_name(table)], timestamps) + np.testing.assert_array_equal(xs[tslib._symbol_col_name(table)], symbols) -def _test_with_table(inserter, table, intervals, push_method=_regular_push, data=None): +def _test_with_table( + conn, inserter, table, intervals, push_method=_regular_push, data=None +): if data is None: data = _generate_data(len(intervals)) @@ -123,30 +133,17 @@ def _test_with_table(inserter, table, intervals, push_method=_regular_push, data _set_batch_inserter_data(inserter, intervals, data) # before the push, there is nothing - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) - assert len(results[0]) == 0 - - results = table.blob_get_ranges(tslib._blob_col_name(table), [whole_range]) - assert len(results[0]) == 0 - - results = table.string_get_ranges(tslib._string_col_name(table), [whole_range]) - assert len(results[0]) == 0 - - results = table.int64_get_ranges(tslib._int64_col_name(table), [whole_range]) - assert len(results[0]) == 0 - - results = table.timestamp_get_ranges(tslib._ts_col_name(table), [whole_range]) - assert len(results[0]) == 0 - - results = table.string_get_ranges(tslib._symbol_col_name(table), [whole_range]) - assert len(results[0]) == 0 + idx, xs = _read_all_columns(conn, table, ranges=[whole_range]) + assert len(idx) == 0 + for values in xs.values(): + assert len(values) == 0 # after push, there is everything push_method(inserter) if push_method == _async_push: sleep(20) - _assert_results(table, intervals, data) + _assert_results(conn, table, intervals, data) return doubles, blobs, strings, integers, timestamps, symbols @@ -154,7 +151,7 @@ def _test_with_table(inserter, table, intervals, push_method=_regular_push, data def test_successful_bulk_row_insert(qdbd_connection, table, many_intervals): inserter = qdbd_connection.inserter(_make_inserter_info(table)) - _test_with_table(inserter, table, many_intervals, _regular_push) + _test_with_table(qdbd_connection, inserter, table, many_intervals, _regular_push) def test_successful_secure_bulk_row_insert( @@ -162,7 +159,9 @@ def test_successful_secure_bulk_row_insert( ): inserter = qdbd_secure_connection.inserter(_make_inserter_info(secure_table)) - _test_with_table(inserter, secure_table, many_intervals, _regular_push) + _test_with_table( + qdbd_secure_connection, inserter, secure_table, many_intervals, _regular_push + ) @pytest.mark.skip(reason="Skip slow tests") @@ -172,7 +171,7 @@ def test_successful_async_bulk_row_insert(qdbd_connection, table, many_intervals # This allows us to test the `push_async` feature inserter = qdbd_connection.inserter(_make_inserter_info(table)) - _test_with_table(inserter, table, many_intervals, _async_push) + _test_with_table(qdbd_connection, inserter, table, many_intervals, _async_push) def test_successful_fast_bulk_row_insert(qdbd_connection, table, many_intervals): @@ -180,7 +179,7 @@ def test_successful_fast_bulk_row_insert(qdbd_connection, table, many_intervals) # This allows us to test the `push_async` feature inserter = qdbd_connection.inserter(_make_inserter_info(table)) - _test_with_table(inserter, table, many_intervals, _fast_push) + _test_with_table(qdbd_connection, inserter, table, many_intervals, _fast_push) def test_failed_local_table_with_wrong_columns(qdbd_connection, entry_name): @@ -204,7 +203,9 @@ def test_push_truncate_implicit_range(qdbd_connection, table, many_intervals): inserter.push() # Compare results, should be equal - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) + results = _read_column( + qdbd_connection, table, tslib._double_col_name(table), ranges=[whole_range] + ) np.testing.assert_array_equal(results[0], many_intervals) np.testing.assert_array_equal(results[1], doubles) @@ -214,7 +215,9 @@ def test_push_truncate_implicit_range(qdbd_connection, table, many_intervals): inserter.push() # Compare results, should now have the same data twice - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) + results = _read_column( + qdbd_connection, table, tslib._double_col_name(table), ranges=[whole_range] + ) assert len(results[1]) == 2 * len(doubles) @@ -224,7 +227,9 @@ def test_push_truncate_implicit_range(qdbd_connection, table, many_intervals): # Verify results, truncating should now make things the same # as the beginning again. - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) + results = _read_column( + qdbd_connection, table, tslib._double_col_name(table), ranges=[whole_range] + ) np.testing.assert_array_equal(results[0], many_intervals) np.testing.assert_array_equal(results[1], doubles) @@ -249,7 +254,9 @@ def test_push_truncate_explicit_range(qdbd_connection, table, many_intervals): # Verify results, truncating should now make things the same # as the beginning again. - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) + results = _read_column( + qdbd_connection, table, tslib._double_col_name(table), ranges=[whole_range] + ) np.testing.assert_array_equal(results[0], many_intervals) np.testing.assert_array_equal(results[1], doubles) @@ -262,7 +269,9 @@ def test_push_truncate_explicit_range(qdbd_connection, table, many_intervals): # Verify results, truncating should now make things the same # as the beginning again. - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) + results = _read_column( + qdbd_connection, table, tslib._double_col_name(table), ranges=[whole_range] + ) np.testing.assert_array_equal(results[0], many_intervals[1:]) np.testing.assert_array_equal(results[1], doubles[1:]) diff --git a/tests/test_continuous.py b/tests/test_continuous.py index 457c3a60..b04a37f9 100644 --- a/tests/test_continuous.py +++ b/tests/test_continuous.py @@ -2,15 +2,21 @@ import pytest import quasardb import numpy as np +import quasardb.numpy as qdbnp import test_table as tslib import time import datetime -def _insert_double_points(table, start_time, points=10): +def _insert_double_points(conn, table, start_time, points=10): inserted_double_data = tslib._generate_double_ts(start_time, points) - table.double_insert( - tslib._double_col_name(table), inserted_double_data[0], inserted_double_data[1] + qdbnp.write_arrays( + {tslib._double_col_name(table): inserted_double_data[1]}, + conn, + table, + index=inserted_double_data[0], + infer_types=False, + dtype={tslib._double_col_name(table): inserted_double_data[1].dtype}, ) return inserted_double_data @@ -72,7 +78,7 @@ def _test_against_table(res, table, data): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_full(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_full( q, datetime.timedelta(milliseconds=100) @@ -85,7 +91,7 @@ def test_returns_rows_full(qdbd_connection, table, intervals): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_full_probe(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_full( q, datetime.timedelta(milliseconds=100) @@ -100,7 +106,7 @@ def test_returns_rows_full_probe(qdbd_connection, table, intervals): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_full_iterator(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_full( q, datetime.timedelta(milliseconds=100) @@ -114,7 +120,7 @@ def test_returns_rows_full_iterator(qdbd_connection, table, intervals): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_new_values(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_new_values( q, datetime.timedelta(milliseconds=100) @@ -127,7 +133,7 @@ def test_returns_rows_new_values(qdbd_connection, table, intervals): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_new_values_probe(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_new_values( q, datetime.timedelta(milliseconds=100) @@ -142,7 +148,7 @@ def test_returns_rows_new_values_probe(qdbd_connection, table, intervals): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_new_value_iterator(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_new_values( q, datetime.timedelta(milliseconds=100) @@ -171,7 +177,7 @@ def __wait_for(cont, f, max_ticks=10): @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_full_value_iterator_multiple(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_full( q, datetime.timedelta(milliseconds=100) @@ -180,7 +186,7 @@ def test_returns_rows_full_value_iterator_multiple(qdbd_connection, table, inter assert True is __wait_for(cont, lambda x: len(x) == 1) start_time += np.timedelta64(1, "m") - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) assert True is __wait_for(cont, lambda x: len(x) == 2) @@ -188,7 +194,7 @@ def test_returns_rows_full_value_iterator_multiple(qdbd_connection, table, inter @pytest.mark.skip(reason="Flaky test -- tracked in QDB-14766 in shortcut") def test_returns_rows_new_value_iterator_multiple(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) q = 'select * from "' + table.get_name() + '"' cont = qdbd_connection.query_continuous_new_values( q, datetime.timedelta(milliseconds=100) @@ -197,6 +203,6 @@ def test_returns_rows_new_value_iterator_multiple(qdbd_connection, table, interv assert True is __wait_for(cont, lambda x: len(x) == 1) start_time += np.timedelta64(1, "m") - inserted_double_data = _insert_double_points(table, start_time, 1) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 1) assert True is __wait_for(cont, lambda x: len(x) == 1) diff --git a/tests/test_query.py b/tests/test_query.py index 9593adde..20e90e77 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -2,58 +2,94 @@ import pytest import quasardb import numpy as np +import quasardb.numpy as qdbnp import test_table as tslib -def _insert_double_points(table, start_time, points=10): +def _write_points(conn, table, column, xs): + qdbnp.write_arrays( + {column: xs[1]}, + conn, + table, + index=xs[0], + infer_types=False, + dtype={column: xs[1].dtype}, + ) + return xs + + +def _write_multiple_points(conn, table, data): + payload = {column: xs[1] for column, xs in data.items()} + dtypes = {column: xs[1].dtype for column, xs in data.items()} + index = next(iter(data.values()))[0] + + qdbnp.write_arrays( + payload, + conn, + table, + index=index, + infer_types=False, + dtype=dtypes, + ) + return data + + +def _insert_double_points(conn, table, start_time, points=10): inserted_double_data = tslib._generate_double_ts(start_time, points) - table.double_insert( - tslib._double_col_name(table), inserted_double_data[0], inserted_double_data[1] + return _write_points( + conn, + table, + tslib._double_col_name(table), + inserted_double_data, ) - return inserted_double_data -def _insert_blob_points(table, start_time, points=10): +def _insert_blob_points(conn, table, start_time, points=10): inserted_blob_data = tslib._generate_blob_ts(start_time, points) - table.blob_insert( - tslib._blob_col_name(table), inserted_blob_data[0], inserted_blob_data[1] + return _write_points( + conn, + table, + tslib._blob_col_name(table), + inserted_blob_data, ) - return inserted_blob_data -def _insert_string_points(table, start_time, points=10): +def _insert_string_points(conn, table, start_time, points=10): xs = tslib._generate_string_ts(start_time, points) - table.string_insert(tslib._string_col_name(table), xs[0], xs[1]) - return xs + return _write_points(conn, table, tslib._string_col_name(table), xs) -def _insert_int64_points(table, start_time, points=10): +def _insert_int64_points(conn, table, start_time, points=10): inserted_int64_data = tslib._generate_int64_ts(start_time, points) - table.int64_insert( - tslib._int64_col_name(table), inserted_int64_data[0], inserted_int64_data[1] + return _write_points( + conn, + table, + tslib._int64_col_name(table), + inserted_int64_data, ) - return inserted_int64_data -def _insert_timestamp_points(table, start_time, points=10): +def _insert_timestamp_points(conn, table, start_time, points=10): inserted_timestamp_data = tslib._generate_timestamp_ts( start_time, start_time, points ) - table.timestamp_insert( + return _write_points( + conn, + table, tslib._ts_col_name(table), - inserted_timestamp_data[0], - inserted_timestamp_data[1], + inserted_timestamp_data, ) - return inserted_timestamp_data -def _insert_symbol_points(table, start_time, points=10): +def _insert_symbol_points(conn, table, start_time, points=10): inserted_symbol_data = tslib._generate_symbol_ts(start_time, points) - table.symbol_insert( - tslib._ts_col_name(table), inserted_symbol_data[0], inserted_symbol_data[1] + return _write_points( + conn, + table, + tslib._symbol_col_name(table), + inserted_symbol_data, ) - return inserted_symbol_data point_inserter_by_type = { @@ -66,7 +102,9 @@ def _insert_symbol_points(table, start_time, points=10): } -def _insert_points(value_type, table, start_time=None, intervals=None, points=10): +def _insert_points( + value_type, conn, table, start_time=None, intervals=None, points=10 +): if start_time is None: assert intervals is not None start_time = tslib._start_time(intervals) @@ -74,7 +112,7 @@ def _insert_points(value_type, table, start_time=None, intervals=None, points=10 assert start_time is not None fn = point_inserter_by_type[value_type] - return fn(table, start_time, points) + return fn(conn, table, start_time, points) def _column_name(table, value_type): @@ -169,7 +207,7 @@ def test_returns_empty_result(qdbd_connection, table): def test_returns_table_as_string(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( 'select * from "' + table.get_name() @@ -187,7 +225,7 @@ def test_returns_table_as_string(qdbd_connection, table, intervals): def test_returns_table_as_blob(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( 'select * from "' + table.get_name() @@ -205,7 +243,7 @@ def test_returns_table_as_blob(qdbd_connection, table, intervals): def test_returns_inserted_data_with_star_select(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( 'select * from "' + table.get_name() @@ -234,7 +272,9 @@ def test_returns_inserted_data_with_star_select(qdbd_connection, table, interval def test_supports_all_column_types( value_type, query_handler, qdbd_connection, table, intervals ): - inserted_data = _insert_points(value_type, table, intervals=intervals) + inserted_data = _insert_points( + value_type, qdbd_connection, table, intervals=intervals + ) column_name = _column_name(table, value_type) query = 'SELECT "{}" FROM "{}"'.format(column_name, table.get_name()) @@ -278,7 +318,7 @@ def test_query_handler_benchmark( raise RuntimeError("Unrecognized query handler: {}".format(query_handler)) inserted_data = _insert_points( - value_type, table, intervals=intervals, points=row_count + value_type, qdbd_connection, table, intervals=intervals, points=row_count ) column_name = _column_name(table, value_type) query = 'SELECT "{}" FROM "{}"'.format(column_name, table.get_name()) @@ -288,7 +328,7 @@ def test_query_handler_benchmark( def test_returns_inserted_data_with_column_select(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( "select " + tslib._double_col_name(table) @@ -315,7 +355,7 @@ def test_returns_inserted_data_with_column_select(qdbd_connection, table, interv def test_returns_inserted_data_with_specific_select(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( "select $timestamp, $table, " + tslib._double_col_name(table) @@ -340,7 +380,7 @@ def test_returns_inserted_data_with_specific_select(qdbd_connection, table, inte def test_returns_count_data_with_count_select(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - _ = _insert_double_points(table, start_time, 10) + _ = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( "select count(" + tslib._double_col_name(table) @@ -358,7 +398,7 @@ def test_returns_count_data_with_count_select(qdbd_connection, table, intervals) def test_returns_count_data_with_sum_select(qdbd_connection, table, intervals): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 10) + inserted_double_data = _insert_double_points(qdbd_connection, table, start_time, 10) query = ( "select sum(" + tslib._double_col_name(table) @@ -378,10 +418,22 @@ def test_returns_inserted_multi_data_with_star_select( qdbd_connection, table, intervals ): start_time = tslib._start_time(intervals) - inserted_double_data = _insert_double_points(table, start_time, 100) - inserted_blob_data = _insert_blob_points(table, start_time, 100) - inserted_int64_data = _insert_int64_points(table, start_time, 100) - inserted_timestamp_data = _insert_timestamp_points(table, start_time, 100) + inserted_data = _write_multiple_points( + qdbd_connection, + table, + { + tslib._double_col_name(table): tslib._generate_double_ts(start_time, 100), + tslib._blob_col_name(table): tslib._generate_blob_ts(start_time, 100), + tslib._int64_col_name(table): tslib._generate_int64_ts(start_time, 100), + tslib._ts_col_name(table): tslib._generate_timestamp_ts( + start_time, start_time, 100 + ), + }, + ) + inserted_double_data = inserted_data[tslib._double_col_name(table)] + inserted_blob_data = inserted_data[tslib._blob_col_name(table)] + inserted_int64_data = inserted_data[tslib._int64_col_name(table)] + inserted_timestamp_data = inserted_data[tslib._ts_col_name(table)] query = ( 'select * from "' diff --git a/tests/test_writer.py b/tests/test_writer.py index bfe27eeb..a123f27a 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -8,6 +8,7 @@ import pytest import quasardb import numpy as np +import quasardb.numpy as qdbnp def _generate_data(count, start=np.datetime64("2017-01-01", "ns")): @@ -396,31 +397,38 @@ def _set_batch_writer_data(writer, table, intervals, data, start=0): writer.set_string(5, symbols[i]) -def _assert_results(table, intervals, data): - (doubles, integers, blobs, strings, timestamps, symbols) = data - - whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) - results = table.double_get_ranges(tslib._double_col_name(table), [whole_range]) - - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], doubles) +def _read_column(conn, table, column, *, ranges=None): + idx, xs = qdbnp.read_arrays( + conn, + [table], + column_names=[column], + ranges=ranges, + ) + return idx, xs[column] - results = table.blob_get_ranges(tslib._blob_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], blobs) - results = table.string_get_ranges(tslib._string_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], strings) +def _read_all_columns(conn, table, *, ranges=None): + column_names = [ + tslib._double_col_name(table), + tslib._blob_col_name(table), + tslib._string_col_name(table), + tslib._int64_col_name(table), + tslib._ts_col_name(table), + tslib._symbol_col_name(table), + ] + return qdbnp.read_arrays(conn, [table], column_names=column_names, ranges=ranges) - results = table.int64_get_ranges(tslib._int64_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], integers) - results = table.timestamp_get_ranges(tslib._ts_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], timestamps) +def _assert_results(conn, table, intervals, data): + (doubles, integers, blobs, strings, timestamps, symbols) = data - results = table.string_get_ranges(tslib._symbol_col_name(table), [whole_range]) - np.testing.assert_array_equal(results[0], intervals) - np.testing.assert_array_equal(results[1], symbols) + whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) + idx, xs = _read_all_columns(conn, table, ranges=[whole_range]) + + np.testing.assert_array_equal(idx, intervals) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) + np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) + np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers) + np.testing.assert_array_equal(xs[tslib._ts_col_name(table)], timestamps) + np.testing.assert_array_equal(xs[tslib._symbol_col_name(table)], symbols) From f3625a421047fba7b61884236363b70e293ef72f Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 13:08:34 +0200 Subject: [PATCH 04/10] tests 2 --- quasardb/pandas/__init__.py | 44 ++++++++++++++++++------------------- tests/test_pandas.py | 41 ---------------------------------- 2 files changed, 22 insertions(+), 63 deletions(-) diff --git a/quasardb/pandas/__init__.py b/quasardb/pandas/__init__.py index 9c72c47e..5aa8b909 100644 --- a/quasardb/pandas/__init__.py +++ b/quasardb/pandas/__init__.py @@ -108,34 +108,31 @@ def read_series( ranges : list A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects. """ - read_with = { - quasardb.ColumnType.Double: table.double_get_ranges, - quasardb.ColumnType.Blob: table.blob_get_ranges, - quasardb.ColumnType.String: table.string_get_ranges, - quasardb.ColumnType.Int64: table.int64_get_ranges, - quasardb.ColumnType.Timestamp: table.timestamp_get_ranges, - quasardb.ColumnType.Symbol: table.string_get_ranges, - } - - kwargs: Dict[str, Any] = {"column": col_name} - - if ranges is not None: - kwargs["ranges"] = ranges - - # Dispatch based on column type t = table.column_type_by_id(col_name) logger.info( "reading Series from column %s.%s with type %s", table.get_name(), col_name, t ) - res = (read_with[t])(**kwargs) + reader_kwargs: Dict[str, Any] = {"column_names": [col_name]} + if ranges is not None: + reader_kwargs["ranges"] = ranges + + with table.reader(**reader_kwargs) as reader: + try: + idx, xs = qdbnp._concat_array_batches( + qdbnp._reader_batch_to_arrays(batch) for batch in reader + ) + except ValueError: + idx = np.array([], dtype=np.dtype("datetime64[ns]")) + xs = {} - return pd.Series(res[1], index=res[0]) + return pd.Series(xs.get(col_name, np.array([], dtype=object)), index=idx) def write_series( series: pd.Series, + cluster: Cluster, table: Table, col_name: str, infer_types: bool = True, @@ -151,6 +148,9 @@ def write_series( Pandas Series, with a numpy.datetime64[ns] as index. Underlying data will be attempted to be transformed to appropriate QuasarDB type. + cluster : quasardb.Cluster + Active connection to the QuasarDB cluster. + table : quasardb.Timeseries QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table') @@ -179,12 +179,12 @@ def write_series( assert data is not None assert index is not None - qdbnp.write_array( - data=data, - index=index, + qdbnp.write_arrays( + data={col_name: data}, + cluster=cluster, table=table, - column=col_name, - dtype=dtype, + index=index, + dtype={col_name: dtype} if dtype is not None else None, infer_types=infer_types, ) diff --git a/tests/test_pandas.py b/tests/test_pandas.py index b7e480f6..b9410617 100644 --- a/tests/test_pandas.py +++ b/tests/test_pandas.py @@ -93,47 +93,6 @@ def gen_df(start_time, count, step=1, unit="D"): ) -def gen_series(start_time, count): - idx = pd.date_range(start_time, periods=count, freq="s") - - return { - "the_double": pd.Series(np.random.uniform(-100.0, 100.0, count), index=idx), - "the_int64": pd.Series(np.random.randint(-100, 100, count), index=idx), - "the_blob": pd.Series( - np.array( - list(np.random.bytes(np.random.randint(16, 32)) for i in range(count)), - "O", - ), - index=idx, - ), - "the_string": pd.Series( - np.array([("content_" + str(item)) for item in range(count)], "U"), - index=idx, - ), - "the_ts": pd.Series( - np.array( - [(start_time + np.timedelta64(i, "s")) for i in range(count)] - ).astype("datetime64[ns]"), - index=idx, - ), - "the_symbol": pd.Series( - np.array([("symbol_" + str(item)) for item in range(count)], "U"), index=idx - ), - } - - -def test_series_read_write(series_with_table): - (ctype, dtype, series, table) = series_with_table - - col = table.column_id_by_index(0) - qdbpd.write_series(series, table, col, dtype=dtype) - - # Read everything - res = qdbpd.read_series(table, col) - - _assert_series_equal(series, res) - - def test_dataframe(qdbpd_write_fn, df_with_table, qdbd_connection): (_, _, df1, table) = df_with_table qdbpd_write_fn(df1, qdbd_connection, table) From d69fa09ba1b90187f3fe2b7902f0395340ad2abb Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 13:14:34 +0200 Subject: [PATCH 05/10] panda read/write series --- quasardb/pandas/__init__.py | 99 ------------------------------------- 1 file changed, 99 deletions(-) diff --git a/quasardb/pandas/__init__.py b/quasardb/pandas/__init__.py index 5aa8b909..9c9099bd 100644 --- a/quasardb/pandas/__init__.py +++ b/quasardb/pandas/__init__.py @@ -90,105 +90,6 @@ class PandasRequired(ImportError): TableLike = Union[str, Table] -def read_series( - table: Table, col_name: str, ranges: Optional[RangeSet] = None -) -> pd.Series: - """ - Read a Pandas Timeseries from a single column. - - Parameters: - ----------- - - table : quasardb.Timeseries - QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table') - - col_name : str - Name of the column to read. - - ranges : list - A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects. - """ - t = table.column_type_by_id(col_name) - - logger.info( - "reading Series from column %s.%s with type %s", table.get_name(), col_name, t - ) - - reader_kwargs: Dict[str, Any] = {"column_names": [col_name]} - if ranges is not None: - reader_kwargs["ranges"] = ranges - - with table.reader(**reader_kwargs) as reader: - try: - idx, xs = qdbnp._concat_array_batches( - qdbnp._reader_batch_to_arrays(batch) for batch in reader - ) - except ValueError: - idx = np.array([], dtype=np.dtype("datetime64[ns]")) - xs = {} - - return pd.Series(xs.get(col_name, np.array([], dtype=object)), index=idx) - - -def write_series( - series: pd.Series, - cluster: Cluster, - table: Table, - col_name: str, - infer_types: bool = True, - dtype: Optional[DType] = None, -) -> None: - """ - Writes a Pandas Timeseries to a single column. - - Parameters: - ----------- - - series : pandas.Series - Pandas Series, with a numpy.datetime64[ns] as index. Underlying data will be attempted - to be transformed to appropriate QuasarDB type. - - cluster : quasardb.Cluster - Active connection to the QuasarDB cluster. - - table : quasardb.Timeseries - QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table') - - col_name : str - Column name to store data in. - """ - - logger.debug( - "write_series, table=%s, col_name=%s, infer_types=%s, dtype=%s", - table.get_name(), - col_name, - infer_types, - dtype, - ) - - data = None - index = None - - data = ma.masked_array(series.to_numpy(copy=False), mask=series.isna()) - - if infer_types is True: - index = series.index.to_numpy("datetime64[ns]", copy=False) - else: - index = series.index.to_numpy(copy=False) - - assert data is not None - assert index is not None - - qdbnp.write_arrays( - data={col_name: data}, - cluster=cluster, - table=table, - index=index, - dtype={col_name: dtype} if dtype is not None else None, - infer_types=infer_types, - ) - - def query( cluster: Cluster, query: str, From ada9b1e5b02fbaf8df246b83a59707410f21d53a Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 13:27:46 +0200 Subject: [PATCH 06/10] fix tests --- tests/test_buf_size.py | 2 +- tests/test_firehose.py | 4 ++-- tests/test_pandas_benchmark.py | 12 ++++++------ tests/test_stats.py | 6 +++++- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/test_buf_size.py b/tests/test_buf_size.py index f40350fa..787562c5 100644 --- a/tests/test_buf_size.py +++ b/tests/test_buf_size.py @@ -50,7 +50,7 @@ def test_client_query_buf_size_error(qdbd_settings, table, many_intervals): # doubles, blobs, strings, integers, timestamps = # batchlib._test_with_table( batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + conn, inserter, table, many_intervals, batchlib._regular_push ) res = conn.query('select * from "' + table.get_name() + '"') diff --git a/tests/test_firehose.py b/tests/test_firehose.py index cce974e4..53792150 100644 --- a/tests/test_firehose.py +++ b/tests/test_firehose.py @@ -57,7 +57,7 @@ def test_subscribe_single_table(qdbd_connection, table, many_intervals): xs = table.subscribe(qdbd_connection) doubles, blobs, strings, integers, timestamps, symbols = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) time.sleep(4) @@ -92,7 +92,7 @@ def next_row(): many_intervals_.append(x + np.timedelta64(365, "D")) doubles, blobs, strings, integers, timestamps, symbols = batchlib._test_with_table( - inserter, table, many_intervals_, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals_, batchlib._regular_push ) # Note that we only reset `offset`; since we just inserted exactly double diff --git a/tests/test_pandas_benchmark.py b/tests/test_pandas_benchmark.py index a91d80c9..0c9ec8d4 100644 --- a/tests/test_pandas_benchmark.py +++ b/tests/test_pandas_benchmark.py @@ -15,7 +15,7 @@ def test_bench_double_series(qdbd_connection, table, many_intervals, benchmark): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_double") @@ -28,7 +28,7 @@ def test_bench_blob_series(qdbd_connection, table, many_intervals, benchmark): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_blob") @@ -41,7 +41,7 @@ def test_bench_string_series(qdbd_connection, table, many_intervals, benchmark): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_string") @@ -54,7 +54,7 @@ def test_bench_int64_series(qdbd_connection, table, many_intervals, benchmark): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_int64") @@ -67,7 +67,7 @@ def test_bench_timestamp_series(qdbd_connection, table, many_intervals, benchmar # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_ts") @@ -80,7 +80,7 @@ def test_bench_symbol_series(qdbd_connection, table, many_intervals, benchmark): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, many_intervals, batchlib._regular_push + qdbd_connection, inserter, table, many_intervals, batchlib._regular_push ) benchmark(qdbpd.read_series, table, "the_symbol") diff --git a/tests/test_stats.py b/tests/test_stats.py index 6657147c..ba49cca8 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -16,7 +16,11 @@ def _write_data(conn, table): # doubles, blobs, strings, integers, timestamps, symbols = # batchlib._test_with_table( _, _, _, _, _, _ = batchlib._test_with_table( - inserter, table, conftest.create_many_intervals(), batchlib._regular_push + conn, + inserter, + table, + conftest.create_many_intervals(), + batchlib._regular_push, ) From 7e7cea06444360ce49979edd695b0d393db1a6ab Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 13:29:56 +0200 Subject: [PATCH 07/10] formatting --- tests/test_query.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index 20e90e77..89bb2c71 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -102,9 +102,7 @@ def _insert_symbol_points(conn, table, start_time, points=10): } -def _insert_points( - value_type, conn, table, start_time=None, intervals=None, points=10 -): +def _insert_points(value_type, conn, table, start_time=None, intervals=None, points=10): if start_time is None: assert intervals is not None start_time = tslib._start_time(intervals) From 993fdd1df3f0119d0debedf1d18cac409578342c Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 13:47:20 +0200 Subject: [PATCH 08/10] cosmetic --- tests/test_numpy.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/test_numpy.py b/tests/test_numpy.py index f2ae99cc..11e2d933 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -28,7 +28,7 @@ def _unicode_to_object_array(xs): return ma.masked_array(data=data, mask=mask) -def _read_single_array(conn, table, column, *, ranges=None): +def _read_single_column(conn, table, column, *, ranges=None): idx, xs = qdbnp.read_arrays( conn, [table], @@ -38,7 +38,7 @@ def _read_single_array(conn, table, column, *, ranges=None): return idx, xs[column] -def _write_single_array( +def _write_single_column( conn, table, column, data, index, *, dtype=None, infer_types=True, **kwargs ): payload = {column: data} @@ -62,7 +62,7 @@ def _write_single_array( # ### # -# Single-column read/write tests now go through the batched array API. +# Single-column read/write tests go through the batched array API. # ###### @@ -76,7 +76,7 @@ def test_array_read_write_native_dtypes(array_with_index_and_table, qdbd_connect (ctype, dtype, data, index, table) = array_with_index_and_table col = table.column_id_by_index(0) - _write_single_array( + _write_single_column( qdbd_connection, table, col, @@ -86,7 +86,7 @@ def test_array_read_write_native_dtypes(array_with_index_and_table, qdbd_connect infer_types=False, ) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -102,9 +102,9 @@ def test_array_read_write_inferrable_dtypes( (ctype, dtype, data, index, table) = array_with_index_and_table col = table.column_id_by_index(0) - _write_single_array(qdbd_connection, table, col, data, index, infer_types=True) + _write_single_column(qdbd_connection, table, col, data, index, infer_types=True) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -128,7 +128,7 @@ def test_arrays_read_write_native_dtypes(array_with_index_and_table, qdbd_connec push_mode=quasardb.WriterPushMode.Truncate, ) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -148,7 +148,7 @@ def test_arrays_read_write_inferrable_dtypes( col = table.column_id_by_index(0) qdbnp.write_arrays([data], qdbd_connection, table, index=index, infer_types=True) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -172,7 +172,7 @@ def test_arrays_read_write_data_as_dict(array_with_index_and_table, qdbd_connect push_mode=quasardb.WriterPushMode.Truncate, ) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -197,7 +197,7 @@ def test_provide_index_as_dict(array_with_index_and_table, qdbd_connection): truncate=True, ) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data), res) @@ -295,7 +295,7 @@ def _do_write_read(xs): dtype=dtype, infer_types=False, ) - return _read_single_array(qdbd_connection, table, col) + return _read_single_column(qdbd_connection, table, col) res1 = _do_write_read(data1) assert_indexed_arrays_equal((index, data1), res1) @@ -346,7 +346,7 @@ def test_string_array_returns_unicode(array_with_index_and_table, qdbd_connectio col = table.column_id_by_index(0) qdbnp.write_arrays([data], qdbd_connection, table, index=index) - (idx, xs) = _read_single_array(qdbd_connection, table, col) + (idx, xs) = _read_single_column(qdbd_connection, table, col) assert qdbnp.dtypes_equal(xs.dtype, np.dtype("unicode")) @@ -769,7 +769,7 @@ def test_regression_sc11333(qdbd_connection, table_name, start_date, row_count): for i in range(len(cols)): col = cols[i] - res = _read_single_array(qdbd_connection, t, col) + res = _read_single_column(qdbd_connection, t, col) assert_indexed_arrays_equal((idx, data[i]), res) @@ -781,7 +781,7 @@ def test_write_through_flag(arrays_with_index_and_table, qdbd_connection): [data[0]], qdbd_connection, table, index=index, write_through=True ) - res = _read_single_array(qdbd_connection, table, col) + res = _read_single_column(qdbd_connection, table, col) assert_indexed_arrays_equal((index, data[0]), res) From b125a72974b69daee2475a87ac94ad4409346866 Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 17:58:08 +0200 Subject: [PATCH 09/10] cosmetic 2 --- tests/test_batch_inserter.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/test_batch_inserter.py b/tests/test_batch_inserter.py index 268c1829..ec56806e 100644 --- a/tests/test_batch_inserter.py +++ b/tests/test_batch_inserter.py @@ -81,6 +81,21 @@ def _set_batch_inserter_data(inserter, intervals, data, start=0): inserter.set_string(5, symbols[i]) +def _assert_results(conn, table, intervals, data): + (doubles, integers, blobs, strings, timestamps, symbols) = data + + whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) + idx, xs = _read_all_columns(conn, table, ranges=[whole_range]) + + np.testing.assert_array_equal(idx, intervals) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) + np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) + np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers) + np.testing.assert_array_equal(xs[tslib._ts_col_name(table)], timestamps) + np.testing.assert_array_equal(xs[tslib._symbol_col_name(table)], symbols) + + def _read_column(conn, table, column, *, ranges=None): idx, xs = qdbnp.read_arrays( conn, @@ -103,21 +118,6 @@ def _read_all_columns(conn, table, *, ranges=None): return qdbnp.read_arrays(conn, [table], column_names=column_names, ranges=ranges) -def _assert_results(conn, table, intervals, data): - (doubles, integers, blobs, strings, timestamps, symbols) = data - - whole_range = (intervals[0], intervals[-1:][0] + np.timedelta64(2, "s")) - idx, xs = _read_all_columns(conn, table, ranges=[whole_range]) - - np.testing.assert_array_equal(idx, intervals) - np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) - np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) - np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) - np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers) - np.testing.assert_array_equal(xs[tslib._ts_col_name(table)], timestamps) - np.testing.assert_array_equal(xs[tslib._symbol_col_name(table)], symbols) - - def _test_with_table( conn, inserter, table, intervals, push_method=_regular_push, data=None ): From c706ba92a5c77422dd8b3bc702c4207bec948012 Mon Sep 17 00:00:00 2001 From: vikonix Date: Mon, 13 Apr 2026 18:06:31 +0200 Subject: [PATCH 10/10] cosmetic 3 --- tests/test_numpy.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 11e2d933..9af12c24 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -60,10 +60,6 @@ def _write_single_column( # # Array tests # -### -# -# Single-column read/write tests go through the batched array API. -# ######