Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cpp/src/common/tsblock/tsblock.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ class ColAppender {
}
return E_OK;
}
FORCE_INLINE int fill(const char *value, uint32_t len,
uint32_t end_index) {
FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) {
while (column_row_count_ < end_index) {
if (!add_row()) {
return E_INVALID_ARG;
Expand Down Expand Up @@ -258,6 +257,13 @@ class RowIterator {
}
}

FORCE_INLINE void next(size_t ind) const {
ASSERT(row_id_ < tsblock_->row_count_);
tsblock_->vectors_[ind]->update_offset();
}

FORCE_INLINE void update_row_id() { row_id_++; }

FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len,
bool *__restrict null) {
ASSERT(column_index < column_count_);
Expand Down Expand Up @@ -287,8 +293,10 @@ class ColIterator {
FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }

FORCE_INLINE void next() {
if (!vec_->is_null(row_id_)) {
vec_->update_offset();
}
++row_id_;
vec_->update_offset();
}

FORCE_INLINE bool has_null() { return vec_->has_null(); }
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/common/tsblock/tuple_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class TupleDesc {
return column_list_[index].data_type_;
}

FORCE_INLINE common::ColumnCategory get_column_category(
const uint32_t index) const {
return column_list_[index].column_category_;
}

FORCE_INLINE std::string get_column_name(uint32_t index) {
return column_list_[index].column_name_;
}
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap(
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
if (file_data_buf_size < read_size || (may_shrink && read_size < file_data_buf_size / 10)) {
if (file_data_buf_size < read_size ||
(may_shrink && read_size < file_data_buf_size / 10)) {
file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
if (IS_NULL(file_data_buf)) {
return E_OOM;
Expand Down Expand Up @@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() {
uint32_t time_compressed_buf_size = 0;
uint32_t time_uncompressed_buf_size = 0;


// Step 2: do uncompress
if (IS_SUCC(ret)) {
time_compressed_buf =
Expand Down Expand Up @@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
uint32_t mask = 1 << 7; \
int64_t time = 0; \
CppType value; \
while ((time_decoder_->has_remaining() || time_in.has_remaining()) \
&& (value_decoder_->has_remaining() || \
value_in.has_remaining())){ \
while ( \
(time_decoder_->has_remaining() || time_in.has_remaining()) && \
(value_decoder_->has_remaining() || value_in.has_remaining())) { \
cur_value_index++; \
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
Expand All @@ -530,16 +530,17 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
if (ret != E_OK) { \
break; \
} \
ret = value_decoder_->read_##ReadType(value, \
value_in); \
if (ret != E_OK) { \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
break; \
} \
row_appender.append(0, (char *)&time, sizeof(time)); \
row_appender.append_null(1); \
continue; \
} \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
cur_value_index--; \
cur_value_index--; \
break; \
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
} else if (RET_FAIL(value_decoder_->read_##ReadType(value, \
Expand All @@ -549,7 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
continue; \
} else { \
/*std::cout << "decoder: time=" << time << ", value=" << value \
* << std::endl;*/ \
* << std::endl;*/ \
row_appender.append(0, (char *)&time, sizeof(time)); \
row_appender.append(1, (char *)&value, sizeof(value)); \
} \
Expand Down
34 changes: 26 additions & 8 deletions cpp/src/reader/block/single_device_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
field_filter_(field_filter),
block_size_(block_size),
tuple_desc_(),
tsfile_io_reader_(tsfile_io_reader) {
}
tsfile_io_reader_(tsfile_io_reader) {}

int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
uint32_t block_size, Filter* time_filter,
Expand Down Expand Up @@ -63,9 +62,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
->get_measurement_columns()
.size());
if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(
device_query_task->get_device_id(),
device_query_task->get_column_mapping()->get_measurement_columns(),
time_series_indexs, pa_))) {
device_query_task->get_device_id(),
device_query_task->get_column_mapping()->get_measurement_columns(),
time_series_indexs, pa_))) {
return ret;
}
for (const auto& time_series_index : time_series_indexs) {
Expand Down Expand Up @@ -171,6 +170,21 @@ int SingleDeviceTsBlockReader::fill_measurements(
break;
}
}

// Align all columns, filling with nulls where data is missing.
uint32_t row_count =
col_appenders_[time_column_index_]->get_col_row_count();
for (auto& col_appender : col_appenders_) {
if (tuple_desc_.get_column_category(
col_appender->get_column_index()) !=
common::ColumnCategory::FIELD) {
continue;
}
while (col_appender->get_col_row_count() < row_count) {
col_appender->add_row();
col_appender->append_null();
}
}
}
return ret;
}
Expand Down Expand Up @@ -366,8 +380,8 @@ int SingleMeasurementColumnContext::get_current_value(char*& value,
if (value_iter_->end()) {
return common::E_NO_MORE_DATA;
}
value = value_iter_->read(&len);
assert(value != nullptr);
bool is_null = false;
value = value_iter_->read(&len, &is_null);
return common::E_OK;
}

Expand All @@ -392,7 +406,11 @@ void SingleMeasurementColumnContext::fill_into(
}
for (int32_t pos : pos_in_result_) {
col_appenders[pos + 1]->add_row();
col_appenders[pos + 1]->append(val, len);
if (val == nullptr) {
col_appenders[pos + 1]->append_null();
} else {
col_appenders[pos + 1]->append(val, len);
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/reader/table_result_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) {
if (!null) {
row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i),
value, len, pa_);
row_iterator_->next(i);
}
}
row_iterator_->next();
row_iterator_->update_row_id();
}
return ret;
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/writer/time_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,9 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() {
time_page_writer_.get_statistic()->get_type());
}

bool TimeChunkWriter::hasData() {
return num_of_pages_ > 0 || (time_page_writer_.get_statistic() != nullptr &&
time_page_writer_.get_statistic()->count_ > 0);
}

} // end namespace storage
4 changes: 4 additions & 0 deletions cpp/src/writer/time_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

namespace storage {

// TODO: TimeChunkWriter, ValueChunkWriter, ChunkWriter can be further
// abstracted.
class TimeChunkWriter {
public:
static const int32_t PAGES_DATA_PAGE_SIZE = 1024;
Expand Down Expand Up @@ -68,6 +70,8 @@ class TimeChunkWriter {

int64_t estimate_max_series_mem_size();

bool hasData();

private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) {
if (col_index == -1) {
return E_COLUMN_NOT_EXIST;
}
if (table_schema->get_data_types()[col_index] != tablet.schema_vec_->at(i).data_type_) {
if (table_schema->get_data_types()[col_index] !=
tablet.schema_vec_->at(i).data_type_) {
return E_TYPE_NOT_MATCH;
}
const common::ColumnCategory column_category =
Expand Down Expand Up @@ -1055,6 +1056,11 @@ int TsFileWriter::flush() {

bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned) {
if (chunk_group->is_aligned_ &&
chunk_group->time_chunk_writer_ != nullptr &&
chunk_group->time_chunk_writer_->hasData()) {
return false;
}
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
Expand Down
Loading