Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_version = pschema.version();
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_timestamp_ms = pschema.timestamp_ms();
_timezone = pschema.timezone();

for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
Expand Down Expand Up @@ -207,6 +209,8 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class OlapTableSchemaParam {
std::set<std::string> partial_update_input_columns() const {
return _partial_update_input_columns;
}
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
std::string debug_string() const;

Expand All @@ -104,6 +108,8 @@ class OlapTableSchemaParam {
bool _is_partial_update = false;
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
int64_t _timestamp_ms = 0;
std::string _timezone;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone());
}

void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ namespace doris {

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode) {
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
Expand All @@ -50,5 +53,7 @@ struct PartialUpdateInfo {
// to generate a new row, only available in non-strict mode
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
std::string timezone;
};
} // namespace doris
16 changes: 15 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/io/reader_buffer.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
namespace segment_v2 {
Expand Down Expand Up @@ -648,7 +650,19 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
for (auto i = 0; i < cids_missing.size(); ++i) {
const auto& column = _tablet_schema->column(cids_missing[i]);
if (column.has_default_value()) {
auto default_value = _tablet_schema->column(cids_missing[i]).default_value();
std::string default_value;
if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
_opts.rowset_ctx->partial_update_info->timezone);
default_value = dtv.debug_string();
} else {
default_value = _tablet_schema->column(cids_missing[i]).default_value();
}
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
old_value_block.get_by_position(i).type->from_string(
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,8 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));

_state = state;
_schema->set_timestamp_ms(_state->timestamp_ms());
_schema->set_timezone(_state->timezone());

_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,7 @@ message POlapTableSchemaParam {
optional bool partial_update = 7 [default = false];
repeated string partial_update_input_columns = 8;
optional bool is_strict_mode = 9 [default = false];
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
};

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
3 "stranger" 500 \N 4321
4 "foreigner" 600 \N 4321

-- !select_timestamp --
1

-- !select_default --
1 doris 200 123 1
2 doris2 400 223 1
Expand Down Expand Up @@ -71,3 +74,6 @@
3 "stranger" 500 \N 4321
4 "foreigner" 600 \N 4321

-- !select_timestamp --
1

Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,25 @@ suite("test_primary_key_partial_update", "p0") {

// drop drop
sql """ DROP TABLE IF EXISTS ${tableName} """

sql """ CREATE TABLE ${tableName} (
`name` VARCHAR(600) NULL,
`userid` INT NOT NULL,
`seq` BIGINT NOT NULL DEFAULT "1",
`ctime` DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
`rtime` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`corp_name` VARCHAR(600) NOT NULL
) ENGINE = OLAP UNIQUE KEY(`name`, `userid`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`name`) BUCKETS 10
PROPERTIES ("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"store_row_column" = "${use_row_store}"); """

sql "set enable_unique_key_partial_update=true;"
sql "set enable_insert_strict=false;"

sql "INSERT INTO ${tableName}(`name`, `userid`, `corp_name`) VALUES ('test1', 1234567, 'A');"

qt_select_timestamp "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""
}
}
}