Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions cpp/src/common/record.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ struct DataPoint {
int64_t i64_val_;
float float_val_;
double double_val_;
common::String *str_val_;
} u_;
TextType text_val_;
common::String text_val_;

DataPoint(const std::string &measurement_name, bool b)
: measurement_name_(measurement_name), text_val_() {
Expand All @@ -82,19 +81,12 @@ struct DataPoint {
u_.double_val_ = d;
}

DataPoint(const std::string &measurement_name, common::String &str,
common::PageArena &pa)
DataPoint(const std::string &measurement_name, common::String str)
: measurement_name_(measurement_name), text_val_() {
char *p_buf = (char *)pa.alloc(sizeof(common::String));
u_.str_val_ = new (p_buf) common::String();
u_.str_val_->dup_from(str, pa);
text_val_.buf_ = str.buf_;
text_val_.len_ = str.len_;
}

// DataPoint(const std::string &measurement_name, Text &text),
// : measurement_name_(measurement_name),
// data_type_(common::TEXT),
// text_val_(text) {}

DataPoint(const std::string &measurement_name)
: isnull(true), measurement_name_(measurement_name) {}
void set_i32(int32_t i32) {
Expand Down Expand Up @@ -150,7 +142,7 @@ template <>
inline int TsRecord::add_point(const std::string &measurement_name,
common::String val) {
int ret = common::E_OK;
points_.emplace_back(DataPoint(measurement_name, val, pa));
points_.emplace_back(DataPoint(measurement_name, val));
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/common/row_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct Field {
switch (type_) {
case common::TSDataType::BOOLEAN:
return value_.bval_;
case common::TSDataType::DATE:
case common::TSDataType::INT32:
return value_.ival_;
case common::TSDataType::TIMESTAMP:
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,19 @@ TsRecord _ts_record_new(const char *device_id, Timestamp timestamp,
return common::E_OK; \
}

ERRNO _insert_data_into_ts_record_by_name_string(TsRecord data,
const char *measurement_name,
const char *value) {
auto *record = (storage::TsRecord *)data;
storage::DataPoint point(
measurement_name,
common::String(const_cast<char *>(value), strlen(value)));
if (record->points_.size() + 1 > record->points_.capacity())
return common::E_BUF_NOT_ENOUGH;
record->points_.push_back(point);
return common::E_OK;
}

INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int32_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int64_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(bool);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ typedef enum {
TS_DATATYPE_DOUBLE = 4,
TS_DATATYPE_TEXT = 5,
TS_DATATYPE_VECTOR = 6,
TS_DATATYPE_DATE = 9,
TS_DATATYPE_BLOB = 10,
TS_DATATYPE_STRING = 11,
TS_DATATYPE_NULL_TYPE = 254,
TS_DATATYPE_INVALID = 255
Expand Down Expand Up @@ -623,6 +625,9 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);
ERRNO _insert_data_into_ts_record_by_name_string(TsRecord data,
const char* measurement_name,
const char* value);

// Write a tablet into a device.
ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp,
case common::BLOB:
case common::TEXT:
case common::STRING:
return chunk_writer->write(timestamp, *point.u_.str_val_);
return chunk_writer->write(timestamp, point.text_val_);
default:
return E_INVALID_DATA_POINT;
}
Expand Down Expand Up @@ -689,7 +689,7 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer,
case common::BLOB:
case common::TEXT:
case common::STRING:
return value_chunk_writer->write(timestamp, point.u_.str_val_,
return value_chunk_writer->write(timestamp, point.text_val_,
isnull);
default:
return E_INVALID_DATA_POINT;
Expand Down
149 changes: 132 additions & 17 deletions cpp/test/cwrapper/cwrapper_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@
#include <gtest/gtest.h>
#include <unistd.h>
#include <utils/db_utils.h>

#include "common/row_record.h"
#include "cwrapper/tsfile_cwrapper.h"
#include "reader/result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/tsfile_writer.h"

namespace storage {
class TsFileReader;
}

extern "C" {
#include "cwrapper/errno_define_c.h"
#include "cwrapper/tsfile_cwrapper.h"
Expand All @@ -28,23 +39,127 @@ extern "C" {
#include "utils/errno_define.h"

namespace cwrapper {
class CWrapperTest : public testing::Test {};

// TEST_F(CWrapperTest, RegisterTimeSeries) {
// ERRNO code = 0;
// char* temperature = strdup("temperature");
// TimeseriesSchema ts_schema{temperature, TS_DATATYPE_INT32,
// TS_ENCODING_PLAIN,
// TS_COMPRESSION_UNCOMPRESSED};
// remove("cwrapper_register_timeseries.tsfile");
// TsFileWriter writer =
// tsfile_writer_new("cwrapper_register_timeseries.tsfile", &code);
// ASSERT_EQ(code, 0);
// code = tsfile_writer_register_timeseries(writer, "device1", &ts_schema);
// ASSERT_EQ(code, 0);
// free(temperature);
// tsfile_writer_close(writer);
// }
class CWrapperTest : public testing::Test {
public:
static void ASSERT_OK(ERRNO code, const char* msg = "") {
ASSERT_EQ(code, RET_OK) << msg;
}
};

TEST_F(CWrapperTest, TestForPythonInterfaceInsert) {
ERRNO code = 0;
const int column_num = 10;
char* filename = "cwrapper_for_python.tsfile";
remove(filename); // Clean up any existing file

// Device and measurement definitions
char* device_id = "root.device1";
char* str_measurement_id = "str_measurement";
char* text_measurement_id = "text_measurement";
char* date_measurement_id = "date_measurement";

// Define time series schemas for different data types
timeseries_schema str_measurement;
str_measurement.timeseries_name = str_measurement_id;
str_measurement.compression = TS_COMPRESSION_UNCOMPRESSED;
str_measurement.data_type = TS_DATATYPE_STRING;
str_measurement.encoding = TS_ENCODING_PLAIN;

timeseries_schema text_measurement;
text_measurement.timeseries_name = text_measurement_id;
text_measurement.compression = TS_COMPRESSION_UNCOMPRESSED;
text_measurement.data_type = TS_DATATYPE_TEXT;
text_measurement.encoding = TS_ENCODING_PLAIN;

timeseries_schema date_measurement;
date_measurement.timeseries_name = date_measurement_id;
date_measurement.compression = TS_COMPRESSION_UNCOMPRESSED;
date_measurement.data_type = TS_DATATYPE_DATE;
date_measurement.encoding = TS_ENCODING_PLAIN;

// Create TsFile writer
auto* writer = (storage::TsFileWriter*)_tsfile_writer_new(
filename, 128 * 1024 * 1024, &code);
ASSERT_OK(code, "create writer failed");

// Register time series with the writer
ASSERT_OK(
_tsfile_writer_register_timeseries(writer, device_id, &str_measurement),
"register timeseries failed");

ASSERT_OK(
_tsfile_writer_register_timeseries(writer, device_id, &text_measurement),
"register timeseries failed");

ASSERT_OK(
_tsfile_writer_register_timeseries(writer, device_id, &date_measurement),
"register timeseries failed");

// Create a new time series record
auto* record = (storage::TsRecord*)_ts_record_new(device_id, 0, 3);

// Insert string data
char* test_str = "test_string";
ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, str_measurement_id,
test_str),
"insert data failed");

// Insert text data
char* test_text = "test_text";
ASSERT_OK(_insert_data_into_ts_record_by_name_string(record, text_measurement_id,
test_text),
"insert data failed");

// Insert date data - NOTE: There's a bug here, should use date_measurement_id
int32_t test_date = 20251118;
ASSERT_OK(_insert_data_into_ts_record_by_name_int32_t(record, date_measurement_id,
test_date),
"insert data failed");

// Write the record to file and close writer
ASSERT_OK(_tsfile_writer_write_ts_record(writer, record),
"write record failed");
ASSERT_OK(_tsfile_writer_flush(writer), "flush failed");
ASSERT_OK(_tsfile_writer_close(writer), "close writer failed");

// Create reader to verify the written data
auto* reader = (storage::TsFileReader*)tsfile_reader_new(filename, &code);
ASSERT_OK(code, "create reader failed");

// Query the data we just wrote
char* sensors[] = {str_measurement_id, text_measurement_id, date_measurement_id};
auto* result = (storage::ResultSet*)_tsfile_reader_query_device(
reader, device_id, sensors, 3, 0, 100, &code);
ASSERT_OK(code, "query device failed");

// Verify the retrieved data matches what we inserted
bool has_next = false;
int row_count = 0;
while (!result->next(has_next) && has_next) {
// Verify timestamp
EXPECT_EQ(result->get_value<int64_t>(1), row_count);

// Verify string data
const common::String* str = result->get_value<common::String*>(2);
EXPECT_EQ(strlen(test_str), str->len_);
const char* ret_char = tsfile_result_set_get_value_by_index_string(result, 2);
EXPECT_EQ(strcmp(test_str, ret_char), 0);

// Verify text data
const common::String* text = result->get_value<common::String*>(3);
EXPECT_EQ(strlen(test_text), text->len_);
const char* ret_text = tsfile_result_set_get_value_by_index_string(result, 3);
EXPECT_EQ(strcmp(test_text, ret_text), 0);

// Verify date data
int32_t ret_date = tsfile_result_set_get_value_by_index_int32_t(result, 4);
EXPECT_EQ(test_date, ret_date);

row_count++;
}

ASSERT_OK(reader->close(), "close reader failed");
}

TEST_F(CWrapperTest, WriterFlushTabletAndReadData) {
ERRNO code = 0;
Expand Down
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from setuptools import setup, Extension
from setuptools.command.build_ext import build_ext

version = "2.1.0.dev0"
version = "2.2.0.dev"
system = platform.system()


Expand Down
46 changes: 39 additions & 7 deletions python/tests/test_write_and_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,48 @@

def test_row_record_write_and_read():
try:
if os.path.exists("record_write_and_read.tsfile"):
os.remove("record_write_and_read.tsfile")
writer = TsFileWriter("record_write_and_read.tsfile")
timeseries = TimeseriesSchema("level1", TSDataType.INT64)
writer.register_timeseries("root.device1", timeseries)
writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64))
writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE))
writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32))
writer.register_timeseries("root.device1", TimeseriesSchema("level4", TSDataType.STRING))
writer.register_timeseries("root.device1", TimeseriesSchema("level5", TSDataType.TEXT))
writer.register_timeseries("root.device1", TimeseriesSchema("level6", TSDataType.BLOB))
writer.register_timeseries("root.device1", TimeseriesSchema("level7", TSDataType.DATE))

max_row_num = 10

max_row_num = 1000
for i in range(max_row_num):
row = RowRecord("root.device1", i,
[Field("level1", i + 1, TSDataType.INT64),
Field("level2", i * 1.1, TSDataType.DOUBLE),
Field("level3", i * 2, TSDataType.INT32)])
Field("level3", i * 2, TSDataType.INT32),
Field("level4", f"string_value_{i}", TSDataType.STRING),
Field("level5", f"text_value_{i}", TSDataType.TEXT),
Field("level6", f"blob_data_{i}".encode('utf-8'), TSDataType.BLOB),
Field("level7", i, TSDataType.DATE)])
writer.write_row_record(row)

