Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ class TableSchema {
}
}

size_t get_column_pos_index_num() const {
return column_pos_index_.size();
}

void update(ChunkGroupMeta *chunk_group_meta) {
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ int Tablet::init() {
std::pair<std::map<std::string, int>::iterator, bool> ins_res;
for (size_t c = 0; c < schema_count; c++) {
ins_res = schema_map_.insert(
std::make_pair(schema_vec_->at(c).measurement_name_, c));
std::make_pair(to_lower(schema_vec_->at(c).measurement_name_), c));
if (!ins_res.second) {
ASSERT(false);
// maybe dup measurement_name
return E_INVALID_ARG;
}
Expand Down Expand Up @@ -131,6 +130,9 @@ void Tablet::destroy() {
}

int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) {
if (err_code_ != E_OK) {
return err_code_;
}
ASSERT(timestamps_ != NULL);
if (UNLIKELY(row_index >= static_cast<uint32_t>(max_row_num_))) {
ASSERT(false);
Expand Down Expand Up @@ -223,6 +225,9 @@ void Tablet::process_val(uint32_t row_index, uint32_t schema_index, T val) {

template <typename T>
int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) {
if (err_code_ != E_OK) {
return err_code_;
}
int ret = common::E_OK;
if (UNLIKELY(schema_index >= schema_vec_->size())) {
ASSERT(false);
Expand Down Expand Up @@ -250,6 +255,9 @@ int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) {
template <>
int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
common::String val) {
if (err_code_ != E_OK) {
return err_code_;
}
int ret = common::E_OK;
if (UNLIKELY(schema_index >= schema_vec_->size())) {
ASSERT(false);
Expand All @@ -269,9 +277,11 @@ template <typename T>
int Tablet::add_value(uint32_t row_index, const std::string &measurement_name,
T val) {
int ret = common::E_OK;
if (err_code_ != E_OK) {
return err_code_;
}
SchemaMapIterator find_iter = schema_map_.find(measurement_name);
if (LIKELY(find_iter == schema_map_.end())) {
ASSERT(false);
ret = E_INVALID_ARG;
} else {
ret = add_value(row_index, find_iter->second, val);
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Tablet {

public:
static const uint32_t DEFAULT_MAX_ROWS = 1024;
int err_code_ = common::E_OK;

public:
Tablet(const std::string &device_id,
Expand All @@ -75,7 +76,7 @@ class Tablet {
ASSERT(false);
max_row_num_ = DEFAULT_MAX_ROWS;
}
init();
err_code_ = init();
}

Tablet(const std::string &device_id,
Expand Down Expand Up @@ -106,7 +107,7 @@ class Tablet {
return MeasurementSchema(name, type);
});
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(measurement_vec);
init();
err_code_ = init();
}

Tablet(const std::string &insert_target_name,
Expand All @@ -127,7 +128,7 @@ class Tablet {
common::get_default_compressor()));
}
set_column_categories(column_categories);
init();
err_code_ = init();
}

/**
Expand All @@ -150,10 +151,10 @@ class Tablet {
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>();
for (size_t i = 0; i < column_names.size(); i++) {
schema_vec_->emplace_back(
MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
common::get_default_compressor()));
column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
common::get_default_compressor());
}
init();
err_code_ = init();
}

~Tablet() { destroy(); }
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Tablet tablet_new(char **column_name_list, TSDataType *data_types,
std::vector<std::string> measurement_list;
std::vector<common::TSDataType> data_type_list;
for (uint32_t i = 0; i < column_num; i++) {
measurement_list.emplace_back(column_name_list[i]);
measurement_list.emplace_back(storage::to_lower(column_name_list[i]));
data_type_list.push_back(
static_cast<common::TSDataType>(*(data_types + i)));
}
Expand All @@ -196,7 +196,7 @@ ERRNO tablet_add_timestamp(Tablet tablet, uint32_t row_index,
const char *column_name, \
const type value) { \
return static_cast<storage::Tablet *>(tablet)->add_value( \
row_index, column_name, value); \
row_index, storage::to_lower(column_name), value); \
}
TABLET_ADD_VALUE_BY_NAME_DEF(int32_t);
TABLET_ADD_VALUE_BY_NAME_DEF(int64_t);
Expand All @@ -208,7 +208,7 @@ ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index,
const char *column_name,
const char *value) {
return static_cast<storage::Tablet *>(tablet)->add_value(
row_index, column_name, common::String(value));
row_index, storage::to_lower(column_name), common::String(value));
}

#define TABLE_ADD_VALUE_BY_INDEX_DEF(type) \
Expand Down Expand Up @@ -688,7 +688,6 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) {
return w->flush();
}


ResultSet _tsfile_reader_query_device(TsFileReader reader,
const char *device_name,
char **sensor_name, uint32_t sensor_num,
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/writer/tsfile_table_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ int storage::TsFileTableWriter::register_table(const std::shared_ptr<TableSchema
}

int storage::TsFileTableWriter::write_table(storage::Tablet& tablet) const {
// DIRTY CODE...
if (common::E_OK != error_number) {
return error_number;
}
if (tablet.get_table_name().empty()) {
tablet.set_table_name(exclusive_table_name_);
} else if (!exclusive_table_name_.empty() && tablet.get_table_name() != exclusive_table_name_) {
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/writer/tsfile_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TsFileTableWriter {
// Perform a deep copy. The source TableSchema object may be
// stack/heap-allocated.
auto table_schema_ptr = std::make_shared<TableSchema>(*table_schema);
tsfile_writer_->register_table(table_schema_ptr);
error_number = tsfile_writer_->register_table(table_schema_ptr);
exclusive_table_name_ = table_schema->get_table_name();
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
Expand Down Expand Up @@ -106,6 +106,10 @@ class TsFileTableWriter {
// if this TsFile only contains one table, this will be its name, otherwise,
// it will be an empty string
std::string exclusive_table_name_;

// Some errors may not be conveyed during the construction phase, so it's
// necessary to maintain an internal error code.
int error_number = common::E_OK;
};

} // namespace storage
Expand Down
25 changes: 23 additions & 2 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
int TsFileWriter::register_table(
const std::shared_ptr<TableSchema> &table_schema) {
if (!table_schema) return E_INVALID_ARG;

// Empty table name or column name is not allowed.
if (table_schema->get_table_name().empty()) {
return E_INVALID_ARG;
}
for (const auto &name : table_schema->get_measurement_names()) {
if (name.empty()) {
return E_INVALID_ARG;
}
}

// Because it is not possible to return an error code for duplicate name
// checks during the construction phase of TabletSchema, the duplicate name
// check has been moved to the table registration stage.

// TODO: Add Debug INFO if ErrorCode is not enough to describe problems.
if (table_schema->get_column_pos_index_num() !=
table_schema->get_measurement_names().size()) {
return E_INVALID_ARG;
}

if (io_writer_->get_schema()->table_schema_map_.find(
table_schema->get_table_name()) !=
io_writer_->get_schema()->table_schema_map_.end()) {
Expand Down Expand Up @@ -671,7 +692,7 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
continue;
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
tablet.get_cur_row_size()))) {
tablet.get_cur_row_size()))) {
return ret;
}
}
Expand Down Expand Up @@ -764,7 +785,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
continue;
}
if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
device_id_end_index_pair.second))) {
device_id_end_index_pair.second))) {
return ret;
}
}
Expand Down
38 changes: 38 additions & 0 deletions cpp/test/writer/table_view/tsfile_writer_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,42 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {

reader.close();
delete table_schema;
}

TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(3);
measurement_schemas[0] = new MeasurementSchema("device", STRING);
column_categories.emplace_back(ColumnCategory::TAG);
measurement_schemas[1] = new MeasurementSchema("Device", STRING);
column_categories.emplace_back(ColumnCategory::TAG);
measurement_schemas[2] = new MeasurementSchema("value", DOUBLE);
column_categories.emplace_back(ColumnCategory::FIELD);
TableSchema* table_schema =
new TableSchema("test_table", measurement_schemas, column_categories);
auto tsfile_table_writer =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
Tablet tablet = Tablet(table_schema->get_measurement_names(),
table_schema->get_data_types());
tablet.set_table_name("test_table");
ASSERT_EQ(E_INVALID_ARG, tablet.add_timestamp(0, 10));
ASSERT_EQ(E_INVALID_ARG, tablet.add_value(1, 1, 10));
ASSERT_EQ(E_INVALID_ARG, tablet.add_value(1, "test", 10));
std::vector<MeasurementSchema> measurement_schemas2;
for (int i = 0; i < 2; i++) {
measurement_schemas2.push_back(*measurement_schemas[i]);
}
Tablet tablet1 = Tablet(
"test_table",
std::make_shared<std::vector<MeasurementSchema>>(measurement_schemas2));
tablet1.set_table_name("test_table");
ASSERT_EQ(E_INVALID_ARG, tablet1.add_timestamp(0, 10));
ASSERT_EQ(E_INVALID_ARG, tablet1.add_value(1, 1, 10));
ASSERT_EQ(E_INVALID_ARG, tablet1.add_value(1, "test", 10));

ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->write_table(tablet));
ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->register_table(
std::make_shared<TableSchema>(*table_schema)));
delete table_schema;
}