Skip to content

Commit

Permalink
Merge branch 'master' into unique_modify_key_modify1
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 committed May 13, 2024
2 parents 09db9de + 9731492 commit b06c07d
Show file tree
Hide file tree
Showing 130 changed files with 2,602 additions and 1,291 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques
RuntimeProfile* profile, const UniqueId& load_id)
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
_rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
_query_thread_context.init();
_query_thread_context.init_unlocked();
}

CloudDeltaWriter::~CloudDeltaWriter() = default;
Expand Down
24 changes: 21 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
// Log interval when doing s3 upload task
DEFINE_mInt32(s3_file_writer_log_interval_second, "60");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "1"); // 1MB

//disable shrink memory by default
DEFINE_mBool(enable_shrink_memory, "false");
Expand Down Expand Up @@ -1256,6 +1256,17 @@ DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
// The max ratio for ttl cache's size
DEFINE_mInt64(max_ttl_cache_ratio, "90");
// The maximum jvm heap usage ratio for hdfs write workload
DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300");
// The maximum number of retries when an hdfs write fails
DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3");

// The min thread num for NonBlockCloseThreadPool
DEFINE_Int64(min_nonblock_close_thread_num, "12");
// The max thread num for NonBlockCloseThreadPool
DEFINE_Int64(max_nonblock_close_thread_num, "64");

// clang-format off
#ifdef BE_TEST
Expand Down Expand Up @@ -1726,11 +1737,18 @@ std::vector<std::vector<std::string>> get_config_info() {
std::vector<std::string> _config;
_config.push_back(it.first);

std::string config_val = it.second;
// For compatibility: PR #32933 changed the log dir's config logic
// and deprecated the `sys_log_dir` config.
if (it.first == "sys_log_dir" && config_val == "") {
config_val = fmt::format("{}/log", std::getenv("DORIS_HOME"));
}

_config.emplace_back(field_it->second.type);
if (0 == strcmp(field_it->second.type, "bool")) {
_config.emplace_back(it.second == "1" ? "true" : "false");
_config.emplace_back(config_val == "1" ? "true" : "false");
} else {
_config.push_back(it.second);
_config.push_back(config_val);
}
_config.emplace_back(field_it->second.valmutable ? "true" : "false");

Expand Down
13 changes: 11 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,6 @@ DECLARE_mInt64(load_error_log_limit_bytes);
// be brpc interface is classified into two categories: light and heavy
// each category has a different number of threads
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
// Default, if less than or equal 32 core, the following are 128, 128, 10240, 10240 in turn.
// if greater than 32 core, the following are core num * 4, core num * 4, core num * 320, core num * 320 in turn
DECLARE_Int32(brpc_heavy_work_pool_threads);
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
DECLARE_Int32(brpc_light_work_pool_threads);
Expand Down Expand Up @@ -1332,6 +1330,17 @@ DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
// The max ratio for ttl cache's size
DECLARE_mInt64(max_ttl_cache_ratio);
// The maximum jvm heap usage ratio for hdfs write workload
DECLARE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio);
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds);
// The maximum number of retries when an hdfs write fails
DECLARE_mInt64(hdfs_jni_write_max_retry_time);

// The min thread num for NonBlockCloseThreadPool
DECLARE_Int64(min_nonblock_close_thread_num);
// The max thread num for NonBlockCloseThreadPool
DECLARE_Int64(max_nonblock_close_thread_num);

#ifdef BE_TEST
// test s3
Expand Down
7 changes: 3 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void Daemon::memory_gc_thread() {
}
}

