Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,8 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
// to make sure the buffer reader will start to read at right position.
for (int i = 0; i < buffer_num; i++) {
_pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
_file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
_io_ctx_holder, sync_buffer));
_file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader, _io_ctx_holder,
sync_buffer));
}
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,12 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };

PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size,
io::FileReader* reader, std::shared_ptr<const IOContext> io_ctx,
io::FileReaderSPtr reader, std::shared_ptr<const IOContext> io_ctx,
std::function<void(PrefetchBuffer&)> sync_profile)
: _file_range(file_range),
_size(buffer_size),
_whole_buffer_size(whole_buffer_size),
_reader(reader),
_reader(std::move(reader)),
_io_ctx_holder(std::move(io_ctx)),
_io_ctx(_io_ctx_holder.get()),
_sync_profile(std::move(sync_profile)) {}
Expand All @@ -445,7 +445,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
_random_access_ranges(other._random_access_ranges),
_size(other._size),
_whole_buffer_size(other._whole_buffer_size),
_reader(other._reader),
_reader(std::move(other._reader)),
_io_ctx_holder(std::move(other._io_ctx_holder)),
_io_ctx(_io_ctx_holder.get()),
_buf(std::move(other._buf)),
Expand All @@ -462,7 +462,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t _size {0};
size_t _len {0};
size_t _whole_buffer_size;
io::FileReader* _reader = nullptr;
io::FileReaderSPtr _reader;
std::shared_ptr<const IOContext> _io_ctx_holder;
const IOContext* _io_ctx = nullptr;
PODArray<char> _buf;
Expand Down
85 changes: 84 additions & 1 deletion be/test/io/fs/buffered_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
#include <gtest/gtest-test-part.h>
#include <limits.h>

#include <atomic>
#include <memory>
#include <ostream>
#include <thread>

#include "common/config.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "util/countdown_latch.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"

Expand Down Expand Up @@ -102,7 +106,7 @@ class MockOffsetFileReader : public io::FileReader {

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const io::IOContext* io_ctx) override {
const io::IOContext* /*io_ctx*/) override {
if (offset >= _size) {
*bytes_read = 0;
return Status::OK();
Expand All @@ -120,6 +124,52 @@ class MockOffsetFileReader : public io::FileReader {
io::Path _path = "/tmp/mock";
};

class BlockingFileReader : public io::FileReader {
public:
BlockingFileReader(size_t size, CountDownLatch* read_started, CountDownLatch* continue_read,
std::atomic<bool>* destroyed)
: _size(size),
_read_started(read_started),
_continue_read(continue_read),
_destroyed(destroyed) {}

~BlockingFileReader() override { _destroyed->store(true); }

Status close() override {
_closed = true;
return Status::OK();
}

const io::Path& path() const override { return _path; }

size_t size() const override { return _size; }

bool closed() const override { return _closed; }

int64_t mtime() const override { return 0; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const io::IOContext* /*io_ctx*/) override {
_read_started->count_down();
_continue_read->wait();
if (offset >= _size) {
*bytes_read = 0;
return Status::OK();
}
*bytes_read = std::min(_size - offset, result.size);
return Status::TimedOut("injected prefetch timeout");
}

private:
size_t _size;
CountDownLatch* _read_started;
CountDownLatch* _continue_read;
std::atomic<bool>* _destroyed;
bool _closed = false;
io::Path _path = "/tmp/blocking";
};

class TestingRangeCacheFileReader : public io::FileReader {
public:
TestingRangeCacheFileReader(std::shared_ptr<io::FileReader> delegate) : _delegate(delegate) {};
Expand Down Expand Up @@ -302,6 +352,39 @@ TEST_F(BufferedReaderTest, test_miss) {
EXPECT_EQ(45, bytes_read);
}

TEST_F(BufferedReaderTest, prefetch_task_keeps_reader_alive_after_close_timeout) {
const auto saved_timeout_ms = config::buffered_reader_read_timeout_ms;
config::buffered_reader_read_timeout_ms = 50;

CountDownLatch read_started(1);
CountDownLatch continue_read(1);
std::atomic<bool> reader_destroyed = false;
auto inner_reader = std::make_shared<BlockingFileReader>(1024, &read_started, &continue_read,
&reader_destroyed);
auto weak_reader = std::weak_ptr<io::FileReader>(inner_reader);
{
io::PrefetchBufferedReader reader(nullptr, std::move(inner_reader),
io::PrefetchRange(0, 1024));
uint8_t buf[1];
size_t bytes_read = 0;
Status st = reader.read_at(0, Slice {buf, 1}, &bytes_read);
EXPECT_TRUE(st.is<ErrorCode::TIMEOUT>());
EXPECT_TRUE(read_started.wait_for(std::chrono::seconds(1)));
EXPECT_FALSE(reader_destroyed.load());
}
EXPECT_FALSE(reader_destroyed.load());
EXPECT_FALSE(weak_reader.expired());

continue_read.count_down();
for (int i = 0; i < 100 && !reader_destroyed.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
EXPECT_TRUE(reader_destroyed.load());
EXPECT_TRUE(weak_reader.expired());

config::buffered_reader_read_timeout_ms = saved_timeout_ms;
}

TEST_F(BufferedReaderTest, test_read_amplify) {
size_t kb = 1024;
io::FileReaderSPtr offset_reader = std::make_shared<MockOffsetFileReader>(2048 * kb); // 2MB
Expand Down
Loading