Skip to content

Commit

Permalink
Merge pull request #56961 from nickitat/some_questionable_code
Browse files Browse the repository at this point in the history
Bypass `Poco::BasicBufferedStreamBuf` in `ReadBufferFromIStream`
  • Loading branch information
nickitat committed Jan 22, 2024
2 parents 0f0785c + b5e333d commit 4d6ef8e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 113 deletions.
7 changes: 7 additions & 0 deletions base/poco/Foundation/include/Poco/BufferedStreamBuf.h
Expand Up @@ -26,6 +26,11 @@
#include "Poco/StreamUtil.h"


namespace DB
{
class ReadBufferFromIStream;
}

namespace Poco
{

Expand Down Expand Up @@ -120,6 +125,8 @@ class BasicBufferedStreamBuf : public std::basic_streambuf<ch, tr>
openmode getMode() const { return _mode; }

private:
friend class DB::ReadBufferFromIStream;

virtual int readFromDevice(char_type * /*buffer*/, std::streamsize /*length*/) { return 0; }

virtual int writeToDevice(const char_type * /*buffer*/, std::streamsize /*length*/) { return 0; }
Expand Down
51 changes: 32 additions & 19 deletions src/IO/ReadBufferFromIStream.cpp
Expand Up @@ -12,33 +12,46 @@ namespace ErrorCodes

bool ReadBufferFromIStream::nextImpl()
{
istr.read(internal_buffer.begin(), internal_buffer.size());
size_t gcount = istr.gcount();
if (eof)
return false;

if (!gcount)
{
if (istr.eof())
return false;

if (istr.fail())
throw Exception(ErrorCodes::CANNOT_READ_FROM_ISTREAM, "Cannot read from istream at offset {}", count());
size_t bytes_read = 0;
char * read_to = internal_buffer.begin();

throw Exception(ErrorCodes::CANNOT_READ_FROM_ISTREAM, "Unexpected state of istream at offset {}", count());
/// It is necessary to read in a loop, since a socket usually returns only the data available at the moment.
while (bytes_read < internal_buffer.size())
{
try
{
const auto bytes_read_last_time = stream_buf.readFromDevice(read_to, internal_buffer.size() - bytes_read);
if (bytes_read_last_time <= 0)
{
eof = true;
break;
}

bytes_read += bytes_read_last_time;
read_to += bytes_read_last_time;
}
catch (...)
{
throw Exception(
ErrorCodes::CANNOT_READ_FROM_ISTREAM,
"Cannot read from istream at offset {}: {}",
count(),
getCurrentExceptionMessage(/*with_stacktrace=*/true));
}
}
else
working_buffer.resize(gcount);

return true;
if (bytes_read)
working_buffer.resize(bytes_read);

return bytes_read;
}

ReadBufferFromIStream::ReadBufferFromIStream(std::istream & istr_, size_t size)
: BufferWithOwnMemory<ReadBuffer>(size), istr(istr_)
: BufferWithOwnMemory<ReadBuffer>(size), istr(istr_), stream_buf(dynamic_cast<Poco::Net::HTTPBasicStreamBuf &>(*istr.rdbuf()))
{
/// - badbit will be set if an exception is thrown from the ios implementation
/// - failbit can be set when, for instance, read() reads less data than requested, so we
/// cannot set it, since we are requesting to read more data than the
/// buffer has now.
istr.exceptions(std::ios::badbit);
}

}
4 changes: 4 additions & 0 deletions src/IO/ReadBufferFromIStream.h
Expand Up @@ -3,6 +3,8 @@
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>

#include <Poco/Net/HTTPBasicStreamBuf.h>


namespace DB
{
Expand All @@ -11,6 +13,8 @@ class ReadBufferFromIStream : public BufferWithOwnMemory<ReadBuffer>
{
private:
std::istream & istr;
Poco::Net::HTTPBasicStreamBuf & stream_buf;
bool eof = false;

bool nextImpl() override;

Expand Down
94 changes: 0 additions & 94 deletions src/IO/tests/gtest_writebuffer_s3.cpp
Expand Up @@ -1185,98 +1185,4 @@ String fillStringWithPattern(String pattern, int n)
return data;
}

/// Checks that a ReadBufferFromEncryptedFile built on top of an already-used
/// AsynchronousBoundedReadBuffer respects setReadUntilPosition(): the underlying
/// buffer has data past the requested boundary in its working buffer, but the
/// encrypted reader must return only the first 50 bytes of the payload and then report EOF.
TEST_F(WBS3Test, ReadBeyondLastOffset) {
const String remote_file = "ReadBeyondLastOffset";

const String key = "1234567812345678";
/// NOTE(review): presumably the pattern repeated 10 times (100 bytes) - confirm against fillStringWithPattern.
const String data = fillStringWithPattern("0123456789", 10);

/// The remote buffer is sized to hold the encryption header plus 60 bytes of the file,
/// so reading the header pulls part of the payload into the working buffer.
ReadSettings disk_read_settings;
disk_read_settings.enable_filesystem_cache = false;
disk_read_settings.local_fs_buffer_size = 70;
disk_read_settings.remote_fs_buffer_size = FileEncryption::Header::kSize + 60;

{
/// Write the encrypted file.

FileEncryption::Header header;
header.algorithm = FileEncryption::Algorithm::AES_128_CTR;
header.key_fingerprint = FileEncryption::calculateKeyFingerprint(key);
header.init_vector = FileEncryption::InitVector::random();

auto wbs3 = getWriteBuffer(remote_file);
getAsyncPolicy().setAutoExecute(true);

WriteBufferFromEncryptedFile wb(10, std::move(wbs3), key, header);
wb.write(data.data(), data.size());
wb.finalize();
}

auto reader = std::make_unique<ThreadPoolRemoteFSReader>(1, 1);
std::unique_ptr<ReadBufferFromEncryptedFile> encrypted_read_buffer;

{
/// Create the encrypted file reader.

auto cache_log = std::shared_ptr<FilesystemCacheLog>();
/// The stored object size covers the payload plus the encryption header.
const StoredObjects objects = { StoredObject(remote_file, /* local_path */ "", data.size() + FileEncryption::Header::kSize) };
auto async_read_counters = std::make_shared<AsyncReadCounters>();
auto prefetch_log = std::shared_ptr<FilesystemReadPrefetchesLog>();

/// Factory that creates the S3 read buffer for a given object path.
auto rb_creator = [this, disk_read_settings] (const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
{
S3Settings::RequestSettings request_settings;
return std::make_unique<ReadBufferFromS3>(
client,
bucket,
path,
"Latest",
request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
};

auto rb_remote_fs = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(rb_creator),
objects,
disk_read_settings,
cache_log,
true);

auto rb_async = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(rb_remote_fs), *reader, disk_read_settings, async_read_counters, prefetch_log);

/// Read the header from the buffer.
/// As a result, AsynchronousBoundedReadBuffer contains some data from the file inside its working buffer.
FileEncryption::Header header;
header.read(*rb_async);

/// After the header is consumed, the rest of the first remote buffer (60 bytes) is still available.
ASSERT_EQ(rb_async->available(), disk_read_settings.remote_fs_buffer_size - FileEncryption::Header::kSize);
ASSERT_EQ(rb_async->getPosition(), FileEncryption::Header::kSize);
ASSERT_EQ(rb_async->getFileOffsetOfBufferEnd(), disk_read_settings.remote_fs_buffer_size);

/// ReadBufferFromEncryptedFile is constructed over a ReadBuffer which was already in use.
/// The 'FileEncryption::Header' has been read from `rb_async`.
/// NOTE(review): presumably the encrypted reader first consumes the data already present
/// in the `rb_async` working buffer - confirm against ReadBufferFromEncryptedFile.
encrypted_read_buffer = std::make_unique<ReadBufferFromEncryptedFile>(
disk_read_settings.local_fs_buffer_size, std::move(rb_async), key, header);
}

/// When the header is read, the file is read into the working buffer up to some position. In the test the file is read up to the remote_fs_buffer_size (124) position.
/// Set the right border before that position and make sure that encrypted_read_buffer does not have access to data beyond it.
ASSERT_GT(disk_read_settings.remote_fs_buffer_size, 50);
encrypted_read_buffer->setReadUntilPosition(50);

/// encrypted_read_buffer reads the data with buffer size `local_fs_buffer_size`.
/// Even though the underlying buffer has already read data beyond the ReadUntilPosition, encrypted_read_buffer must not return it:
/// only the first 50 bytes of the payload are expected, followed by EOF.
String res;
readStringUntilEOF(res, *encrypted_read_buffer);
ASSERT_EQ(res, data.substr(0, 50));
ASSERT_TRUE(encrypted_read_buffer->eof());
}

#endif

0 comments on commit 4d6ef8e

Please sign in to comment.