diff --git a/be/src/io/fs/file_handle_cache.cpp b/be/src/io/fs/file_handle_cache.cpp index fbb904e3473295..41617ba10159fc 100644 --- a/be/src/io/fs/file_handle_cache.cpp +++ b/be/src/io/fs/file_handle_cache.cpp @@ -21,6 +21,7 @@ #include "io/fs/file_handle_cache.h" +#include #include #include @@ -100,7 +101,7 @@ FileHandleCache::Accessor::~Accessor() { #ifdef USE_HADOOP_HDFS if (hdfsUnbufferFile(get()->file()) != 0) { VLOG_FILE << "FS does not support file handle unbuffering, closing file=" - << _cache_accessor.get_key()->first; + << _cache_accessor.get_key()->second.first; destroy(); } else { // Calling explicit release to handle metrics @@ -148,11 +149,13 @@ Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fna FileHandleCache::Accessor* accessor, bool* cache_hit) { DCHECK_GE(mtime, 0); // Hash the key and get appropriate partition - int index = - HashUtil::hash(fname.data(), cast_set(fname.size()), 0) % _cache_partitions.size(); + uintptr_t fs_identity = reinterpret_cast(fs); + uint32_t seed = HashUtil::hash(&fs_identity, sizeof(fs_identity), 0); + int index = HashUtil::hash(fname.data(), cast_set(fname.size()), seed) % + _cache_partitions.size(); FileHandleCachePartition& p = _cache_partitions[index]; - auto cache_key = std::make_pair(fname, mtime); + auto cache_key = make_cache_key(fs, fname, mtime); // If this requires a new handle, skip to the creation codepath. Otherwise, // find an unused entry with the same mtime @@ -187,6 +190,15 @@ Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fna return Status::OK(); } +#ifdef BE_TEST +bool FileHandleCache::same_cache_key_for_test(const hdfsFS& lhs_fs, const std::string& lhs_fname, + int64_t lhs_mtime, const hdfsFS& rhs_fs, + const std::string& rhs_fname, int64_t rhs_mtime) { + return make_cache_key(lhs_fs, lhs_fname, lhs_mtime) == + make_cache_key(rhs_fs, rhs_fname, rhs_mtime); +} +#endif + void FileHandleCache::_evict_handles_loop() { while (!_is_shut_down.load()) { if (_unused_handle_timeout_secs) { diff --git a/be/src/io/fs/file_handle_cache.h b/be/src/io/fs/file_handle_cache.h index 057bdefc61da77..ce3c708ba99242 100644 --- a/be/src/io/fs/file_handle_cache.h +++ b/be/src/io/fs/file_handle_cache.h @@ -22,9 +22,12 @@ #pragma once #include +#include #include #include #include +#include +#include #include "common/status.h" #include "io/fs/file_system.h" @@ -111,13 +114,15 @@ class ExclusiveHdfsFileHandle : public HdfsFileHandle { /// mtime is older than the file's current mtime. class FileHandleCache { private: + using CacheKey = std::pair>; + /// Each partition operates independently, and thus has its own thread-safe cache. /// To avoid contention on the lock_ due to false sharing the partitions are /// aligned to cache line boundaries. struct FileHandleCachePartition : public CacheLineAligned { - // Cache key is a pair of filename and mtime - // Using std::pair to spare boilerplate of hash function - typedef LruMultiCache, CachedHdfsFileHandle> CacheType; + // The same HDFS path can be opened through different hdfsFS instances with + // different authentication contexts, so the filesystem handle is part of the key. + typedef LruMultiCache CacheType; CacheType cache; }; @@ -176,7 +181,17 @@ class FileHandleCache { int64_t file_size, bool require_new_handle, Accessor* accessor, bool* cache_hit) WARN_UNUSED_RESULT; +#ifdef BE_TEST + static bool same_cache_key_for_test(const hdfsFS& lhs_fs, const std::string& lhs_fname, + int64_t lhs_mtime, const hdfsFS& rhs_fs, + const std::string& rhs_fname, int64_t rhs_mtime); +#endif + private: + static CacheKey make_cache_key(const hdfsFS& fs, const std::string& fname, int64_t mtime) { + return {fs, {fname, mtime}}; + } + /// Periodic check to evict unused file handles. Only executed by _eviction_thread. void _evict_handles_loop(); diff --git a/be/test/io/fs/file_handle_cache_test.cpp b/be/test/io/fs/file_handle_cache_test.cpp new file mode 100644 index 00000000000000..5c1f7d1d9e05e8 --- /dev/null +++ b/be/test/io/fs/file_handle_cache_test.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/file_handle_cache.h" + +#include + +#include +#include + +namespace doris::io { + +TEST(FileHandleCacheTest, CacheKeyIncludesHdfsFs) { + auto first_fs = reinterpret_cast(static_cast(0x1)); + auto second_fs = reinterpret_cast(static_cast(0x2)); + const std::string fname = "/user/hive/warehouse/table/data.parquet"; + constexpr int64_t mtime = 12345; + + EXPECT_TRUE(FileHandleCache::same_cache_key_for_test(first_fs, fname, mtime, first_fs, fname, + mtime)); + EXPECT_FALSE(FileHandleCache::same_cache_key_for_test(first_fs, fname, mtime, second_fs, fname, + mtime)); + EXPECT_FALSE(FileHandleCache::same_cache_key_for_test(first_fs, fname, mtime, first_fs, + fname + ".other", mtime)); + EXPECT_FALSE(FileHandleCache::same_cache_key_for_test(first_fs, fname, mtime, first_fs, fname, + mtime + 1)); +} + +} // namespace doris::io