Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions be/src/exprs/function/dictionary_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,28 @@ DictionaryFactory::~DictionaryFactory() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_dict_id_to_dict_map.clear();
_dict_id_to_version_id_map.clear();
_refreshing_dict_map.clear();
}

void DictionaryFactory::get_dictionary_status(std::vector<TDictionaryStatus>& result,
std::vector<int64_t> dict_ids) {
std::shared_lock lc(_mutex);
SharedLockGuard lock(_mutex);
if (dict_ids.empty()) { // empty means ALL
for (const auto& [dict_id, dict] : _dict_id_to_dict_map) {
TDictionaryStatus status;
status.__set_dictionary_id(dict_id);
status.__set_version_id(_dict_id_to_version_id_map[dict_id]);
status.__set_version_id(_dict_id_to_version_id_map.at(dict_id));
status.__set_dictionary_memory_size(dict->allocated_bytes());
result.emplace_back(std::move(status));
}
} else {
for (auto dict_id : dict_ids) {
if (_dict_id_to_dict_map.contains(dict_id)) {
auto dict_iter = _dict_id_to_dict_map.find(dict_id);
if (dict_iter != _dict_id_to_dict_map.end()) {
TDictionaryStatus status;
status.__set_dictionary_id(dict_id);
status.__set_version_id(_dict_id_to_version_id_map[dict_id]);
status.__set_dictionary_memory_size(
_dict_id_to_dict_map[dict_id]->allocated_bytes());
status.__set_version_id(_dict_id_to_version_id_map.at(dict_id));
status.__set_dictionary_memory_size(dict_iter->second->allocated_bytes());
result.emplace_back(std::move(status));
}
}
Expand Down
56 changes: 41 additions & 15 deletions be/src/exprs/function/dictionary_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

#include <gen_cpp/BackendService_types.h>

#include <mutex>

#include "common/config.h"
#include "common/logging.h"
#include "common/thread_safety_annotations.h"
#include "exprs/function/dictionary.h"

namespace doris {
Expand All @@ -35,11 +34,16 @@ class DictionaryFactory : private boost::noncopyable {

// Looks up the dictionary stored under dict_id and hands it back only when the
// version recorded for that dict_id equals the requested version_id.
// Both maps are consulted after a lock object has been constructed on _mutex.
// Returns nullptr if dict_id is unknown or the recorded version differs.
std::shared_ptr<const IDictionary> get(int64_t dict_id, int64_t version_id) {
std::unique_lock lc(_mutex);
SharedLockGuard lock(_mutex);
auto dict_iter = _dict_id_to_dict_map.find(dict_id);
if (dict_iter == _dict_id_to_dict_map.end()) {
// No dictionary is registered for this id.
return nullptr;
}
auto version_iter = _dict_id_to_version_id_map.find(dict_id);
// dict_id and version_id must match
if (_dict_id_to_dict_map.contains(dict_id) &&
_dict_id_to_version_id_map[dict_id] == version_id) {
return _dict_id_to_dict_map[dict_id];
if (version_iter != _dict_id_to_version_id_map.end() &&
version_iter->second == version_id) {
return dict_iter->second;
}
// Either no version is recorded for dict_id or it is not the one asked for.
return nullptr;
}
Expand All @@ -48,7 +52,7 @@ class DictionaryFactory : private boost::noncopyable {
VLOG_DEBUG << "DictionaryFactory refresh dictionary"
<< " dict_id: " << dict_id << " version_id: " << version_id
<< " dict name: " << dict->dict_name();
std::unique_lock lc(_mutex);
UniqueLock lock(_mutex);
dict->_mem_tracker = _mem_tracker;
_refreshing_dict_map[dict_id] = std::make_pair(version_id, dict);
// Set the mem tracker for the dictionary
Expand All @@ -58,7 +62,7 @@ class DictionaryFactory : private boost::noncopyable {
Status abort_refresh_dict(int64_t dict_id, int64_t version_id) {
VLOG_DEBUG << "DictionaryFactory abort refresh dictionary"
<< " dict_id: " << dict_id << " version_id: " << version_id;
std::unique_lock lc(_mutex);
UniqueLock lock(_mutex);
if (!_refreshing_dict_map.contains(dict_id)) {
// FE will abort all, including succeed and failed.
return Status::OK();
Expand All @@ -76,7 +80,7 @@ class DictionaryFactory : private boost::noncopyable {
Status commit_refresh_dict(int64_t dict_id, int64_t version_id) {
VLOG_DEBUG << "DictionaryFactory commit refresh dictionary"
<< " dict_id: " << dict_id << " version_id: " << version_id;
std::unique_lock lc(_mutex);
UniqueLock lock(_mutex);
if (!_refreshing_dict_map.contains(dict_id)) {
return Status::InvalidArgument("Dictionary is not refreshing dict_id: {}", dict_id);
}
Expand Down Expand Up @@ -117,7 +121,7 @@ class DictionaryFactory : private boost::noncopyable {

Status delete_dict(int64_t dict_id) {
VLOG_DEBUG << "DictionaryFactory delete dictionary, dict_id: " << dict_id;
std::unique_lock lc(_mutex);
UniqueLock lock(_mutex);
if (!_dict_id_to_dict_map.contains(dict_id)) {
LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id);
return Status::OK();
Expand All @@ -136,14 +140,36 @@ class DictionaryFactory : private boost::noncopyable {
void get_dictionary_status(std::vector<TDictionaryStatus>& result,
std::vector<int64_t> dict_ids);

#ifdef BE_TEST
// Test-only probe into the refreshing map.
// Writes the version of the entry stored for dict_id in _refreshing_dict_map
// into *version_id and returns true; returns false (leaving *version_id
// untouched) when dict_id has no entry there.
bool get_refreshing_version_for_test(int64_t dict_id, int64_t* version_id) {
    SharedLockGuard guard(_mutex);
    if (auto pending = _refreshing_dict_map.find(dict_id);
        pending != _refreshing_dict_map.end()) {
        // Mapped value is the pair (version_id, dict); only the version is reported.
        *version_id = pending->second.first;
        return true;
    }
    return false;
}

// Test-only probe into the committed version map.
// Writes the version stored for dict_id in _dict_id_to_version_id_map into
// *version_id and returns true; returns false (leaving *version_id untouched)
// when dict_id has no entry there.
bool get_committed_version_for_test(int64_t dict_id, int64_t* version_id) {
    SharedLockGuard guard(_mutex);
    if (auto committed = _dict_id_to_version_id_map.find(dict_id);
        committed != _dict_id_to_version_id_map.end()) {
        *version_id = committed->second;
        return true;
    }
    return false;
}
#endif

private:
std::map<int64_t, DictionaryPtr> _dict_id_to_dict_map;
std::map<int64_t, int64_t> _dict_id_to_version_id_map;
std::map<int64_t, DictionaryPtr> _dict_id_to_dict_map GUARDED_BY(_mutex);
std::map<int64_t, int64_t> _dict_id_to_version_id_map GUARDED_BY(_mutex);

std::map<int64_t, std::pair<int64_t, DictionaryPtr>>
_refreshing_dict_map; // dict_id -> (version_id, dict)
std::map<int64_t, std::pair<int64_t, DictionaryPtr>> _refreshing_dict_map
GUARDED_BY(_mutex); // dict_id -> (version_id, dict)

std::shared_mutex _mutex;
AnnotatedSharedMutex _mutex;

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
Expand Down
14 changes: 7 additions & 7 deletions be/src/exprs/function/regexps.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

#include <boost/container_hash/hash.hpp>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "common/exception.h"
#include "common/thread_safety_annotations.h"
#include "core/string_ref.h"

namespace doris::multiregexps {
Expand Down Expand Up @@ -73,7 +73,7 @@ class DeferredConstructedRegexps {
: constructor(std::move(constructor_)) {}

Regexps* get() {
std::lock_guard lock(mutex);
LockGuard lock(mutex);
if (regexps) {
return &*regexps;
}
Expand All @@ -82,9 +82,9 @@ class DeferredConstructedRegexps {
}

private:
std::mutex mutex;
AnnotatedMutex mutex;
std::function<Regexps()> constructor;
std::optional<Regexps> regexps;
std::optional<Regexps> regexps GUARDED_BY(mutex);
};

using DeferredConstructedRegexpsPtr = std::shared_ptr<DeferredConstructedRegexps>;
Expand Down Expand Up @@ -204,8 +204,8 @@ struct GlobalCacheTable {
DeferredConstructedRegexpsPtr regexps; /// value
};

std::mutex mutex;
std::array<Bucket, CACHE_SIZE> known_regexps;
AnnotatedMutex mutex;
std::array<Bucket, CACHE_SIZE> known_regexps GUARDED_BY(mutex);

static size_t getBucketIndexFor(const std::vector<String> patterns,
std::optional<UInt32> edit_distance) {
Expand Down Expand Up @@ -235,7 +235,7 @@ DeferredConstructedRegexpsPtr getOrSet(const std::vector<StringRef>& patterns,
size_t bucket_idx = GlobalCacheTable::getBucketIndexFor(str_patterns, edit_distance);

/// Lock cache to find compiled regexp for given pattern vector + edit distance.
std::lock_guard lock(pool.mutex);
LockGuard lock(pool.mutex);

GlobalCacheTable::Bucket& bucket = pool.known_regexps[bucket_idx];

Expand Down
6 changes: 3 additions & 3 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/thread_safety_annotations.h"
#include "cpp/sync_point.h"
#include "cpp/token_bucket_rate_limiter.h"
#include "io/cache/block_file_cache.h"
Expand All @@ -61,7 +62,6 @@
#include "util/debug_points.h"

namespace doris::io {

bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
Expand Down Expand Up @@ -122,7 +122,7 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader

void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
if (_is_doris_table && config::enable_read_cache_file_directly) {
std::lock_guard lock(_mtx);
UniqueLock lock(_mtx);
DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
file_block->_owned_by_cached_reader = true;
_cache_file_readers.emplace(file_block->offset(), std::move(file_block));
Expand Down Expand Up @@ -325,7 +325,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
// read directly
SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
size_t need_read_size = bytes_req;
std::shared_lock lock(_mtx);
SharedLockGuard lock(_mtx);
if (!_cache_file_readers.empty()) {
// find the last offset > offset.
auto iter = _cache_file_readers.upper_bound(offset);
Expand Down
13 changes: 10 additions & 3 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
#include <cstddef>
#include <cstdint>
#include <map>
#include <shared_mutex>
#include <utility>
#include <vector>

#include "common/status.h"
#include "common/thread_safety_annotations.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
Expand Down Expand Up @@ -58,6 +58,13 @@ class CachedRemoteFileReader final : public FileReader,

int64_t mtime() const override { return _remote_file_reader->mtime(); }

#ifdef BE_TEST
// Test-only accessor: reports how many entries _cache_file_readers currently
// holds. The size is read while a lock object on _mtx is alive.
size_t cache_file_reader_count_for_test() {
    SharedLockGuard guard(_mtx);
    const size_t reader_count = _cache_file_readers.size();
    return reader_count;
}
#endif

// Asynchronously prefetch a range of file cache blocks.
// This method triggers read file cache in dryrun mode to warm up the cache
// without actually reading the data into user buffers.
Expand Down Expand Up @@ -90,8 +97,8 @@ class CachedRemoteFileReader final : public FileReader,
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
BlockFileCache* _cache;
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;
AnnotatedSharedMutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers GUARDED_BY(_mtx);
};

} // namespace doris::io
15 changes: 8 additions & 7 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/thread_safety_annotations.h"
#include "cpp/sync_point.h"
#include "exec/common/hex.h"
#include "io/cache/block_file_cache.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& k
return nullptr;
}
DCHECK(ExecEnv::GetInstance());
std::shared_lock rlock(_mtx);
SharedLockGuard rlock(_mtx);
if (auto iter = _file_name_to_reader.find(key); iter != _file_name_to_reader.end()) {
return iter->second->second;
}
Expand All @@ -106,7 +107,7 @@ void FDCache::insert_file_reader(const AccessKeyAndOffset& key,
if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
return;
}
std::lock_guard wlock(_mtx);
LockGuard wlock(_mtx);

if (auto iter = _file_name_to_reader.find(key); iter == _file_name_to_reader.end()) {
if (config::file_cache_max_file_reader_cache_size == _file_reader_list.size()) {
Expand All @@ -123,20 +124,20 @@ void FDCache::remove_file_reader(const AccessKeyAndOffset& key) {
return;
}
DCHECK(ExecEnv::GetInstance());
std::lock_guard wlock(_mtx);
LockGuard wlock(_mtx);
if (auto iter = _file_name_to_reader.find(key); iter != _file_name_to_reader.end()) {
_file_reader_list.erase(iter->second);
_file_name_to_reader.erase(key);
}
}

// Reports whether _file_name_to_reader holds an entry for `key`.
// The lookup happens after a lock object has been constructed on _mtx.
bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) {
std::shared_lock rlock(_mtx);
SharedLockGuard rlock(_mtx);
return _file_name_to_reader.contains(key);
}

// Returns the number of entries currently held in _file_reader_list.
// The size is read after a lock object has been constructed on _mtx.
size_t FDCache::file_reader_cache_size() {
std::shared_lock rlock(_mtx);
SharedLockGuard rlock(_mtx);
return _file_reader_list.size();
}

Expand Down Expand Up @@ -188,7 +189,7 @@ Status FSFileCacheStorage::init(BlockFileCache* mgr) {
Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) {
FileWriter* writer = nullptr;
{
std::lock_guard lock(_mtx);
LockGuard lock(_mtx);
auto file_writer_map_key = std::make_pair(key.hash, key.offset);
if (auto iter = _key_to_writer.find(file_writer_map_key); iter != _key_to_writer.end()) {
writer = iter->second.get();
Expand All @@ -213,7 +214,7 @@ Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) {
Status FSFileCacheStorage::finalize(const FileCacheKey& key, const size_t size) {
FileWriterPtr file_writer;
{
std::lock_guard lock(_mtx);
LockGuard lock(_mtx);
auto file_writer_map_key = std::make_pair(key.hash, key.offset);
auto iter = _key_to_writer.find(file_writer_map_key);
if (iter == _key_to_writer.end()) {
Expand Down
14 changes: 8 additions & 6 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <unordered_set>
#include <vector>

#include "common/thread_safety_annotations.h"
#include "io/cache/cache_block_meta_store.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/file_cache_storage.h"
Expand All @@ -58,10 +58,11 @@ class FDCache {
size_t file_reader_cache_size();

private:
std::list<std::pair<AccessKeyAndOffset, std::shared_ptr<FileReader>>> _file_reader_list;
std::list<std::pair<AccessKeyAndOffset, std::shared_ptr<FileReader>>> _file_reader_list
GUARDED_BY(_mtx);
std::unordered_map<AccessKeyAndOffset, decltype(_file_reader_list.begin()), KeyAndOffsetHash>
_file_name_to_reader;
mutable std::shared_mutex _mtx;
_file_name_to_reader GUARDED_BY(_mtx);
mutable AnnotatedSharedMutex _mtx;
};

class FSFileCacheStorage : public FileCacheStorage {
Expand Down Expand Up @@ -190,8 +191,9 @@ class FSFileCacheStorage : public FileCacheStorage {
std::mutex _leak_cleaner_mutex;
const std::shared_ptr<LocalFileSystem>& fs = global_local_filesystem();
// TODO(Lchangliang): use a more efficient data structure
std::mutex _mtx;
std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash> _key_to_writer;
AnnotatedMutex _mtx;
std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash> _key_to_writer
GUARDED_BY(_mtx);
std::shared_ptr<bvar::LatencyRecorder> _iterator_dir_retry_cnt;
std::shared_ptr<bvar::Adder<size_t>> _leak_scan_removed_files;
std::unique_ptr<CacheBlockMetaStore> _meta_store;
Expand Down
Loading
Loading