void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
void Daemon::memtable_memory_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more frequently,
// which helps to accurately control the memory of LoadChannelMgr.
while (!_stop_background_threads_latch.wait_for(
Expand Down Expand Up @@ -407,9 +407,8 @@ void Daemon::start() {
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memtable_memory_limiter_tracker_refresh_thread",
[this]() { this->memtable_memory_limiter_tracker_refresh_thread(); },
&_threads.emplace_back());
"Daemon", "memtable_memory_refresh_thread",
[this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

if (config::enable_metric_calculator) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Daemon {
void tcmalloc_gc_thread();
void memory_maintenance_thread();
void memory_gc_thread();
void memtable_memory_limiter_tracker_refresh_thread();
void memtable_memory_refresh_thread();
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& loca
RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
} // file_handler should be closed before calculating checksum

return Status::OK();
return local_writer->close();
}

std::string BrokerFileSystem::error_msg(const std::string& err) const {
Expand Down
30 changes: 22 additions & 8 deletions be/src/io/fs/broker_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_writer.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -60,12 +61,29 @@ inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) {
}
#endif

Status BrokerFileWriter::close() {
if (_closed) {
Status BrokerFileWriter::close(bool non_block) {
if (_close_state == FileWriterState::CLOSED) {
return Status::InternalError("BrokerFileWriter already closed, file path {}",
_path.native());
}
if (_close_state == FileWriterState::ASYNC_CLOSING) {
if (non_block) {
return Status::InternalError("Don't submit async close multi times");
}
// Actually, the first call to close(true) would have returned the value of _finalize; if it had
// returned an error status, the code would never call the second close(true)
_close_state = FileWriterState::CLOSED;
return Status::OK();
}
_closed = true;
if (non_block) {
_close_state = FileWriterState::ASYNC_CLOSING;
} else {
_close_state = FileWriterState::CLOSED;
}
return _close_impl();
}

Status BrokerFileWriter::_close_impl() {
TBrokerCloseWriterRequest request;
request.__set_version(TBrokerVersion::VERSION_ONE);
request.__set_fd(_fd);
Expand Down Expand Up @@ -117,7 +135,7 @@ Status BrokerFileWriter::close() {
}

Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
if (_closed) [[unlikely]] {
if (_close_state != FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("append to closed file: {}", _path.native());
}

Expand All @@ -135,10 +153,6 @@ Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
return Status::OK();
}

Status BrokerFileWriter::finalize() {
return Status::OK();
}

Result<FileWriterPtr> BrokerFileWriter::create(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
Path path) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/io/fs/broker_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,25 @@ class BrokerFileWriter final : public FileWriter {

BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, Path path, TBrokerFD fd);
~BrokerFileWriter() override;
Status close(bool non_block = false) override;

Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
const Path& path() const override { return _path; }
size_t bytes_appended() const override { return _cur_offset; }
bool closed() const override { return _closed; }
FileWriterState closed() const override { return _close_state; }
FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }

private:
Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes);
Status _close_impl();

private:
ExecEnv* _env = nullptr;
const TNetworkAddress _address;
Path _path;
size_t _cur_offset = 0;
TBrokerFD _fd;
bool _closed = false;
FileWriterState _close_state {FileWriterState::OPEN};
};

} // end namespace io
Expand Down
20 changes: 14 additions & 6 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <future>
#include <memory>

#include "common/status.h"
Expand Down Expand Up @@ -46,6 +47,17 @@ struct FileWriterOptions {
uint64_t file_cache_expiration = 0; // Absolute time
};

// Bundles a std::promise<Status> together with a std::future<Status>.
// NOTE(review): presumably `future` is meant to be obtained from `promise` and
// waited on for the result of an asynchronous close; nothing visible here wires
// the two members together, so confirm against the writers that use this struct.
struct AsyncCloseStatusPack {
std::promise<Status> promise;
std::future<Status> future;
};

// Close-lifecycle state reported by FileWriter::closed().
// The per-state notes below describe how BrokerFileWriter uses the values.
enum class FileWriterState : uint8_t {
    // Initial state; the only state in which BrokerFileWriter::appendv accepts data.
    OPEN = 0,
    // Entered by close(true). A following close(false) moves the writer to CLOSED;
    // a second close(true) is rejected with an error.
    ASYNC_CLOSING = 1,
    // Entered by close(false). Any further close() call returns an error.
    CLOSED = 2,
};

class FileWriter {
public:
FileWriter() = default;
Expand All @@ -56,21 +68,17 @@ class FileWriter {

// Normal close. Wait for all data to persist before returning.
// If there is no data appended, an empty file will be persisted.
virtual Status close() = 0;
virtual Status close(bool non_block = false) = 0;

Status append(const Slice& data) { return appendv(&data, 1); }

virtual Status appendv(const Slice* data, size_t data_cnt) = 0;

// Call this method when there is no more data to write.
// FIXME(cyx): Does not seem to be an appropriate interface for file system?
virtual Status finalize() = 0;

virtual const Path& path() const = 0;

virtual size_t bytes_appended() const = 0;

virtual bool closed() const = 0;
virtual FileWriterState closed() const = 0;

virtual FileCacheAllocatorBuilder* cache_builder() const = 0;
};
Expand Down
Loading

0 comments on commit b06c07d

Please sign in to comment.