Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 67 additions & 33 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load",

static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
static const std::string CHUNK = "chunked";
static const std::string OFF_MODE = "off_mode";
static const std::string SYNC_MODE = "sync_mode";
static const std::string ASYNC_MODE = "async_mode";

#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
Expand Down Expand Up @@ -219,6 +222,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {

LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
<< ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit
<< ", group_commit_mode=" << ctx->group_commit_mode
<< ", HTTP headers=" << req->get_all_headers();
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();

Expand Down Expand Up @@ -345,6 +349,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
if (ctx->group_commit) {
RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes));
}
}

// process put file
Expand Down Expand Up @@ -746,12 +753,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_stream_per_node(stream_per_node);
}
if (ctx->group_commit) {
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
// used for wait_internal_group_commit_finish
request.__set_group_commit_mode("sync_mode");
}
request.__set_group_commit_mode(ctx->group_commit_mode);
}

if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) {
Expand Down Expand Up @@ -790,7 +792,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
return Status::NotSupported("stream load 2pc is unsupported for mow table");
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (iequal(ctx->group_commit_mode, ASYNC_MODE)) {
// FIXME find a way to avoid chunked stream load write large WALs
size_t content_length = 0;
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
Expand Down Expand Up @@ -861,17 +863,24 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex
}
}

Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode,
                                          int64_t content_length) {
    // Only async-mode group commit persists the load into a WAL, so the
    // space check is irrelevant for sync/off modes.
    if (!iequal(group_commit_mode, ASYNC_MODE)) {
        return Status::OK();
    }
    // The load fits within the configured WAL disk budget — nothing to do.
    if (load_size_smaller_than_wal_limit(content_length)) {
        return Status::OK();
    }
    // Reject the load: writing its WAL would exceed the available WAL space.
    std::stringstream msg;
    msg << "There is no space for group commit stream load async WAL. This stream load "
           "size is "
        << content_length
        << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
    LOG(WARNING) << msg.str();
    return Status::Error<EXCEEDED_LIMIT>(msg.str());
}

Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx,
std::string& group_commit_header,
bool& can_group_commit) {
int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
Expand All @@ -882,13 +891,11 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
LOG(WARNING) << ss.str();
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos;
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
(content_length == 0 && !is_chunk)) {
if (content_length == 0 && !is_chunk) {
// off_mode and empty
ctx->group_commit = false;
can_group_commit = false;
return Status::OK();
}
if (is_chunk) {
Expand All @@ -905,20 +912,47 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS"));
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
!update_mode) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() &&
!ctx->label.empty()) {
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
if (!load_size_smaller_than_wal_limit(content_length)) {
std::stringstream ss;
ss << "There is no space for group commit stream load async WAL. This stream load "
"size is "
<< content_length << ". WAL dir info: "
<< ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
LOG(WARNING) << ss.str();
return Status::Error<EXCEEDED_LIMIT>(ss.str());
}
RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length));
can_group_commit = true;
}
return Status::OK();
}

Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
std::string group_commit_header = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_header.empty() && !iequal(group_commit_header, SYNC_MODE) &&
!iequal(group_commit_header, ASYNC_MODE) && !iequal(group_commit_header, OFF_MODE)) {
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_header = SYNC_MODE;
}

// if group_commit_header is off_mode, we will not use group commit
if (iequal(group_commit_header, OFF_MODE)) {
ctx->group_commit_mode = OFF_MODE;
ctx->group_commit = false;
return Status::OK();
}
bool can_group_commit = false;
RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, can_group_commit));
if (!can_group_commit) {
ctx->group_commit_mode = OFF_MODE;
ctx->group_commit = false;
} else {
if (!group_commit_header.empty()) {
ctx->group_commit_mode = group_commit_header;
ctx->group_commit = true;
} else {
// use table property to decide group commit or not
ctx->group_commit_mode = "";
ctx->group_commit = false;
}
}
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/action/stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ class StreamLoadAction : public HttpHandler {
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _can_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx,
std::string& group_commit_header, bool& can_group_commit);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _check_wal_space(const std::string& group_commit_mode, int64_t content_length);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

private:
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ std::string StreamLoadContext::to_json() const {
} else {
writer.Key("GroupCommit");
writer.Bool(true);
writer.Key("GroupCommitMode");
writer.String(group_commit_mode.c_str());
writer.Key("LoadId");
writer.String(id.to_string().c_str());
}

// status
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class StreamLoadContext {
TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
bool group_commit = false;
std::string group_commit_mode = "";

std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
}
request.__set_request_id(ctx->id.to_thrift());
request.__set_backend_id(_exec_env->cluster_info()->backend_id);
if (ctx->group_commit_mode.empty()) {
request.__set_use_table_group_commit_mode(true);
}