writer.close()

reader = TsFileReader("record_write_and_read.tsfile")
result = reader.query_timeseries("root.device1", ["level1", "level2"], 10, 100)
i = 10
result = reader.query_timeseries("root.device1", ["level1", "level2", "level3", "level4", "level5", "level6","level7"], 0, 100)
row_num = 0
while result.next():
print(result.get_value_by_index(1))
#print(result.get_value_by_index(0))
print(f"\n--- 记录 #{row_num} ---")
print(f"索引1的值: {result.get_value_by_index(1)}")
print(f"索引2的值: {result.get_value_by_index(2)}")
print(f"索引3的值: {result.get_value_by_index(3)}")
print(f"索引4的值: {result.get_value_by_index(4)}")
print(f"索引5的值: {result.get_value_by_index(5)}")
print(f"索引6的值: {result.get_value_by_index(6)}")
print(f"索引7的值: {result.get_value_by_index(7)}")
print(f"索引8的值: {result.get_value_by_index(8)}")
assert result.get_value_by_index(2) == row_num + 1
row_num += 1
print(reader.get_active_query_result())
result.close()
print(reader.get_active_query_result())
Expand Down Expand Up @@ -116,6 +137,8 @@ def test_table_writer_and_reader():
[ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)])
try:
if os.path.exists("table_write.tsfile"):
os.remove("table_write.tsfile")
with TsFileTableWriter("table_write.tsfile", table) as writer:
tablet = Tablet(["device", "value"],
[TSDataType.STRING, TSDataType.DOUBLE], 100)
Expand Down Expand Up @@ -305,3 +328,12 @@ def test_tsfile_to_df():
to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"])
finally:
os.remove("table_write_to_df.tsfile")

