Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ struct PAIMON_EXPORT Options {
/// you can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.
/// If a level is not configured, the default compression set by FILE_COMPRESSION will be used.
static const char FILE_COMPRESSION_PER_LEVEL[];
/// "lookup.cache-max-memory-size" - Maximum memory size of the lookup cache. Default value is 256 MB.
static const char LOOKUP_CACHE_MAX_MEMORY_SIZE[];
/// "lookup.cache.high-priority-pool-ratio" - The fraction of cache memory that is reserved for
/// high-priority data such as index and filter data. Default value is 0.25.
static const char LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
4 changes: 3 additions & 1 deletion src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ set(PAIMON_COMMON_SRCS
common/io/data_output_stream.cpp
common/io/memory_segment_output_stream.cpp
common/io/offset_input_stream.cpp
common/io/cache/cache.cpp
common/io/cache/cache_key.cpp
common/io/cache/cache_manager.cpp
common/io/cache/lru_cache.cpp
common/logging/logging.cpp
common/lookup/sort/sort_lookup_store_factory.cpp
common/lookup/lookup_store_factory.cpp
Expand Down Expand Up @@ -466,6 +466,7 @@ if(PAIMON_BUILD_TESTS)
common/utils/roaring_bitmap64_test.cpp
common/utils/range_helper_test.cpp
common/utils/read_ahead_cache_test.cpp
common/io/cache/lru_cache_test.cpp
common/utils/byte_range_combiner_test.cpp
common/utils/scope_guard_test.cpp
common/utils/serialization_utils_test.cpp
Expand Down Expand Up @@ -495,6 +496,7 @@ if(PAIMON_BUILD_TESTS)
SOURCES
common/compression/block_compression_factory_test.cpp
common/sst/sst_file_io_test.cpp
common/sst/block_cache_test.cpp
common/utils/crc32c_test.cpp
STATIC_LINK_LIBS
paimon_shared
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,7 @@ const char Options::COMPACTION_FORCE_UP_LEVEL_0[] = "compaction.force-up-level-0
const char Options::LOOKUP_WAIT[] = "lookup-wait";
const char Options::LOOKUP_COMPACT[] = "lookup-compact";
const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = "lookup-compact.max-interval";
const char Options::LOOKUP_CACHE_MAX_MEMORY_SIZE[] = "lookup.cache-max-memory-size";
const char Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[] = "lookup.cache.high-priority-pool-ratio";

} // namespace paimon
42 changes: 0 additions & 42 deletions src/paimon/common/io/cache/cache.cpp

This file was deleted.

30 changes: 15 additions & 15 deletions src/paimon/common/io/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
#include "paimon/result.h"

namespace paimon {

class CacheValue;

/// Callback invoked when a cache entry is evicted by the LRU policy.
using CacheCallback = std::function<void(const std::shared_ptr<CacheKey>&)>;

class Cache {
public:
virtual ~Cache() = default;
Expand All @@ -42,31 +46,27 @@ class Cache {

virtual void InvalidateAll() = 0;

virtual CacheKeyMap AsMap() = 0;
};

class NoCache : public Cache {
public:
Result<std::shared_ptr<CacheValue>> Get(
const std::shared_ptr<CacheKey>& key,
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) override;
void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;
void Invalidate(const std::shared_ptr<CacheKey>& key) override;
void InvalidateAll() override;
CacheKeyMap AsMap() override;
virtual size_t Size() const = 0;
};

/// Value held by a Cache: a MemorySegment together with an optional eviction callback.
class CacheValue {
 public:
    /// Wraps `segment` without registering any eviction callback.
    explicit CacheValue(const MemorySegment& segment) : segment_(segment) {}

    /// Wraps `segment` and stores `callback` so that OnEvict() can run it later.
    CacheValue(const MemorySegment& segment, CacheCallback callback)
        : segment_(segment), callback_(std::move(callback)) {}

    /// Returns the wrapped segment by const reference.
    const MemorySegment& GetSegment() const {
        return segment_;
    }

    /// Runs the stored eviction callback with `key`; a no-op when no callback was registered.
    void OnEvict(const std::shared_ptr<CacheKey>& key) const {
        if (!callback_) {
            return;
        }
        callback_(key);
    }

 private:
    MemorySegment segment_;
    CacheCallback callback_;
};
} // namespace paimon
7 changes: 3 additions & 4 deletions src/paimon/common/io/cache/cache_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ class CacheKey {
virtual ~CacheKey() = default;

virtual bool IsIndex() const = 0;
virtual size_t HashCode() const = 0;

virtual bool Equals(const CacheKey& other) const = 0;

virtual size_t HashCode() const = 0;
};

class PositionCacheKey : public CacheKey {
Expand Down Expand Up @@ -76,7 +78,4 @@ struct CacheKeyEqual {
}
};

using CacheKeyMap = std::unordered_map<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>,
CacheKeyHash, CacheKeyEqual>;

} // namespace paimon
10 changes: 6 additions & 4 deletions src/paimon/common/io/cache/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ namespace paimon {

Result<MemorySegment> CacheManager::GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader) {
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader,
CacheCallback eviction_callback) {
auto& cache = key->IsIndex() ? index_cache_ : data_cache_;
auto supplier = [&](const std::shared_ptr<CacheKey>& k) -> Result<std::shared_ptr<CacheValue>> {
PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(k));
return std::make_shared<CacheValue>(segment);
auto supplier =
[&](const std::shared_ptr<CacheKey>& key) -> Result<std::shared_ptr<CacheValue>> {
PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(key));
return std::make_shared<CacheValue>(segment, std::move(eviction_callback));
};
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> cache_value, cache->Get(key, supplier));
return cache_value->GetSegment();
Expand Down
53 changes: 48 additions & 5 deletions src/paimon/common/io/cache/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,67 @@

