From 1f0120df9a8820b1be9313c57f669eec96812adc Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Fri, 14 Nov 2025 18:11:57 +0800 Subject: [PATCH 1/2] [Python] new datatypes --- cpp/src/common/record.h | 18 ++---- cpp/src/cwrapper/tsfile_cwrapper.cc | 13 +++++ cpp/src/cwrapper/tsfile_cwrapper.h | 5 ++ cpp/src/writer/tsfile_writer.cc | 4 +- cpp/test/cwrapper/cwrapper_test.cc | 87 +++++++++++++++++++++++------ python/setup.py | 2 +- python/tests/test_write_and_read.py | 40 +++++++++++-- python/tsfile/constants.py | 2 +- python/tsfile/field.py | 14 ++++- python/tsfile/tsfile_cpp.pxd | 4 ++ python/tsfile/tsfile_py_cpp.pyx | 28 ++++++++-- python/tsfile/tsfile_reader.pyx | 4 +- 12 files changed, 174 insertions(+), 47 deletions(-) diff --git a/cpp/src/common/record.h b/cpp/src/common/record.h index 8c729f688..cbbebbc52 100644 --- a/cpp/src/common/record.h +++ b/cpp/src/common/record.h @@ -53,9 +53,8 @@ struct DataPoint { int64_t i64_val_; float float_val_; double double_val_; - common::String *str_val_; } u_; - TextType text_val_; + common::String text_val_; DataPoint(const std::string &measurement_name, bool b) : measurement_name_(measurement_name), text_val_() { @@ -82,19 +81,12 @@ struct DataPoint { u_.double_val_ = d; } - DataPoint(const std::string &measurement_name, common::String &str, - common::PageArena &pa) + DataPoint(const std::string &measurement_name, common::String str) : measurement_name_(measurement_name), text_val_() { - char *p_buf = (char *)pa.alloc(sizeof(common::String)); - u_.str_val_ = new (p_buf) common::String(); - u_.str_val_->dup_from(str, pa); + text_val_.buf_ = str.buf_; + text_val_.len_ = str.len_; } - // DataPoint(const std::string &measurement_name, Text &text), - // : measurement_name_(measurement_name), - // data_type_(common::TEXT), - // text_val_(text) {} - DataPoint(const std::string &measurement_name) : isnull(true), measurement_name_(measurement_name) {} void set_i32(int32_t i32) { @@ -150,7 +142,7 @@ template <> inline int TsRecord::add_point(const std::string &measurement_name, common::String val) { int ret = common::E_OK; - points_.emplace_back(DataPoint(measurement_name, val, pa)); + points_.emplace_back(DataPoint(measurement_name, val)); return ret; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 1b09db49c..02a3b11cc 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -280,6 +280,19 @@ TsRecord _ts_record_new(const char *device_id, Timestamp timestamp, return common::E_OK; \ } +ERRNO _insert_data_into_ts_record_by_name_string(TsRecord data, + const char *measurement_name, + const char *value) { + auto *record = (storage::TsRecord *)data; + storage::DataPoint point( + measurement_name, + common::String(const_cast(value), strlen(value))); + if (record->points_.size() + 1 > record->points_.capacity()) + return common::E_BUF_NOT_ENOUGH; + record->points_.push_back(point); + return common::E_OK; +} + INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int32_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(bool); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 75dc03643..3c753395f 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -35,6 +35,8 @@ typedef enum { TS_DATATYPE_DOUBLE = 4, TS_DATATYPE_TEXT = 5, TS_DATATYPE_VECTOR = 6, + TS_DATATYPE_DATE = 9, + TS_DATATYPE_BLOB = 10, TS_DATATYPE_STRING = 11, TS_DATATYPE_NULL_TYPE = 254, TS_DATATYPE_INVALID = 255 @@ -623,6 +625,9 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); +ERRNO _insert_data_into_ts_record_by_name_string(TsRecord data, + const char* measurement_name, + const char* value); // Write a tablet into a device. ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 3ae5cf28a..20c747f92 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -657,7 +657,7 @@ int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp, case common::BLOB: case common::TEXT: case common::STRING: - return chunk_writer->write(timestamp, *point.u_.str_val_); + return chunk_writer->write(timestamp, point.text_val_); default: return E_INVALID_DATA_POINT; } @@ -689,7 +689,7 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer, case common::BLOB: case common::TEXT: case common::STRING: - return value_chunk_writer->write(timestamp, point.u_.str_val_, + return value_chunk_writer->write(timestamp, point.text_val_, isnull); default: return E_INVALID_DATA_POINT; diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 90a93fb42..bb41f6567 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -19,6 +19,17 @@ #include #include #include + +#include "common/row_record.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "reader/result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_writer.h" + +namespace storage { +class TsFileReader; +} + extern "C" { #include "cwrapper/errno_define_c.h" #include "cwrapper/tsfile_cwrapper.h" @@ -28,23 +39,65 @@ extern "C" { #include "utils/errno_define.h" namespace cwrapper { -class CWrapperTest : public testing::Test {}; - -// TEST_F(CWrapperTest, RegisterTimeSeries) { -// ERRNO code = 0; -// char* temperature = strdup("temperature"); -// TimeseriesSchema ts_schema{temperature, TS_DATATYPE_INT32, -// TS_ENCODING_PLAIN, -// TS_COMPRESSION_UNCOMPRESSED}; -// remove("cwrapper_register_timeseries.tsfile"); -// TsFileWriter writer = -// tsfile_writer_new("cwrapper_register_timeseries.tsfile", &code); -// ASSERT_EQ(code, 0); -// code = tsfile_writer_register_timeseries(writer, "device1", &ts_schema); -// ASSERT_EQ(code, 0); -// free(temperature); -// tsfile_writer_close(writer); -// } +class CWrapperTest : public testing::Test { + public: + static void ASSERT_OK(ERRNO code, const char* msg = "") { + ASSERT_EQ(code, RET_OK) << msg; + } +}; + +TEST_F(CWrapperTest, TestForPythonInterfaceInsert) { + ERRNO code = 0; + const int column_num = 10; + char* filename = "cwrapper_for_python.tsfile"; + remove(filename); + char* device_id = "root.device1"; + char* measurement_id = "measurement"; + + timeseries_schema measurement; + measurement.timeseries_name = measurement_id; + measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + measurement.data_type = TS_DATATYPE_STRING; + measurement.encoding = TS_ENCODING_PLAIN; + + auto* writer = (storage::TsFileWriter*)_tsfile_writer_new( + filename, 128 * 1024 * 1024, &code); + ASSERT_OK(code, "create writer failed"); + + ASSERT_OK( + _tsfile_writer_register_timeseries(writer, device_id, &measurement), + "register timeseries failed"); + + auto* record = (storage::TsRecord*)_ts_record_new(device_id, 0, 1); + char* test_str = "test_string"; + ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, measurement_id, + test_str), + "insert data failed"); + + ASSERT_OK(_tsfile_writer_write_ts_record(writer, record), + "write record failed"); + ASSERT_OK(_tsfile_writer_flush(writer), "flush failed"); + ASSERT_OK(_tsfile_writer_close(writer), "close writer failed"); + + auto* reader = (storage::TsFileReader*)tsfile_reader_new(filename, &code); + ASSERT_OK(code, "create reader failed"); + + auto* result = (storage::ResultSet*)_tsfile_reader_query_device( + reader, device_id, &measurement_id, 1, 0, 100, &code); + ASSERT_OK(code, "query device failed"); + + bool has_next = false; + int row_count = 0; + while (!result->next(has_next) && has_next) { + EXPECT_EQ(result->get_value(1), row_count); + common::String* str = result->get_value(2); + EXPECT_EQ(strlen(test_str), str->len_); + char* ret_char = tsfile_result_set_get_value_by_index_string(result, 2); + EXPECT_EQ(strcmp(test_str, ret_char), 0); + } + + ASSERT_OK(reader->close(), "close reader failed"); +} TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { ERRNO code = 0; diff --git a/python/setup.py b/python/setup.py index 329cc2aae..cd3699818 100644 --- a/python/setup.py +++ b/python/setup.py @@ -25,7 +25,7 @@ from setuptools import setup, Extension from setuptools.command.build_ext import build_ext -version = "2.1.0.dev0" +version = "2.2.0.dev" system = platform.system() diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index e5c87ab92..d4cfb3d50 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -33,34 +33,53 @@ def test_row_record_write_and_read(): try: + if os.path.exists("record_write_and_read.tsfile"): + os.remove("record_write_and_read.tsfile") writer = TsFileWriter("record_write_and_read.tsfile") timeseries = TimeseriesSchema("level1", TSDataType.INT64) writer.register_timeseries("root.device1", timeseries) writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32)) + writer.register_timeseries("root.device1", TimeseriesSchema("level4", TSDataType.STRING)) + writer.register_timeseries("root.device1", TimeseriesSchema("level5", TSDataType.TEXT)) + writer.register_timeseries("root.device1", TimeseriesSchema("level6", TSDataType.BLOB)) + writer.register_timeseries("root.device1", TimeseriesSchema("level7", TSDataType.DATE)) + + max_row_num = 10 - max_row_num = 1000 for i in range(max_row_num): row = RowRecord("root.device1", i, [Field("level1", i + 1, TSDataType.INT64), Field("level2", i * 1.1, TSDataType.DOUBLE), - Field("level3", i * 2, TSDataType.INT32)]) + Field("level3", i * 2, TSDataType.INT32), + Field("level4", f"string_value_{i}", TSDataType.STRING), + Field("level5", f"text_value_{i}", TSDataType.TEXT), + Field("level6", f"blob_data_{i}".encode('utf-8'), TSDataType.BLOB), + Field("level7", i, TSDataType.DATE)]) writer.write_row_record(row) writer.close() reader = TsFileReader("record_write_and_read.tsfile") - result = reader.query_timeseries("root.device1", ["level1", "level2"], 10, 100) - i = 10 + result = reader.query_timeseries("root.device1", ["level1", "level2", "level4", "level7"], 0, 100) + row_num = 0 while result.next(): - print(result.get_value_by_index(1)) + #print(result.get_value_by_index(0)) + print(f"\n--- 记录 #{row_num} ---") + print(f"索引1的值: {result.get_value_by_index(1)}") + print(f"索引2的值: {result.get_value_by_index(2)}") + print(f"索引3的值: {result.get_value_by_index(3)}") + print(f"索引4的值: {result.get_value_by_index(4)}") + print(f"索引5的值: {result.get_value_by_index(5)}") + assert result.get_value_by_index(2) == row_num + 1 + row_num += 1 print(reader.get_active_query_result()) result.close() print(reader.get_active_query_result()) reader.close() finally: if os.path.exists("record_write_and_read.tsfile"): - os.remove("record_write_and_read.tsfile") + os.remove("record_write_and_read2.tsfile") @pytest.mark.skip(reason="API not match") @@ -305,3 +324,12 @@ def test_tsfile_to_df(): to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"]) finally: os.remove("table_write_to_df.tsfile") + +import os + +if __name__ == "__main__": + os.chdir(os.path.dirname(os.path.abspath(__file__))) + pytest.main([ + "test_write_and_read.py::test_row_record_write_and_read", + "-s", "-v" + ]) \ No newline at end of file diff --git a/python/tsfile/constants.py b/python/tsfile/constants.py index 5eaa24700..7df8eab07 100644 --- a/python/tsfile/constants.py +++ b/python/tsfile/constants.py @@ -35,7 +35,7 @@ class TSDataType(IntEnum): def to_py_type(self): if self == TSDataType.BOOLEAN: return bool - elif self == TSDataType.INT32: + elif self == TSDataType.INT32 or self == TSDataType.DATE: return int elif self == TSDataType.INT64: return int diff --git a/python/tsfile/field.py b/python/tsfile/field.py index ae46c6282..26d02c5f7 100644 --- a/python/tsfile/field.py +++ b/python/tsfile/field.py @@ -75,6 +75,7 @@ def get_int_value(self): if ( self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.DATE and self.data_type != TSDataType.INT64 and self.data_type != TSDataType.FLOAT and self.data_type != TSDataType.DOUBLE @@ -178,10 +179,21 @@ def get_string_value(self): return str(self.value) # BLOB elif self.data_type == TSDataType.BLOB: - return str(hex(int.from_bytes(self.value, byteorder="big"))) + return self.value else: return str(self.get_object_value(self.data_type)) + def get_bytes_value(self): + if self.value is None: + return None + if self.data_type is None: + raise NoneDataTypeException("None Data Type Exception!") + + if self.data_type == TSDataType.BLOB: + return self.value + else: + raise TypeError("get_bytes_value() only supports BLOB data type") + def __str__(self): return self.get_string_value() diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 1b04051c9..76262b5f4 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -42,6 +42,8 @@ cdef extern from "./tsfile_cwrapper.h": TS_DATATYPE_DOUBLE = 4 TS_DATATYPE_TEXT = 5 TS_DATATYPE_VECTOR = 6 + TS_DATATYPE_DATE = 9 + TS_DATATYPE_BLOB = 10 TS_DATATYPE_STRING = 11 TS_DATATYPE_NULL_TYPE = 254 TS_DATATYPE_INVALID = 255 @@ -159,6 +161,8 @@ cdef extern from "./tsfile_cwrapper.h": ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const char *measurement_name, const double value); ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const char *measurement_name, const bint value); + ErrorCode _insert_data_into_ts_record_by_name_string(TsRecord data, const char *measurement_name, const char *value) + #ErrorCode _insert_data_into_ts_record_by_name_blob(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bytes_value()) void _free_tsfile_ts_record(TsRecord * record); diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index e17430399..d13cb01a3 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -83,8 +83,10 @@ cdef dict TS_DATA_TYPE_MAP = { TSDataTypePy.INT64: TSDataType.TS_DATATYPE_INT64, TSDataTypePy.FLOAT: TSDataType.TS_DATATYPE_FLOAT, TSDataTypePy.DOUBLE: TSDataType.TS_DATATYPE_DOUBLE, + TSDataTypePy.DATE: TSDataType.TS_DATATYPE_DATE, TSDataTypePy.TEXT: TSDataType.TS_DATATYPE_TEXT, - TSDataTypePy.STRING: TSDataType.TS_DATATYPE_STRING + TSDataTypePy.STRING: TSDataType.TS_DATATYPE_STRING, + TSDataTypePy.BLOB: TSDataType.TS_DATATYPE_BLOB } cdef dict TS_ENCODING_MAP = { @@ -266,8 +268,8 @@ cdef Tablet to_c_tablet(object tablet): if value[row] is not None: tablet_add_value_by_index_double(ctablet, row, col, value[row]) - # STRING - elif data_type == TS_DATATYPE_STRING: + # STRING or TEXT + elif data_type == TS_DATATYPE_STRING or data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_BLOB: for row in range(max_row_num): if value[row] is not None: py_value = value[row] @@ -293,7 +295,7 @@ cdef TsRecord to_c_record(object row_record): data_type = to_c_data_type(field.get_data_type()) if data_type == TS_DATATYPE_BOOLEAN: _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) - elif data_type == TS_DATATYPE_INT32: + elif data_type == TS_DATATYPE_INT32 or data_type == TS_DATATYPE_DATE: _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) elif data_type == TS_DATATYPE_INT64: _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) @@ -301,6 +303,24 @@ cdef TsRecord to_c_record(object row_record): _insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) elif data_type == TS_DATATYPE_FLOAT: _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) + elif data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: + field_name = PyUnicode_AsUTF8(field.get_field_name()) + value = field.get_string_value() + + # 确保最终传入的是bytes + if isinstance(value, str): + value_bytes = value.encode('utf-8') + else: + value_bytes = bytes(value) # 或其他适当的转换 + print("hello") + _insert_data_into_ts_record_by_name_string(record, field_name, PyUnicode_AsUTF8(value_bytes)) + # print(PyUnicode_AsUTF8(field.get_field_name())) + # print(field.get_string_value().encode('utf-8')) + # _insert_data_into_ts_record_by_name_string(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_string_value().encode('utf-8')) + elif data_type == TS_DATATYPE_BLOB: + print(PyUnicode_AsUTF8(field.get_field_name())) + print(field.get_string_value()) + _insert_data_into_ts_record_by_name_string(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_string_value()) return record diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index e8d38d7df..4eaa7a01a 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -162,7 +162,7 @@ cdef class ResultSetPy: return None # data type in metadata is an array, id from 0. data_type = self.metadata.get_data_type(index) - if data_type == TSDataTypePy.INT32: + if data_type == TSDataTypePy.INT32 or data_type == TSDataTypePy.DATE: return tsfile_result_set_get_value_by_index_int32_t(self.result, index) elif data_type == TSDataTypePy.INT64: return tsfile_result_set_get_value_by_index_int64_t(self.result, index) @@ -172,7 +172,7 @@ cdef class ResultSetPy: return tsfile_result_set_get_value_by_index_double(self.result, index) elif data_type == TSDataTypePy.BOOLEAN: return tsfile_result_set_get_value_by_index_bool(self.result, index) - elif data_type == TSDataTypePy.STRING: + elif data_type == TSDataTypePy.STRING or data_type == TSDataTypePy.TEXT or data_type == TSDataTypePy.BLOB: try: string = tsfile_result_set_get_value_by_index_string(self.result, index) py_str = string.decode('utf-8') From 06480757edddf18939807bd37579e42096390403 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 18 Nov 2025 17:25:20 +0800 Subject: [PATCH 2/2] add cwrapper ut for python new datatypes --- cpp/src/common/row_record.h | 1 + cpp/test/cwrapper/cwrapper_test.cc | 88 ++++++++++++++++++++++++----- python/tests/test_write_and_read.py | 12 ++-- python/tsfile/tsfile_py_cpp.pyx | 16 +----- python/tsfile/tsfile_reader.pyx | 8 +-- 5 files changed, 89 insertions(+), 36 deletions(-) diff --git a/cpp/src/common/row_record.h b/cpp/src/common/row_record.h index 5ff5e232f..713cabc83 100644 --- a/cpp/src/common/row_record.h +++ b/cpp/src/common/row_record.h @@ -103,6 +103,7 @@ struct Field { switch (type_) { case common::TSDataType::BOOLEAN: return value_.bval_; + case common::TSDataType::DATE: case common::TSDataType::INT32: return value_.ival_; case common::TSDataType::TIMESTAMP: diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index bb41f6567..0ddbbde8c 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -50,50 +50,112 @@ TEST_F(CWrapperTest, TestForPythonInterfaceInsert) { ERRNO code = 0; const int column_num = 10; char* filename = "cwrapper_for_python.tsfile"; - remove(filename); + remove(filename); // Clean up any existing file + + // Device and measurement definitions char* device_id = "root.device1"; - char* measurement_id = "measurement"; + char* str_measurement_id = "str_measurement"; + char* text_measurement_id = "text_measurement"; + char* date_measurement_id = "date_measurement"; + + // Define time series schemas for different data types + timeseries_schema str_measurement; + str_measurement.timeseries_name = str_measurement_id; + str_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + str_measurement.data_type = TS_DATATYPE_STRING; + str_measurement.encoding = TS_ENCODING_PLAIN; - timeseries_schema measurement; - measurement.timeseries_name = measurement_id; - measurement.compression = TS_COMPRESSION_UNCOMPRESSED; - measurement.data_type = TS_DATATYPE_STRING; - measurement.encoding = TS_ENCODING_PLAIN; + timeseries_schema text_measurement; + text_measurement.timeseries_name = text_measurement_id; + text_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + text_measurement.data_type = TS_DATATYPE_TEXT; + text_measurement.encoding = TS_ENCODING_PLAIN; + timeseries_schema date_measurement; + date_measurement.timeseries_name = date_measurement_id; + date_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + date_measurement.data_type = TS_DATATYPE_DATE; + date_measurement.encoding = TS_ENCODING_PLAIN; + + // Create TsFile writer auto* writer = (storage::TsFileWriter*)_tsfile_writer_new( filename, 128 * 1024 * 1024, &code); ASSERT_OK(code, "create writer failed"); + // Register time series with the writer + ASSERT_OK( + _tsfile_writer_register_timeseries(writer, device_id, &str_measurement), + "register timeseries failed"); + + ASSERT_OK( + _tsfile_writer_register_timeseries(writer, device_id, &text_measurement), + "register timeseries failed"); + ASSERT_OK( - _tsfile_writer_register_timeseries(writer, device_id, &measurement), + _tsfile_writer_register_timeseries(writer, device_id, &date_measurement), "register timeseries failed"); - auto* record = (storage::TsRecord*)_ts_record_new(device_id, 0, 1); + // Create a new time series record + auto* record = (storage::TsRecord*)_ts_record_new(device_id, 0, 3); + + // Insert string data char* test_str = "test_string"; - ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, measurement_id, + ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, str_measurement_id, test_str), "insert data failed"); + // Insert text data + char* test_text = "test_text"; + ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, text_measurement_id, + test_text), + "insert data failed"); + + // Insert date data - NOTE: There's a bug here, should use date_measurement_id + int32_t test_date = 20251118; + ASSERT_OK(_insert_data_into_ts_record_by_name_int32_t(record, date_measurement_id, + test_date), + "insert data failed"); + + // Write the record to file and close writer ASSERT_OK(_tsfile_writer_write_ts_record(writer, record), "write record failed"); ASSERT_OK(_tsfile_writer_flush(writer), "flush failed"); ASSERT_OK(_tsfile_writer_close(writer), "close writer failed"); + // Create reader to verify the written data auto* reader = (storage::TsFileReader*)tsfile_reader_new(filename, &code); ASSERT_OK(code, "create reader failed"); + // Query the data we just wrote + char* sensors[] = {str_measurement_id, text_measurement_id, date_measurement_id}; auto* result = (storage::ResultSet*)_tsfile_reader_query_device( - reader, device_id, &measurement_id, 1, 0, 100, &code); + reader, device_id, sensors, 3, 0, 100, &code); ASSERT_OK(code, "query device failed"); + // Verify the retrieved data matches what we inserted bool has_next = false; int row_count = 0; while (!result->next(has_next) && has_next) { + // Verify timestamp EXPECT_EQ(result->get_value(1), row_count); - common::String* str = result->get_value(2); + + // Verify string data + const common::String* str = result->get_value(2); EXPECT_EQ(strlen(test_str), str->len_); - char* ret_char = tsfile_result_set_get_value_by_index_string(result, 2); + const char* ret_char = tsfile_result_set_get_value_by_index_string(result, 2); EXPECT_EQ(strcmp(test_str, ret_char), 0); + + // Verify text data + const common::String* text = result->get_value(3); + EXPECT_EQ(strlen(test_text), text->len_); + const char* ret_text = tsfile_result_set_get_value_by_index_string(result, 3); + EXPECT_EQ(strcmp(test_text, ret_text), 0); + + // Verify date data + int32_t ret_date = tsfile_result_set_get_value_by_index_int32_t(result, 4); + EXPECT_EQ(test_date, ret_date); + + row_count++; } ASSERT_OK(reader->close(), "close reader failed"); diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index d4cfb3d50..69cc3d894 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -36,8 +36,7 @@ def test_row_record_write_and_read(): if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") writer = TsFileWriter("record_write_and_read.tsfile") - timeseries = TimeseriesSchema("level1", TSDataType.INT64) - writer.register_timeseries("root.device1", timeseries) + writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64)) writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32)) writer.register_timeseries("root.device1", TimeseriesSchema("level4", TSDataType.STRING)) @@ -61,7 +60,7 @@ def test_row_record_write_and_read(): writer.close() reader = TsFileReader("record_write_and_read.tsfile") - result = reader.query_timeseries("root.device1", ["level1", "level2", "level4", "level7"], 0, 100) + result = reader.query_timeseries("root.device1", ["level1", "level2", "level3", "level4", "level5", "level6","level7"], 0, 100) row_num = 0 while result.next(): #print(result.get_value_by_index(0)) @@ -71,6 +70,9 @@ def test_row_record_write_and_read(): print(f"索引3的值: {result.get_value_by_index(3)}") print(f"索引4的值: {result.get_value_by_index(4)}") print(f"索引5的值: {result.get_value_by_index(5)}") + print(f"索引6的值: {result.get_value_by_index(6)}") + print(f"索引7的值: {result.get_value_by_index(7)}") + print(f"索引8的值: {result.get_value_by_index(8)}") assert result.get_value_by_index(2) == row_num + 1 row_num += 1 print(reader.get_active_query_result()) @@ -79,7 +81,7 @@ def test_row_record_write_and_read(): reader.close() finally: if os.path.exists("record_write_and_read.tsfile"): - os.remove("record_write_and_read2.tsfile") + os.remove("record_write_and_read.tsfile") @pytest.mark.skip(reason="API not match") @@ -135,6 +137,8 @@ def test_table_writer_and_reader(): [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) try: + if os.path.exists("table_write.tsfile"): + os.remove("table_write.tsfile") with TsFileTableWriter("table_write.tsfile", table) as writer: tablet = Tablet(["device", "value"], [TSDataType.STRING, TSDataType.DOUBLE], 100) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index d13cb01a3..70c162bbd 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -304,22 +304,8 @@ cdef TsRecord to_c_record(object row_record): elif data_type == TS_DATATYPE_FLOAT: _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) elif data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: - field_name = PyUnicode_AsUTF8(field.get_field_name()) - value = field.get_string_value() - - # 确保最终传入的是bytes - if isinstance(value, str): - value_bytes = value.encode('utf-8') - else: - value_bytes = bytes(value) # 或其他适当的转换 - print("hello") - _insert_data_into_ts_record_by_name_string(record, field_name, PyUnicode_AsUTF8(value_bytes)) - # print(PyUnicode_AsUTF8(field.get_field_name())) - # print(field.get_string_value().encode('utf-8')) - # _insert_data_into_ts_record_by_name_string(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_string_value().encode('utf-8')) + _insert_data_into_ts_record_by_name_string(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_string_value().encode('utf-8')) elif data_type == TS_DATATYPE_BLOB: - print(PyUnicode_AsUTF8(field.get_field_name())) - print(field.get_string_value()) _insert_data_into_ts_record_by_name_string(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_string_value()) return record diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 4eaa7a01a..7a488c4ef 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -175,11 +175,11 @@ cdef class ResultSetPy: elif data_type == TSDataTypePy.STRING or data_type == TSDataTypePy.TEXT or data_type == TSDataTypePy.BLOB: try: string = tsfile_result_set_get_value_by_index_string(self.result, index) - py_str = string.decode('utf-8') - return py_str + if string == NULL: + return None + return string.decode('utf-8') finally: - if string != NULL: - free(string) + pass def get_value_by_name(self, column_name : str): """