Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,13 @@ struct PAIMON_EXPORT Options {
/// "lookup.cache.high-priority-pool-ratio" - The fraction of cache memory that is reserved for
/// high-priority data like index, filter. Default value is 0.25.
static const char LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[];
/// "lookup.cache-file-retention" - The cached files retention time for lookup.
/// After the file expires, if there is a need for access, it will be re-read from the DFS
/// to build an index on the local disk. Default value is 1 hour.
static const char LOOKUP_CACHE_FILE_RETENTION[];
/// "lookup.cache-max-disk-size" - Max disk size for lookup cache, you can use this option
/// to limit the use of local disks. Default value is unlimited (INT64_MAX).
static const char LOOKUP_CACHE_MAX_DISK_SIZE[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/merge_tree_writer.cpp
core/mergetree/write_buffer.cpp
core/mergetree/levels.cpp
core/mergetree/lookup_file.cpp
core/mergetree/lookup_levels.cpp
core/mergetree/lookup/remote_lookup_file_manager.cpp
core/migrate/file_meta_utils.cpp
Expand Down Expand Up @@ -479,6 +480,7 @@ if(PAIMON_BUILD_TESTS)
common/utils/uuid_test.cpp
common/utils/decimal_utils_test.cpp
common/utils/threadsafe_queue_test.cpp
common/utils/generic_lru_cache_test.cpp
STATIC_LINK_LIBS
paimon_shared
test_utils_static
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,7 @@ const char Options::LOOKUP_COMPACT[] = "lookup-compact";
const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = "lookup-compact.max-interval";
const char Options::LOOKUP_CACHE_MAX_MEMORY_SIZE[] = "lookup.cache-max-memory-size";
const char Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[] = "lookup.cache.high-priority-pool-ratio";
const char Options::LOOKUP_CACHE_FILE_RETENTION[] = "lookup.cache-file-retention";
const char Options::LOOKUP_CACHE_MAX_DISK_SIZE[] = "lookup.cache-max-disk-size";

} // namespace paimon
11 changes: 9 additions & 2 deletions src/paimon/common/io/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class Cache {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) = 0;

virtual void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;
virtual Status Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;

virtual void Invalidate(const std::shared_ptr<CacheKey>& key) = 0;

Expand All @@ -65,6 +65,13 @@ class CacheValue {
}
}

bool operator==(const CacheValue& other) const {
if (this == &other) {
return true;
}
return segment_ == other.segment_;
Comment thread
lxy-9602 marked this conversation as resolved.
}

private:
MemorySegment segment_;
CacheCallback callback_;
Expand Down
125 changes: 24 additions & 101 deletions src/paimon/common/io/cache/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,128 +18,51 @@

namespace paimon {

LruCache::LruCache(int64_t max_weight) : max_weight_(max_weight), current_weight_(0) {}
LruCache::LruCache(int64_t max_weight)
: inner_cache_(InnerCache::Options{
.max_weight = max_weight,
.expire_after_access_ms = -1,
.weigh_func = [](const std::shared_ptr<CacheKey>& /*key*/,
const std::shared_ptr<CacheValue>& value) -> int64_t {
return value ? value->GetSegment().Size() : 0;
},
.removal_callback =
[](const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value,
auto cause) {
if (value) {
value->OnEvict(key);
}
}}) {}

Result<std::shared_ptr<CacheValue>> LruCache::Get(
const std::shared_ptr<CacheKey>& key,
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)> supplier) {
{
std::unique_lock<std::shared_mutex> write_lock(mutex_);
auto cached = FindAndPromote(key);
if (cached) {
return cached.value();
}
}
// Cache miss: load via supplier (outside lock)
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> value, supplier(key));
if (GetWeight(value) > max_weight_) {
return value;
}

std::unique_lock<std::shared_mutex> write_lock(mutex_);
// Another thread may have inserted the key while we were loading
auto cached = FindAndPromote(key);
if (cached) {
return cached.value();
}

Insert(key, value);
EvictIfNeeded();
return value;
return inner_cache_.Get(key, std::move(supplier));
}

void LruCache::Put(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value) {
if (GetWeight(value) > max_weight_) {
return;
}
std::unique_lock<std::shared_mutex> write_lock(mutex_);

auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
// Update existing entry: adjust weight
current_weight_ -= GetWeight(it->second->second);
it->second->second = value;
current_weight_ += GetWeight(value);
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
} else {
Insert(key, value);
}

EvictIfNeeded();
Status LruCache::Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) {
return inner_cache_.Put(key, value);
}