TLoadTxnBeginResult result;
Status status;
Expand Down Expand Up @@ -200,6 +203,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
}
return status;
}
if (ctx->group_commit_mode.empty() && result.__isset.table_group_commit_mode) {
auto table_group_commit_mode = result.table_group_commit_mode;
if (iequal(table_group_commit_mode, "async_mode") ||
iequal(table_group_commit_mode, "sync_mode")) {
ctx->group_commit = true;
ctx->group_commit_mode = table_group_commit_mode;
return Status::OK();
}
}
ctx->txn_id = result.txnId;
if (result.__isset.db_id) {
ctx->db_id = result.db_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
add(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
Expand Down Expand Up @@ -2474,6 +2475,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3900,6 +3900,12 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\" = \"");
sb.append(olapTable.getGroupCommitDataBytes()).append("\"");

// group commit mode (only show when not off_mode)
if (!olapTable.getGroupCommitMode().equalsIgnoreCase(PropertyAnalyzer.GROUP_COMMIT_MODE_OFF)) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE).append("\" = \"");
sb.append(olapTable.getGroupCommitMode()).append("\"");
}

// enable delete on delete predicate
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,14 @@ public int getGroupCommitDataBytes() {
return getOrCreatTableProperty().getGroupCommitDataBytes();
}

// Sets the table-level group commit mode, delegating storage to the
// lazily-created TableProperty.
public void setGroupCommitMode(String groupCommitMode) {
    getOrCreatTableProperty().setGroupCommitMode(groupCommitMode);
}

// Returns the table-level group commit mode from TableProperty
// (which falls back to GROUP_COMMIT_MODE_OFF when unset).
public String getGroupCommitMode() {
    return getOrCreatTableProperty().getGroupCommitMode();
}

public Boolean hasSequenceCol() {
return getSequenceCol() != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,15 @@ public int getGroupCommitDataBytes() {
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE)));
}

// Records the group commit mode in the raw property map.
// NOTE(review): this writes via properties.put() while nearby setters go
// through modifyTableProperties(); confirm the persistence/replay semantics
// are intentionally different.
public void setGroupCommitMode(String groupCommitMode) {
    properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE, groupCommitMode);
}

// Returns the configured group commit mode, defaulting to
// GROUP_COMMIT_MODE_OFF when the property has never been set.
public String getGroupCommitMode() {
    return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE,
            PropertyAnalyzer.GROUP_COMMIT_MODE_OFF);
}

public void setRowStoreColumns(List<String> rowStoreColumns) {
if (rowStoreColumns != null && !rowStoreColumns.isEmpty()) {
modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
throws UserException {
final Set<String> allowedProps = new HashSet<String>() {
{
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
add(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS);
Expand Down Expand Up @@ -152,9 +153,23 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
}
param.ttlSeconds = ttlSeconds;
param.type = UpdatePartitionMetaParam.TabletMetaType.TTL_SECONDS;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) {
String groupCommitMode = PropertyAnalyzer.analyzeGroupCommitMode(properties, false);
olapTable.readLock();
try {
if (groupCommitMode.equalsIgnoreCase(olapTable.getGroupCommitMode())) {
LOG.info("groupCommitMode:{} is equal with olapTable.groupCommitMode():{}",
groupCommitMode, olapTable.getGroupCommitMode());
return;
}
partitions.addAll(olapTable.getPartitions());
} finally {
olapTable.readUnlock();
}
param.groupCommitMode = groupCommitMode;
param.type = UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_MODE;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) {
long groupCommitIntervalMs = Long.parseLong(properties.get(PropertyAnalyzer
.PROPERTIES_GROUP_COMMIT_INTERVAL_MS));
int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties, false);
olapTable.readLock();
try {
if (groupCommitIntervalMs == olapTable.getGroupCommitIntervalMs()) {
Expand All @@ -169,8 +184,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
param.groupCommitIntervalMs = groupCommitIntervalMs;
param.type = UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_INTERVAL_MS;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)) {
long groupCommitDataBytes = Long.parseLong(properties.get(PropertyAnalyzer
.PROPERTIES_GROUP_COMMIT_DATA_BYTES));
int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false);
olapTable.readLock();
try {
if (groupCommitDataBytes == olapTable.getGroupCommitDataBytes()) {
Expand Down Expand Up @@ -386,6 +400,7 @@ public enum TabletMetaType {
INMEMORY,
PERSISTENT,
TTL_SECONDS,
GROUP_COMMIT_MODE,
GROUP_COMMIT_INTERVAL_MS,
GROUP_COMMIT_DATA_BYTES,
COMPACTION_POLICY,
Expand All @@ -403,6 +418,7 @@ public enum TabletMetaType {
boolean isPersistent = false;
boolean isInMemory = false;
long ttlSeconds = 0;
String groupCommitMode;
long groupCommitIntervalMs = 0;
long groupCommitDataBytes = 0;
String compactionPolicy;
Expand Down Expand Up @@ -462,6 +478,9 @@ public void updateCloudPartitionMeta(Database db,
case GROUP_COMMIT_DATA_BYTES:
infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes);
break;
case GROUP_COMMIT_MODE:
infoBuilder.setGroupCommitMode(param.groupCommitMode);
break;
case COMPACTION_POLICY:
infoBuilder.setCompactionPolicy(param.compactionPolicy);
break;
Expand Down
Loading
Loading