#include "paimon/common/io/cache/cache.h"
#include "paimon/common/io/cache/cache_key.h"
#include "paimon/common/io/cache/lru_cache.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/result.h"

namespace paimon {
class CacheManager {
public:
CacheManager() {
// todo implements cache
data_cache_ = std::make_shared<NoCache>();
index_cache_ = std::make_shared<NoCache>();
/// Refreshing the cache has a cost, so the LRU strategy is not refreshed on every visit to the
/// CacheManager; instead it is refreshed once every 10 visits.
static constexpr int32_t REFRESH_COUNT = 10;

/// Pairs a MemorySegment with a counter of how many times it has been handed out via Access().
class SegmentContainer {
 public:
    /// Stores a copy of `segment`; the access counter starts at zero.
    explicit SegmentContainer(const MemorySegment& segment) : segment_(segment) {}

    /// Records one access and returns the wrapped segment.
    const MemorySegment& Access() {
        ++access_count_;
        return segment_;
    }

    /// Returns the number of Access() calls made so far.
    int32_t GetAccessCount() const {
        return access_count_;
    }

 private:
    MemorySegment segment_;
    int32_t access_count_{0};
};

/// Constructs a CacheManager with LRU caching.
/// @param max_memory_bytes Total cache capacity in bytes.
/// @param high_priority_pool_ratio Ratio of capacity reserved for index cache [0.0, 1.0).
/// If 0, index and data share the same cache.
CacheManager(int64_t max_memory_bytes, double high_priority_pool_ratio) {
auto index_cache_bytes = static_cast<int64_t>(max_memory_bytes * high_priority_pool_ratio);
auto data_cache_bytes =
static_cast<int64_t>(max_memory_bytes * (1.0 - high_priority_pool_ratio));
data_cache_ = std::make_shared<LruCache>(data_cache_bytes);
if (high_priority_pool_ratio == 0.0) {
index_cache_ = data_cache_;
} else {
index_cache_ = std::make_shared<LruCache>(index_cache_bytes);
Comment thread
lxy-9602 marked this conversation as resolved.
}
}

Result<MemorySegment> GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader);
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader,
CacheCallback eviction_callback);

void InvalidPage(const std::shared_ptr<CacheKey>& key);

/// Returns the cache that GetPage() uses for keys whose IsIndex() is false.
/// This is the same object as IndexCache() when the manager was built with a
/// high-priority pool ratio of 0.
const std::shared_ptr<Cache>& DataCache() const {
return data_cache_;
}

/// Returns the cache that GetPage() uses for keys whose IsIndex() is true.
/// This is the same object as DataCache() when the manager was built with a
/// high-priority pool ratio of 0.
const std::shared_ptr<Cache>& IndexCache() const {
return index_cache_;
}

private:
std::shared_ptr<Cache> data_cache_;
std::shared_ptr<Cache> index_cache_;
Expand Down
Loading
Loading