void LruCache::Invalidate(const std::shared_ptr<CacheKey>& key) {
std::unique_lock<std::shared_mutex> write_lock(mutex_);

auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
RemoveEntry(it->second);
}
inner_cache_.Invalidate(key);
}

void LruCache::InvalidateAll() {
std::unique_lock<std::shared_mutex> write_lock(mutex_);

while (!lru_list_.empty()) {
RemoveEntry(std::prev(lru_list_.end()));
}
current_weight_ = 0;
inner_cache_.InvalidateAll();
}

size_t LruCache::Size() const {
std::shared_lock<std::shared_mutex> read_lock(mutex_);
return lru_map_.size();
return inner_cache_.Size();
}

int64_t LruCache::GetCurrentWeight() const {
std::shared_lock<std::shared_mutex> read_lock(mutex_);
return current_weight_;
return inner_cache_.GetCurrentWeight();
}

int64_t LruCache::GetMaxWeight() const {
return max_weight_;
}

std::optional<std::shared_ptr<CacheValue>> LruCache::FindAndPromote(
const std::shared_ptr<CacheKey>& key) {
auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
return it->second->second;
}
return std::nullopt;
return inner_cache_.GetMaxWeight();
}

void LruCache::Insert(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) {
// Insert at front of LRU list
lru_list_.emplace_front(key, value);
lru_map_[key] = lru_list_.begin();
current_weight_ += GetWeight(value);
}

void LruCache::RemoveEntry(LruList::iterator list_it) {
auto entry_key = list_it->first;
auto entry_value = list_it->second;
current_weight_ -= GetWeight(entry_value);
lru_map_.erase(entry_key);
lru_list_.erase(list_it);

if (entry_value) {
entry_value->OnEvict(entry_key);
}
}

void LruCache::EvictIfNeeded() {
while (current_weight_ > max_weight_ && !lru_list_.empty()) {
RemoveEntry(std::prev(lru_list_.end()));
}
}

int64_t LruCache::GetWeight(const std::shared_ptr<CacheValue>& value) {
if (!value) {
return 0;
}
return value->GetSegment().Size();
}
} // namespace paimon
49 changes: 15 additions & 34 deletions src/paimon/common/io/cache/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,25 @@
*/

#pragma once

#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "paimon/common/io/cache/cache.h"
#include "paimon/common/io/cache/cache_key.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/common/utils/generic_lru_cache.h"
#include "paimon/result.h"

namespace paimon {
/// LRU Cache implementation with weight-based eviction.
/// Uses std::list + unordered_map for O(1) get/put/evict:
/// list stores entries in LRU order (most recently used at front)
/// map stores key -> list::iterator for O(1) lookup
/// capacity is measured in bytes (sum of MemorySegment sizes)
/// when an entry is evicted, its CacheCallback is invoked to notify the upper layer
/// @note Thread-safe: all public methods are protected by mutex (read-write lock).

/// LRU Cache implementation with weight-based eviction for block cache.
///
/// Wraps GenericLruCache with CacheKey/CacheValue types. Capacity is measured
/// in bytes (sum of MemorySegment sizes). When an entry is evicted, its
/// CacheCallback is invoked to notify the upper layer.
///
/// @note Thread-safe: all public methods are protected by the underlying GenericLruCache lock.
class LruCache : public Cache {
public:
explicit LruCache(int64_t max_weight);
Expand All @@ -48,8 +43,8 @@ class LruCache : public Cache {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) override;

void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;
Status Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;

void Invalidate(const std::shared_ptr<CacheKey>& key) override;

Expand All @@ -62,24 +57,10 @@ class LruCache : public Cache {
int64_t GetMaxWeight() const;

private:
using LruEntry = std::pair<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>>;
using LruList = std::list<LruEntry>;
using LruMap = std::unordered_map<std::shared_ptr<CacheKey>, LruList::iterator, CacheKeyHash,
CacheKeyEqual>;

std::optional<std::shared_ptr<CacheValue>> FindAndPromote(const std::shared_ptr<CacheKey>& key);
void Insert(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value);
void RemoveEntry(LruList::iterator list_it);

void EvictIfNeeded();

static int64_t GetWeight(const std::shared_ptr<CacheValue>& value);
using InnerCache = GenericLruCache<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>,
CacheKeyHash, CacheKeyEqual>;

int64_t max_weight_;
int64_t current_weight_;
LruList lru_list_;
LruMap lru_map_;
mutable std::shared_mutex mutex_;
InnerCache inner_cache_;
};

} // namespace paimon
Loading
Loading