diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index 4316e8f63..a0e94391b 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -211,8 +211,7 @@ class ColAppender { } return E_OK; } - FORCE_INLINE int fill(const char *value, uint32_t len, - uint32_t end_index) { + FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) { while (column_row_count_ < end_index) { if (!add_row()) { return E_INVALID_ARG; @@ -258,6 +257,13 @@ class RowIterator { } } + FORCE_INLINE void next(size_t ind) const { + ASSERT(row_id_ < tsblock_->row_count_); + tsblock_->vectors_[ind]->update_offset(); + } + + FORCE_INLINE void update_row_id() { row_id_++; } + FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len, bool *__restrict null) { ASSERT(column_index < column_count_); @@ -287,8 +293,10 @@ class ColIterator { FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; } FORCE_INLINE void next() { + if (!vec_->is_null(row_id_)) { + vec_->update_offset(); + } ++row_id_; - vec_->update_offset(); } FORCE_INLINE bool has_null() { return vec_->has_null(); } diff --git a/cpp/src/common/tsblock/tuple_desc.h b/cpp/src/common/tsblock/tuple_desc.h index 98bd341d5..85ba13097 100644 --- a/cpp/src/common/tsblock/tuple_desc.h +++ b/cpp/src/common/tsblock/tuple_desc.h @@ -71,6 +71,11 @@ class TupleDesc { return column_list_[index].data_type_; } + FORCE_INLINE common::ColumnCategory get_column_category( + const uint32_t index) const { + return column_list_[index].column_category_; + } + FORCE_INLINE std::string get_column_name(uint32_t index) { return column_list_[index].column_name_; } diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 230661f3b..5e1bbe439 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap( int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset; int read_size = (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size); - if (file_data_buf_size < read_size || (may_shrink && read_size < file_data_buf_size / 10)) { + if (file_data_buf_size < read_size || + (may_shrink && read_size < file_data_buf_size / 10)) { file_data_buf = (char *)mem_realloc(file_data_buf, read_size); if (IS_NULL(file_data_buf)) { return E_OOM; @@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() { uint32_t time_compressed_buf_size = 0; uint32_t time_uncompressed_buf_size = 0; - // Step 2: do uncompress if (IS_SUCC(ret)) { time_compressed_buf = @@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( uint32_t mask = 1 << 7; \ int64_t time = 0; \ CppType value; \ - while ((time_decoder_->has_remaining() || time_in.has_remaining()) \ - && (value_decoder_->has_remaining() || \ - value_in.has_remaining())){ \ + while ( \ + (time_decoder_->has_remaining() || time_in.has_remaining()) && \ + (value_decoder_->has_remaining() || value_in.has_remaining())) { \ cur_value_index++; \ if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \ 0xFF) & \ @@ -530,16 +530,17 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( if (ret != E_OK) { \ break; \ } \ - ret = value_decoder_->read_##ReadType(value, \ - value_in); \ - if (ret != E_OK) { \ + if (UNLIKELY(!row_appender.add_row())) { \ + ret = E_OVERFLOW; \ break; \ } \ + row_appender.append(0, (char *)&time, sizeof(time)); \ + row_appender.append_null(1); \ continue; \ } \ if (UNLIKELY(!row_appender.add_row())) { \ ret = E_OVERFLOW; \ - cur_value_index--; \ + cur_value_index--; \ break; \ } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \ } else if (RET_FAIL(value_decoder_->read_##ReadType(value, \ @@ -549,7 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( continue; \ } else { \ /*std::cout << "decoder: time=" << time << ", value=" << value \ - * << std::endl;*/ \ + * << std::endl;*/ \ row_appender.append(0, (char *)&time, sizeof(time)); \ row_appender.append(1, (char *)&value, sizeof(value)); \ } \ diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 507597c01..1df563cd8 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -29,8 +29,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( field_filter_(field_filter), block_size_(block_size), tuple_desc_(), - tsfile_io_reader_(tsfile_io_reader) { -} + tsfile_io_reader_(tsfile_io_reader) {} int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, uint32_t block_size, Filter* time_filter, @@ -63,9 +62,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, ->get_measurement_columns() .size()); if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( - device_query_task->get_device_id(), - device_query_task->get_column_mapping()->get_measurement_columns(), - time_series_indexs, pa_))) { + device_query_task->get_device_id(), + device_query_task->get_column_mapping()->get_measurement_columns(), + time_series_indexs, pa_))) { return ret; } for (const auto& time_series_index : time_series_indexs) { @@ -171,6 +170,21 @@ int SingleDeviceTsBlockReader::fill_measurements( break; } } + + // Align all columns, filling with nulls where data is missing. + uint32_t row_count = + col_appenders_[time_column_index_]->get_col_row_count(); + for (auto& col_appender : col_appenders_) { + if (tuple_desc_.get_column_category( + col_appender->get_column_index()) != + common::ColumnCategory::FIELD) { + continue; + } + while (col_appender->get_col_row_count() < row_count) { + col_appender->add_row(); + col_appender->append_null(); + } + } } return ret; } @@ -366,8 +380,8 @@ int SingleMeasurementColumnContext::get_current_value(char*& value, if (value_iter_->end()) { return common::E_NO_MORE_DATA; } - value = value_iter_->read(&len); - assert(value != nullptr); + bool is_null = false; + value = value_iter_->read(&len, &is_null); return common::E_OK; } @@ -392,7 +406,11 @@ void SingleMeasurementColumnContext::fill_into( } for (int32_t pos : pos_in_result_) { col_appenders[pos + 1]->add_row(); - col_appenders[pos + 1]->append(val, len); + if (val == nullptr) { + col_appenders[pos + 1]->append_null(); + } else { + col_appenders[pos + 1]->append(val, len); + } } } diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index 396913e9b..01c3b2397 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) { if (!null) { row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i), value, len, pa_); + row_iterator_->next(i); } } - row_iterator_->next(); + row_iterator_->update_row_id(); } return ret; } diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc index 892c0d1c1..81fafc5a1 100644 --- a/cpp/src/writer/time_chunk_writer.cc +++ b/cpp/src/writer/time_chunk_writer.cc @@ -191,4 +191,9 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() { time_page_writer_.get_statistic()->get_type()); } +bool TimeChunkWriter::hasData() { + return num_of_pages_ > 0 || (time_page_writer_.get_statistic() != nullptr && + time_page_writer_.get_statistic()->count_ > 0); +} + } // end namespace storage diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h index e03b264c2..aff8e2af0 100644 --- a/cpp/src/writer/time_chunk_writer.h +++ b/cpp/src/writer/time_chunk_writer.h @@ -28,6 +28,8 @@ namespace storage { +// TODO: TimeChunkWriter, ValueChunkWriter, ChunkWriter can be further +// abstracted. class TimeChunkWriter { public: static const int32_t PAGES_DATA_PAGE_SIZE = 1024; @@ -68,6 +70,8 @@ class TimeChunkWriter { int64_t estimate_max_series_mem_size(); + bool hasData(); + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index e9a1162a9..73e4543e8 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) { if (col_index == -1) { return E_COLUMN_NOT_EXIST; } - if (table_schema->get_data_types()[col_index] != tablet.schema_vec_->at(i).data_type_) { + if (table_schema->get_data_types()[col_index] != + tablet.schema_vec_->at(i).data_type_) { return E_TYPE_NOT_MATCH; } const common::ColumnCategory column_category = @@ -1055,6 +1056,11 @@ int TsFileWriter::flush() { bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group, bool is_aligned) { + if (chunk_group->is_aligned_ && + chunk_group->time_chunk_writer_ != nullptr && + chunk_group->time_chunk_writer_->hasData()) { + return false; + } MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); ms_iter++) { diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index b1ef896a9..d2f6c1c20 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -32,7 +32,7 @@ using namespace storage; using namespace common; class TsFileWriterTableTest : public ::testing::Test { -protected: + protected: void SetUp() override { libtsfile_init(); file_name_ = std::string("tsfile_writer_table_test_") + @@ -49,7 +49,7 @@ class TsFileWriterTableTest : public ::testing::Test { std::string file_name_; WriteFile write_file_; -public: + public: static std::string generate_random_string(int length) { std::random_device rd; std::mt19937 gen(rd()); @@ -101,7 +101,8 @@ class TsFileWriterTableTest : public ::testing::Test { for (int i = 0; i < device_num; i++) { PageArena pa; pa.init(512, MOD_DEFAULT); - std::string device_str = std::string("device_id_") + std::to_string(i); + std::string device_str = + std::string("device_id_") + std::to_string(i); String literal_str(device_str, pa); for (int l = 0; l < num_timestamp_per_device; l++) { int row_index = i * num_timestamp_per_device + l; @@ -450,7 +451,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { cur_line++; int64_t timestamp = table_result_set->get_value("time"); ASSERT_EQ(table_result_set->get_value("device") - ->to_std_string(), + ->to_std_string(), "device" + std::to_string(timestamp)); ASSERT_EQ(table_result_set->get_value("VaLue"), timestamp * 1.1); @@ -497,7 +498,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) { ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->write_table(tablet)); ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->register_table( - std::make_shared(*table_schema))); + std::make_shared(*table_schema))); delete table_schema; } @@ -637,8 +638,8 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { common::config_set_max_degree_of_index_node(5); auto table_schema = gen_table_schema(0, 1, 100); - auto tsfile_table_writer_ = std::make_shared( - &write_file_, table_schema); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); int num_row_per_device = 10; auto tablet = gen_tablet(table_schema, 0, 100, num_row_per_device); ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); @@ -651,23 +652,23 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { ResultSet* tmp_result_set = nullptr; ret = reader.query(table_schema->get_table_name(), - table_schema->get_measurement_names(), 0, - INT32_MAX, tmp_result_set); + table_schema->get_measurement_names(), 0, INT32_MAX, + tmp_result_set); auto* table_result_set = (TableResultSet*)tmp_result_set; bool has_next = false; int64_t row_num = 0; auto result_set_meta = table_result_set->get_metadata(); ASSERT_EQ(result_set_meta->get_column_count(), - table_schema->get_columns_num() + 1); // +1: time column + table_schema->get_columns_num() + 1); // +1: time column while (IS_SUCC(table_result_set->next(has_next)) && has_next) { auto column_schemas = table_schema->get_measurement_schemas(); - std::string tag_col_val; // "device_id_[num]" + std::string tag_col_val; // "device_id_[num]" std::string tag_col_val_prefix = "device_id_"; for (const auto& column_schema : column_schemas) { switch (column_schema->data_type_) { case TSDataType::INT64: if (!table_result_set->is_null( - column_schema->measurement_name_)) { + column_schema->measurement_name_)) { std::string num = tag_col_val.substr( tag_col_val_prefix.length(), tag_col_val.length() - tag_col_val_prefix.length()); @@ -677,8 +678,10 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { } break; case TSDataType::STRING: - tag_col_val = table_result_set->get_value( - column_schema->measurement_name_)->to_std_string(); + tag_col_val = table_result_set + ->get_value( + column_schema->measurement_name_) + ->to_std_string(); default: break; } @@ -690,3 +693,81 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { ASSERT_EQ(reader.close(), common::E_OK); delete table_schema; } + +TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { + std::vector measurement_schemas; + std::vector column_categories; + for (int i = 0; i < 3; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING)); + column_categories.emplace_back(ColumnCategory::TAG); + } + measurement_schemas.emplace_back(new MeasurementSchema("value", DOUBLE)); + measurement_schemas.emplace_back(new MeasurementSchema("value1", INT32)); + column_categories.emplace_back(ColumnCategory::FIELD); + column_categories.emplace_back(ColumnCategory::FIELD); + auto table_schema = + new TableSchema("testTable", measurement_schemas, column_categories); + auto tsfile_table_writer = + std::make_shared(&write_file_, table_schema); + int time = 0; + Tablet tablet = Tablet(table_schema->get_measurement_names(), + table_schema->get_data_types(), 100); + + for (int i = 0; i < 100; i++) { + tablet.add_timestamp(i, static_cast(time++)); + tablet.add_value(i, 0, "tag1"); + tablet.add_value(i, 1, "tag2"); + if (i % 3 == 0) { + // all device has no data + tablet.add_value(i, 2, "tag_null"); + } else { + tablet.add_value(i, 2, "tag3"); + tablet.add_value(i, 3, 100.0f); + if (i % 5 == 0) { + tablet.add_value(i, 4, 100); + } + } + } + tsfile_table_writer->write_table(tablet); + tsfile_table_writer->flush(); + tsfile_table_writer->close(); + + delete table_schema; + + auto reader = TsFileReader(); + reader.open(write_file_.get_file_path()); + ResultSet* ret = nullptr; + int ret_value = reader.query( + "testTable", {"id0", "id1", "id2", "value", "value1"}, 0, 100, ret); + ASSERT_EQ(common::E_OK, ret_value); + + auto table_result_set = (TableResultSet*)ret; + bool has_next = false; + int cur_line = 0; + auto schema = table_result_set->get_metadata(); + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + int64_t timestamp = table_result_set->get_value(1); + ASSERT_EQ(common::String("tag1"), + *table_result_set->get_value(2)); + ASSERT_EQ(common::String("tag2"), + *table_result_set->get_value(3)); + if (timestamp % 3 == 0) { + ASSERT_EQ(common::String("tag_null"), + *table_result_set->get_value(4)); + ASSERT_TRUE(table_result_set->is_null(5)); + ASSERT_TRUE(table_result_set->is_null(6)); + } else { + ASSERT_EQ(common::String("tag3"), + *table_result_set->get_value(4)); + ASSERT_EQ(100.0f, table_result_set->get_value(5)); + if (timestamp % 5 == 0) { + ASSERT_EQ(100, table_result_set->get_value(6)); + } else { + ASSERT_TRUE(table_result_set->is_null(6)); + } + } + } + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); +}