diff --git a/be/src/exprs/function/dictionary_factory.cpp b/be/src/exprs/function/dictionary_factory.cpp index 1c84adaba35b71..de92c24f3f7ed2 100644 --- a/be/src/exprs/function/dictionary_factory.cpp +++ b/be/src/exprs/function/dictionary_factory.cpp @@ -34,27 +34,28 @@ DictionaryFactory::~DictionaryFactory() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _dict_id_to_dict_map.clear(); _dict_id_to_version_id_map.clear(); + _refreshing_dict_map.clear(); } void DictionaryFactory::get_dictionary_status(std::vector& result, std::vector dict_ids) { - std::shared_lock lc(_mutex); + SharedLockGuard lock(_mutex); if (dict_ids.empty()) { // empty means ALL for (const auto& [dict_id, dict] : _dict_id_to_dict_map) { TDictionaryStatus status; status.__set_dictionary_id(dict_id); - status.__set_version_id(_dict_id_to_version_id_map[dict_id]); + status.__set_version_id(_dict_id_to_version_id_map.at(dict_id)); status.__set_dictionary_memory_size(dict->allocated_bytes()); result.emplace_back(std::move(status)); } } else { for (auto dict_id : dict_ids) { - if (_dict_id_to_dict_map.contains(dict_id)) { + auto dict_iter = _dict_id_to_dict_map.find(dict_id); + if (dict_iter != _dict_id_to_dict_map.end()) { TDictionaryStatus status; status.__set_dictionary_id(dict_id); - status.__set_version_id(_dict_id_to_version_id_map[dict_id]); - status.__set_dictionary_memory_size( - _dict_id_to_dict_map[dict_id]->allocated_bytes()); + status.__set_version_id(_dict_id_to_version_id_map.at(dict_id)); + status.__set_dictionary_memory_size(dict_iter->second->allocated_bytes()); result.emplace_back(std::move(status)); } } diff --git a/be/src/exprs/function/dictionary_factory.h b/be/src/exprs/function/dictionary_factory.h index bab27987bf3450..b26bd9f1fc3440 100644 --- a/be/src/exprs/function/dictionary_factory.h +++ b/be/src/exprs/function/dictionary_factory.h @@ -17,10 +17,9 @@ #include -#include - #include "common/config.h" #include "common/logging.h" +#include "common/thread_safety_annotations.h" #include "exprs/function/dictionary.h" namespace doris { @@ -35,11 +34,16 @@ class DictionaryFactory : private boost::noncopyable { // Returns nullptr if failed std::shared_ptr get(int64_t dict_id, int64_t version_id) { - std::unique_lock lc(_mutex); + SharedLockGuard lock(_mutex); + auto dict_iter = _dict_id_to_dict_map.find(dict_id); + if (dict_iter == _dict_id_to_dict_map.end()) { + return nullptr; + } + auto version_iter = _dict_id_to_version_id_map.find(dict_id); // dict_id and version_id must match - if (_dict_id_to_dict_map.contains(dict_id) && - _dict_id_to_version_id_map[dict_id] == version_id) { - return _dict_id_to_dict_map[dict_id]; + if (version_iter != _dict_id_to_version_id_map.end() && + version_iter->second == version_id) { + return dict_iter->second; } return nullptr; } @@ -48,7 +52,7 @@ class DictionaryFactory : private boost::noncopyable { VLOG_DEBUG << "DictionaryFactory refresh dictionary" << " dict_id: " << dict_id << " version_id: " << version_id << " dict name: " << dict->dict_name(); - std::unique_lock lc(_mutex); + UniqueLock lock(_mutex); dict->_mem_tracker = _mem_tracker; _refreshing_dict_map[dict_id] = std::make_pair(version_id, dict); // Set the mem tracker for the dictionary @@ -58,7 +62,7 @@ class DictionaryFactory : private boost::noncopyable { Status abort_refresh_dict(int64_t dict_id, int64_t version_id) { VLOG_DEBUG << "DictionaryFactory abort refresh dictionary" << " dict_id: " << dict_id << " version_id: " << version_id; - std::unique_lock lc(_mutex); + UniqueLock lock(_mutex); if (!_refreshing_dict_map.contains(dict_id)) { // FE will abort all, including succeed and failed. return Status::OK(); @@ -76,7 +80,7 @@ class DictionaryFactory : private boost::noncopyable { Status commit_refresh_dict(int64_t dict_id, int64_t version_id) { VLOG_DEBUG << "DictionaryFactory commit refresh dictionary" << " dict_id: " << dict_id << " version_id: " << version_id; - std::unique_lock lc(_mutex); + UniqueLock lock(_mutex); if (!_refreshing_dict_map.contains(dict_id)) { return Status::InvalidArgument("Dictionary is not refreshing dict_id: {}", dict_id); } @@ -117,7 +121,7 @@ class DictionaryFactory : private boost::noncopyable { Status delete_dict(int64_t dict_id) { VLOG_DEBUG << "DictionaryFactory delete dictionary, dict_id: " << dict_id; - std::unique_lock lc(_mutex); + UniqueLock lock(_mutex); if (!_dict_id_to_dict_map.contains(dict_id)) { LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id); return Status::OK(); @@ -136,14 +140,36 @@ class DictionaryFactory : private boost::noncopyable { void get_dictionary_status(std::vector& result, std::vector dict_ids); +#ifdef BE_TEST + bool get_refreshing_version_for_test(int64_t dict_id, int64_t* version_id) { + SharedLockGuard lock(_mutex); + auto iter = _refreshing_dict_map.find(dict_id); + if (iter == _refreshing_dict_map.end()) { + return false; + } + *version_id = iter->second.first; + return true; + } + + bool get_committed_version_for_test(int64_t dict_id, int64_t* version_id) { + SharedLockGuard lock(_mutex); + auto iter = _dict_id_to_version_id_map.find(dict_id); + if (iter == _dict_id_to_version_id_map.end()) { + return false; + } + *version_id = iter->second; + return true; + } +#endif + private: - std::map _dict_id_to_dict_map; - std::map _dict_id_to_version_id_map; + std::map _dict_id_to_dict_map GUARDED_BY(_mutex); + std::map _dict_id_to_version_id_map GUARDED_BY(_mutex); - std::map> - _refreshing_dict_map; // dict_id -> (version_id, dict) + std::map> _refreshing_dict_map + GUARDED_BY(_mutex); // dict_id -> (version_id, dict) - std::shared_mutex _mutex; + AnnotatedSharedMutex _mutex; std::shared_ptr _mem_tracker; }; diff --git a/be/src/exprs/function/regexps.h b/be/src/exprs/function/regexps.h index d521c7d9decd73..e2dd98c66e62ba 100644 --- a/be/src/exprs/function/regexps.h +++ b/be/src/exprs/function/regexps.h @@ -25,13 +25,13 @@ #include #include -#include #include #include #include #include #include "common/exception.h" +#include "common/thread_safety_annotations.h" #include "core/string_ref.h" namespace doris::multiregexps { @@ -73,7 +73,7 @@ class DeferredConstructedRegexps { : constructor(std::move(constructor_)) {} Regexps* get() { - std::lock_guard lock(mutex); + LockGuard lock(mutex); if (regexps) { return &*regexps; } @@ -82,9 +82,9 @@ class DeferredConstructedRegexps { } private: - std::mutex mutex; + AnnotatedMutex mutex; std::function constructor; - std::optional regexps; + std::optional regexps GUARDED_BY(mutex); }; using DeferredConstructedRegexpsPtr = std::shared_ptr; @@ -204,8 +204,8 @@ struct GlobalCacheTable { DeferredConstructedRegexpsPtr regexps; /// value }; - std::mutex mutex; - std::array known_regexps; + AnnotatedMutex mutex; + std::array known_regexps GUARDED_BY(mutex); static size_t getBucketIndexFor(const std::vector patterns, std::optional edit_distance) { @@ -235,7 +235,7 @@ DeferredConstructedRegexpsPtr getOrSet(const std::vector& patterns, size_t bucket_idx = GlobalCacheTable::getBucketIndexFor(str_patterns, edit_distance); /// Lock cache to find compiled regexp for given pattern vector + edit distance. - std::lock_guard lock(pool.mutex); + LockGuard lock(pool.mutex); GlobalCacheTable::Bucket& bucket = pool.known_regexps[bucket_idx]; diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 453b496929ad84..5d84c75fbefcc2 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -38,6 +38,7 @@ #include "cloud/config.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" +#include "common/thread_safety_annotations.h" #include "cpp/sync_point.h" #include "cpp/token_bucket_rate_limiter.h" #include "io/cache/block_file_cache.h" @@ -61,7 +62,6 @@ #include "util/debug_points.h" namespace doris::io { - bvar::Adder s3_read_counter("cached_remote_reader_s3_read"); bvar::Adder peer_read_counter("cached_remote_reader_peer_read"); bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num"); @@ -122,7 +122,7 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) { if (_is_doris_table && config::enable_read_cache_file_directly) { - std::lock_guard lock(_mtx); + UniqueLock lock(_mtx); DCHECK(file_block->state() == FileBlock::State::DOWNLOADED); file_block->_owned_by_cached_reader = true; _cache_file_readers.emplace(file_block->offset(), std::move(file_block)); @@ -325,7 +325,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* // read directly SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer); size_t need_read_size = bytes_req; - std::shared_lock lock(_mtx); + SharedLockGuard lock(_mtx); if (!_cache_file_readers.empty()) { // find the last offset > offset. auto iter = _cache_file_readers.upper_bound(offset); diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 3f2e1ceb2e1395..b68db5c53781ad 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -20,11 +20,11 @@ #include #include #include -#include #include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" @@ -58,6 +58,13 @@ class CachedRemoteFileReader final : public FileReader, int64_t mtime() const override { return _remote_file_reader->mtime(); } +#ifdef BE_TEST + size_t cache_file_reader_count_for_test() { + SharedLockGuard lock(_mtx); + return _cache_file_readers.size(); + } +#endif + // Asynchronously prefetch a range of file cache blocks. // This method triggers read file cache in dryrun mode to warm up the cache // without actually reading the data into user buffers. @@ -90,8 +97,8 @@ class CachedRemoteFileReader final : public FileReader, FileReaderSPtr _remote_file_reader; UInt128Wrapper _cache_hash; BlockFileCache* _cache; - std::shared_mutex _mtx; - std::map _cache_file_readers; + AnnotatedSharedMutex _mtx; + std::map _cache_file_readers GUARDED_BY(_mtx); }; } // namespace doris::io diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 43b9c4ae4cb360..15e562191c4938 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -44,6 +44,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "cpp/sync_point.h" #include "exec/common/hex.h" #include "io/cache/block_file_cache.h" @@ -94,7 +95,7 @@ std::shared_ptr FDCache::get_file_reader(const AccessKeyAndOffset& k return nullptr; } DCHECK(ExecEnv::GetInstance()); - std::shared_lock rlock(_mtx); + SharedLockGuard rlock(_mtx); if (auto iter = _file_name_to_reader.find(key); iter != _file_name_to_reader.end()) { return iter->second->second; } @@ -106,7 +107,7 @@ void FDCache::insert_file_reader(const AccessKeyAndOffset& key, if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] { return; } - std::lock_guard wlock(_mtx); + LockGuard wlock(_mtx); if (auto iter = _file_name_to_reader.find(key); iter == _file_name_to_reader.end()) { if (config::file_cache_max_file_reader_cache_size == _file_reader_list.size()) { @@ -123,7 +124,7 @@ void FDCache::remove_file_reader(const AccessKeyAndOffset& key) { return; } DCHECK(ExecEnv::GetInstance()); - std::lock_guard wlock(_mtx); + LockGuard wlock(_mtx); if (auto iter = _file_name_to_reader.find(key); iter != _file_name_to_reader.end()) { _file_reader_list.erase(iter->second); _file_name_to_reader.erase(key); @@ -131,12 +132,12 @@ void FDCache::remove_file_reader(const AccessKeyAndOffset& key) { } bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) { - std::shared_lock rlock(_mtx); + SharedLockGuard rlock(_mtx); return _file_name_to_reader.contains(key); } size_t FDCache::file_reader_cache_size() { - std::shared_lock rlock(_mtx); + SharedLockGuard rlock(_mtx); return _file_reader_list.size(); } @@ -188,7 +189,7 @@ Status FSFileCacheStorage::init(BlockFileCache* mgr) { Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) { FileWriter* writer = nullptr; { - std::lock_guard lock(_mtx); + LockGuard lock(_mtx); auto file_writer_map_key = std::make_pair(key.hash, key.offset); if (auto iter = _key_to_writer.find(file_writer_map_key); iter != _key_to_writer.end()) { writer = iter->second.get(); @@ -213,7 +214,7 @@ Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) { Status FSFileCacheStorage::finalize(const FileCacheKey& key, const size_t size) { FileWriterPtr file_writer; { - std::lock_guard lock(_mtx); + LockGuard lock(_mtx); auto file_writer_map_key = std::make_pair(key.hash, key.offset); auto iter = _key_to_writer.find(file_writer_map_key); if (iter == _key_to_writer.end()) { diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index 44b04805419995..ee799335406113 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -29,11 +29,11 @@ #include #include #include -#include #include #include #include +#include "common/thread_safety_annotations.h" #include "io/cache/cache_block_meta_store.h" #include "io/cache/file_cache_common.h" #include "io/cache/file_cache_storage.h" @@ -58,10 +58,11 @@ class FDCache { size_t file_reader_cache_size(); private: - std::list>> _file_reader_list; + std::list>> _file_reader_list + GUARDED_BY(_mtx); std::unordered_map - _file_name_to_reader; - mutable std::shared_mutex _mtx; + _file_name_to_reader GUARDED_BY(_mtx); + mutable AnnotatedSharedMutex _mtx; }; class FSFileCacheStorage : public FileCacheStorage { @@ -190,8 +191,9 @@ class FSFileCacheStorage : public FileCacheStorage { std::mutex _leak_cleaner_mutex; const std::shared_ptr& fs = global_local_filesystem(); // TODO(Lchangliang): use a more efficient data structure - std::mutex _mtx; - std::unordered_map _key_to_writer; + AnnotatedMutex _mtx; + std::unordered_map _key_to_writer + GUARDED_BY(_mtx); std::shared_ptr _iterator_dir_retry_cnt; std::shared_ptr> _leak_scan_removed_files; std::unique_ptr _meta_store; diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 1ec5b0a83774cd..33c7fd0d9b190c 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -75,18 +75,21 @@ ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {} ObjClientHolder::~ObjClientHolder() = default; Status ObjClientHolder::init() { - _client = S3ClientFactory::instance().create(_conf); - if (!_client) { + auto client = S3ClientFactory::instance().create(_conf); + if (!client) { return Status::InvalidArgument("failed to init s3 client with conf {}", _conf.to_string()); } + LockGuard lock(_mtx); + _client = std::move(client); + return Status::OK(); } Status ObjClientHolder::reset(const S3ClientConf& conf) { S3ClientConf reset_conf; { - std::shared_lock lock(_mtx); + SharedLockGuard lock(_mtx); if (conf.get_hash() == _conf.get_hash()) { return Status::OK(); // Same conf } @@ -115,7 +118,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) { LOG(WARNING) << "reset s3 client with new conf: " << conf.to_string(); { - std::lock_guard lock(_mtx); + LockGuard lock(_mtx); _client = std::move(client); _conf = std::move(reset_conf); } diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index f6efa5053324ff..4235096c535938 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -19,12 +19,12 @@ #include #include -#include #include #include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" @@ -53,7 +53,7 @@ class ObjClientHolder { Status reset(const S3ClientConf& conf); std::shared_ptr get() const { - std::shared_lock lock(_mtx); + SharedLockGuard lock(_mtx); return _client; } @@ -64,9 +64,16 @@ class ObjClientHolder { const S3ClientConf& s3_client_conf() { return _conf; } +#ifdef BE_TEST + void set_client_for_test(std::shared_ptr client) { + LockGuard lock(_mtx); + _client = std::move(client); + } +#endif + private: - mutable std::shared_mutex _mtx; - std::shared_ptr _client; + mutable AnnotatedSharedMutex _mtx; + std::shared_ptr _client GUARDED_BY(_mtx); S3ClientConf _conf; }; diff --git a/be/test/exec/dictionary/dictionary_version_test.cpp b/be/test/exec/dictionary/dictionary_version_test.cpp index c80f4b4db1f8f3..ff9317fa853f27 100644 --- a/be/test/exec/dictionary/dictionary_version_test.cpp +++ b/be/test/exec/dictionary/dictionary_version_test.cpp @@ -29,6 +29,7 @@ namespace doris { TEST(DictionaryVersionTest, refresh_dict) { auto dict_factory = std::make_shared(); + int64_t version_id = 0; auto dict = create_complex_hash_map_dict_from_column( "ip dict", @@ -39,17 +40,21 @@ TEST(DictionaryVersionTest, refresh_dict) { std::make_shared(), ""}, }); EXPECT_TRUE(dict_factory->refresh_dict(1, 1, dict)); - EXPECT_EQ(dict_factory->_refreshing_dict_map[1].first, 1); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(1, &version_id)); + EXPECT_EQ(version_id, 1); EXPECT_TRUE(dict_factory->refresh_dict(1, 114, dict)); - EXPECT_EQ(dict_factory->_refreshing_dict_map[1].first, 114); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(1, &version_id)); + EXPECT_EQ(version_id, 114); EXPECT_TRUE(dict_factory->refresh_dict(2, 114, dict)); - EXPECT_EQ(dict_factory->_refreshing_dict_map[2].first, 114); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(2, &version_id)); + EXPECT_EQ(version_id, 114); } TEST(DictionaryVersionTest, abort_refresh_dict) { auto dict_factory = std::make_shared(); + int64_t version_id = 0; auto dict = create_complex_hash_map_dict_from_column( "ip dict", @@ -60,7 +65,8 @@ TEST(DictionaryVersionTest, abort_refresh_dict) { std::make_shared(), ""}, }); EXPECT_TRUE(dict_factory->refresh_dict(3, 5, dict)); - EXPECT_EQ(dict_factory->_refreshing_dict_map[3].first, 5); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 5); { auto status = dict_factory->abort_refresh_dict(2, 5); @@ -76,12 +82,13 @@ TEST(DictionaryVersionTest, abort_refresh_dict) { { auto status = dict_factory->abort_refresh_dict(3, 5); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(!dict_factory->_refreshing_dict_map.contains(3)); + EXPECT_FALSE(dict_factory->get_refreshing_version_for_test(3, &version_id)); } } TEST(DictionaryVersionTest, commit_dict) { auto dict_factory = std::make_shared(); + int64_t version_id = 0; auto dict = create_complex_hash_map_dict_from_column( "ip dict", @@ -92,28 +99,31 @@ TEST(DictionaryVersionTest, commit_dict) { std::make_shared(), ""}, }); EXPECT_TRUE(dict_factory->refresh_dict(3, 5, dict)); - EXPECT_EQ(dict_factory->_refreshing_dict_map[3].first, 5); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 5); { auto status = dict_factory->commit_refresh_dict(2, 5); EXPECT_FALSE(status.ok()); std::cout << status.msg() << std::endl; - EXPECT_EQ(dict_factory->_refreshing_dict_map[3].first, 5); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 5); } { auto status = dict_factory->commit_refresh_dict(3, 6); EXPECT_FALSE(status.ok()); std::cout << status.msg() << std::endl; - EXPECT_EQ(dict_factory->_refreshing_dict_map[3].first, 5); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 5); } { auto status = dict_factory->commit_refresh_dict(3, 5); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(!dict_factory->_refreshing_dict_map.contains(3)); - EXPECT_TRUE(dict_factory->_dict_id_to_dict_map.contains(3)); - EXPECT_EQ(dict_factory->_dict_id_to_version_id_map[3], 5); + EXPECT_FALSE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_TRUE(dict_factory->get_committed_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 5); } auto dict2 = create_complex_hash_map_dict_from_column( @@ -125,7 +135,8 @@ TEST(DictionaryVersionTest, commit_dict) { std::make_shared(), ""}, }); EXPECT_TRUE(dict_factory->refresh_dict(3, 6, dict)); - EXPECT_TRUE(dict_factory->_refreshing_dict_map.contains(3)); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 6); { auto status = dict_factory->commit_refresh_dict(3, 5); @@ -135,9 +146,9 @@ TEST(DictionaryVersionTest, commit_dict) { { auto status = dict_factory->commit_refresh_dict(3, 6); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(!dict_factory->_refreshing_dict_map.contains(3)); - EXPECT_TRUE(dict_factory->_dict_id_to_dict_map.contains(3)); - EXPECT_EQ(dict_factory->_dict_id_to_version_id_map[3], 6); + EXPECT_FALSE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_TRUE(dict_factory->get_committed_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 6); } auto dict3 = create_complex_hash_map_dict_from_column( @@ -149,7 +160,8 @@ TEST(DictionaryVersionTest, commit_dict) { std::make_shared(), ""}, }); EXPECT_TRUE(dict_factory->refresh_dict(3, 4, dict)); - EXPECT_TRUE(dict_factory->_refreshing_dict_map.contains(3)); + EXPECT_TRUE(dict_factory->get_refreshing_version_for_test(3, &version_id)); + EXPECT_EQ(version_id, 4); { auto status = dict_factory->commit_refresh_dict(3, 4); diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 544455937a1a0c..7bf3607cdfb963 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -4467,20 +4467,20 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) { FileReaderSPtr local_reader; ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok()); auto reader = CachedRemoteFileReader(local_reader, opts); - EXPECT_EQ(reader._cache_file_readers.size(), 0); + EXPECT_EQ(reader.cache_file_reader_count_for_test(), 0); std::string buffer; buffer.resize(6_mb); IOContext io_ctx; size_t bytes_read {0}; ASSERT_TRUE(reader.read_at(1_mb, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); - EXPECT_EQ(reader._cache_file_readers.size(), 6); + EXPECT_EQ(reader.cache_file_reader_count_for_test(), 6); } { FileReaderSPtr local_reader; ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok()); auto reader = CachedRemoteFileReader(local_reader, opts); - EXPECT_EQ(reader._cache_file_readers.size(), 6); + EXPECT_EQ(reader.cache_file_reader_count_for_test(), 6); std::random_device rd; // a seed source for the random number engine std::mt19937 gen(rd()); // mersenne_twister_engine seeded with rd() std::uniform_int_distribution<> distrib(1_mb, 7_mb); @@ -4521,7 +4521,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) { ASSERT_TRUE(reader.read_at(9_mb, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); EXPECT_EQ(buffer, std::string(10086, '9')); - EXPECT_EQ(reader._cache_file_readers.size(), 7); + EXPECT_EQ(reader.cache_file_reader_count_for_test(), 7); } { FileReaderSPtr local_reader; @@ -4534,7 +4534,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) { ASSERT_TRUE( reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); EXPECT_EQ(buffer, std::string(10086, '0')); - EXPECT_EQ(reader._cache_file_readers.size(), 8); + EXPECT_EQ(reader.cache_file_reader_count_for_test(), 8); } std::this_thread::sleep_for(std::chrono::seconds(1)); if (fs::exists(cache_base_path)) { diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 13f9d12c501197..9954ec151ce6cf 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -1301,7 +1301,7 @@ create_s3_client(const std::string& path) { std::shared_ptr s3_file_writer(static_cast(file_writer.release())); auto holder = std::make_shared(S3ClientConf {}); auto mock_client = std::make_shared(); - holder->_client = mock_client; + holder->set_client_for_test(mock_client); s3_file_writer->_obj_client = holder; return {mock_client, s3_file_writer}; } @@ -1504,7 +1504,7 @@ TEST_F(S3FileWriterTest, test_empty_file) { EXPECT_TRUE(st.ok()) << st; auto holder = std::make_shared(S3ClientConf {}); auto mock_client = std::make_shared(); - holder->_client = mock_client; + holder->set_client_for_test(mock_client); dynamic_cast(file_writer.get())->_obj_client = holder; auto fs = io::global_local_filesystem(); std::string index_path = "/tmp/empty_index_file_test"; diff --git a/be/test/storage/rowset/beta_rowset_test.cpp b/be/test/storage/rowset/beta_rowset_test.cpp index 07d135702e8e25..d424b297f436af 100644 --- a/be/test/storage/rowset/beta_rowset_test.cpp +++ b/be/test/storage/rowset/beta_rowset_test.cpp @@ -304,7 +304,6 @@ TEST_F(BetaRowsetTest, ReadTest) { ASSERT_TRUE(res.has_value()) << res.error(); auto fs = res.value(); StorageResource storage_resource(fs); - auto& client = fs->client_holder()->_client; // failed to head object { Aws::Auth::AWSCredentials aws_cred("ak", "sk"); @@ -313,7 +312,8 @@ TEST_F(BetaRowsetTest, ReadTest) { aws_cred, aws_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true); - client.reset(new io::S3ObjStorageClient(std::move(s3_client))); + fs->client_holder()->set_client_for_test( + std::make_shared(std::move(s3_client))); rowset.rowset_meta()->set_num_segments(1); rowset.rowset_meta()->set_remote_storage_resource(storage_resource); @@ -327,7 +327,7 @@ TEST_F(BetaRowsetTest, ReadTest) { { Aws::Auth::AWSCredentials aws_cred("ak", "sk"); Aws::Client::ClientConfiguration aws_config; - client.reset(new io::S3ObjStorageClient( + fs->client_holder()->set_client_for_test(std::make_shared( std::make_shared(S3ClientMockGetError()))); rowset.rowset_meta()->set_num_segments(1); @@ -342,7 +342,7 @@ TEST_F(BetaRowsetTest, ReadTest) { { Aws::Auth::AWSCredentials aws_cred("ak", "sk"); Aws::Client::ClientConfiguration aws_config; - client.reset(new io::S3ObjStorageClient( + fs->client_holder()->set_client_for_test(std::make_shared( std::make_shared(S3ClientMockGetErrorData()))); rowset.rowset_meta()->set_num_segments(1);