Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage MergeTree initial support for S3. #9415

Merged
merged 12 commits into from
Mar 10, 2020
15 changes: 5 additions & 10 deletions dbms/src/Compression/CachedCompressedReadBuffer.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#include "CachedCompressedReadBuffer.h"

#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>

#include <utility>


namespace DB
{
Expand All @@ -19,7 +19,7 @@ void CachedCompressedReadBuffer::initInput()
{
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size);
file_in = file_in_creator();
compressed_in = file_in.get();

if (profile_callback)
Expand Down Expand Up @@ -71,17 +71,12 @@ bool CachedCompressedReadBuffer::nextImpl()
return true;
}


CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_,
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_,
size_t buf_size_)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), mmap_threshold(mmap_threshold_), file_pos(0)
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
{
}


void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (owned_cell &&
Expand Down
15 changes: 4 additions & 11 deletions dbms/src/Compression/CachedCompressedReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ namespace DB
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
private:
const std::string path;
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
UncompressedCache * cache;
size_t buf_size;
size_t estimated_size;
size_t aio_threshold;
size_t mmap_threshold;

std::unique_ptr<ReadBufferFromFileBase> file_in;

const std::string path;
size_t file_pos;

/// A piece of data from the cache, or a piece of read data that we put into the cache.
Expand All @@ -41,11 +38,7 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
clockid_t clock_type {};

public:
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_,
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);

CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);

void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

Expand Down
19 changes: 17 additions & 2 deletions dbms/src/Compression/tests/cached_compressed_read_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Compression/CompressionFactory.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/copyData.h>

#include <Common/Stopwatch.h>
Expand Down Expand Up @@ -32,7 +33,14 @@ int main(int argc, char ** argv)

{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0, 0, 0);
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0);
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);

Expand All @@ -44,7 +52,14 @@ int main(int argc, char ** argv)

{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0, 0, 0);
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0);
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);

Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Disks/DiskLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
return dir_path + iter.name();
}

String name() const override { return iter.name(); }

private:
String dir_path;
Poco::DirectoryIterator iter;
Expand Down Expand Up @@ -237,6 +239,21 @@ void DiskLocal::removeRecursive(const String & path)
Poco::File(disk_path + path).remove(true);
}

void DiskLocal::listFiles(const String & path, std::vector<String> & file_names)
{
Poco::File(disk_path + path).list(file_names);
}

void DiskLocal::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
Poco::File(disk_path + path).setLastModified(timestamp);
}

Poco::Timestamp DiskLocal::getLastModified(const String & path)
{
return Poco::File(disk_path + path).getLastModified();
}


void DiskLocalReservation::update(UInt64 new_size)
{
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Disks/DiskLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class DiskLocal : public IDisk

void copyFile(const String & from_path, const String & to_path) override;

void listFiles(const String & path, std::vector<String> & file_names) override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
Expand All @@ -85,6 +87,10 @@ class DiskLocal : public IDisk

void removeRecursive(const String & path) override;

void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;

Poco::Timestamp getLastModified(const String & path) override;

private:
bool tryReserve(UInt64 bytes);

Expand Down
21 changes: 15 additions & 6 deletions dbms/src/Disks/DiskMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Poco/Path.h>


namespace DB
Expand All @@ -23,7 +24,7 @@ namespace ErrorCodes
class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<String> && dir_file_paths_)
explicit DiskMemoryDirectoryIterator(std::vector<Poco::Path> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
Expand All @@ -32,11 +33,13 @@ class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator

bool isValid() const override { return iter != dir_file_paths.end(); }

String path() const override { return *iter; }
String path() const override { return (*iter).toString(); }

String name() const override { return (*iter).getFileName(); }

private:
std::vector<String> dir_file_paths;
std::vector<String>::iterator iter;
std::vector<Poco::Path> dir_file_paths;
std::vector<Poco::Path>::iterator iter;
};

/// Adapter with actual behaviour as ReadBufferFromString.
Expand Down Expand Up @@ -264,10 +267,10 @@ DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
if (!path.empty() && files.find(path) == files.end())
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);

std::vector<String> dir_file_paths;
std::vector<Poco::Path> dir_file_paths;
for (const auto & file : files)
if (parentPath(file.first) == path)
dir_file_paths.push_back(file.first);
dir_file_paths.emplace_back(file.first);

return std::make_unique<DiskMemoryDirectoryIterator>(std::move(dir_file_paths));
}
Expand Down Expand Up @@ -381,6 +384,12 @@ void DiskMemory::removeRecursive(const String & path)
}
}

void DiskMemory::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}


using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Disks/DiskMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class DiskMemory : public IDisk

void copyFile(const String & from_path, const String & to_path) override;

void listFiles(const String & path, std::vector<String> & file_names) override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
Expand All @@ -78,6 +80,10 @@ class DiskMemory : public IDisk

void removeRecursive(const String & path) override;

void setLastModified(const String &, const Poco::Timestamp &) override { }

Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); }

private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);
Expand Down
65 changes: 49 additions & 16 deletions dbms/src/Disks/DiskS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
}
Expand Down Expand Up @@ -157,19 +156,36 @@ namespace

off_t seek(off_t offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

if (offset_ < 0 || metadata.total_size <= static_cast<UInt64>(offset_))
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset_) + ", Max: " + std::to_string(metadata.total_size),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

absolute_position = offset_;
if (whence == SEEK_CUR)
{
/// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position)
{
pos += offset_;
return getPosition();
}
else
{
absolute_position += offset_;
}
}
else if (whence == SEEK_SET)
{
/// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size()
&& size_t(offset_) < absolute_position)
{
pos = working_buffer.end() - (absolute_position - offset_);
return getPosition();
}
else
{
absolute_position = offset_;
}
}
else
throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

/// TODO: Do not re-initialize buffer if current position within working buffer.
current_buf = initialize();
pos = working_buffer.end();

Expand All @@ -187,8 +203,7 @@ namespace
for (UInt32 i = 0; i < metadata.s3_objects_count; ++i)
{
current_buf_idx = i;
auto path = metadata.s3_objects[i].first;
auto size = metadata.s3_objects[i].second;
auto [path, size] = metadata.s3_objects[i];
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
Expand Down Expand Up @@ -325,6 +340,8 @@ class DiskS3DirectoryIterator : public IDiskDirectoryIterator
return folder_path + iter.name();
}

String name() const override { return iter.name(); }

private:
Poco::DirectoryIterator iter;
String folder_path;
Expand Down Expand Up @@ -547,7 +564,7 @@ void DiskS3::removeRecursive(const String & path)
Poco::File file(metadata_path + path);
if (file.isFile())
{
remove(metadata_path + path);
remove(path);
}
else
{
Expand Down Expand Up @@ -591,6 +608,22 @@ bool DiskS3::tryReserve(UInt64 bytes)
return false;
}

void DiskS3::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}

void DiskS3::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
Poco::File(metadata_path + path).setLastModified(timestamp);
}

Poco::Timestamp DiskS3::getLastModified(const String & path)
{
return Poco::File(metadata_path + path).getLastModified();
}


DiskS3Reservation::~DiskS3Reservation()
{
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Disks/DiskS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class DiskS3 : public IDisk

void copyFile(const String & from_path, const String & to_path) override;

void listFiles(const String & path, std::vector<String> & file_names) override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
Expand All @@ -85,6 +87,10 @@ class DiskS3 : public IDisk

void removeRecursive(const String & path) override;

void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;

Poco::Timestamp getLastModified(const String & path) override;

private:
String getRandomName() const;

Expand Down