import os

if __name__ == "__main__":
os.chdir(os.path.dirname(os.path.abspath(__file__)))
pytest.main([
"test_write_and_read.py::test_row_record_write_and_read",
"-s", "-v"
])
2 changes: 1 addition & 1 deletion python/tsfile/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TSDataType(IntEnum):
def to_py_type(self):
if self == TSDataType.BOOLEAN:
return bool
elif self == TSDataType.INT32:
elif self == TSDataType.INT32 or self == TSDataType.DATE:
return int
elif self == TSDataType.INT64:
return int
Expand Down
14 changes: 13 additions & 1 deletion python/tsfile/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def get_int_value(self):

if (
self.data_type != TSDataType.INT32
and self.data_type != TSDataType.DATE
and self.data_type != TSDataType.INT64
and self.data_type != TSDataType.FLOAT
and self.data_type != TSDataType.DOUBLE
Expand Down Expand Up @@ -178,10 +179,21 @@ def get_string_value(self):
return str(self.value)
# BLOB
elif self.data_type == TSDataType.BLOB:
return str(hex(int.from_bytes(self.value, byteorder="big")))
return self.value
else:
return str(self.get_object_value(self.data_type))

def get_bytes_value(self):
if self.value is None:
return None
if self.data_type is None:
raise NoneDataTypeException("None Data Type Exception!")

if self.data_type == TSDataType.BLOB:
return self.value
else:
raise TypeError("get_bytes_value() only supports BLOB data type")

def __str__(self):
return self.get_string_value()

Expand Down
Loading
Loading