Skip to content

Commit

Permalink
Add support for reading and writing backups as a
Browse files Browse the repository at this point in the history
tar archive using libarchive.
  • Loading branch information
josh-hildred committed Feb 5, 2024
1 parent 556b637 commit d7aa0eb
Show file tree
Hide file tree
Showing 14 changed files with 647 additions and 70 deletions.
2 changes: 1 addition & 1 deletion src/Backups/BackupImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)

const auto write_info_to_archive = [&](const auto & file_name)
{
auto out = archive_writer->writeFile(file_name);
auto out = archive_writer->writeFile(file_name, info.size);
auto read_buffer = entry->getReadBuffer(writer->getReadSettings());
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
Expand Down
3 changes: 3 additions & 0 deletions src/IO/Archives/IArchiveWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class IArchiveWriter : public std::enable_shared_from_this<IArchiveWriter>, boos
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) = 0;

virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, const size_t & size) = 0;


/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
virtual bool isWritingFile() const = 0;
Expand Down
156 changes: 121 additions & 35 deletions src/IO/Archives/LibArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include <IO/Archives/ArchiveUtils.h>

#include <mutex>

namespace DB
{
Expand All @@ -14,35 +13,60 @@ namespace DB

namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
}

class LibArchiveReader::StreamInfo
{
public:
explicit StreamInfo(std::unique_ptr<SeekableReadBuffer> read_buffer_)
: read_buffer(std::move(read_buffer_))
{
}

static ssize_t read([[maybe_unused]] struct archive * a, void * client_data, const void ** buff)
{
auto * read_stream = reinterpret_cast<StreamInfo *>(client_data);
*buff = reinterpret_cast<void *>(read_stream->buf);
return read_stream->read_buffer->read(read_stream->buf, DBMS_DEFAULT_BUFFER_SIZE);
}

std::unique_ptr<SeekableReadBuffer> read_buffer;
char buf[DBMS_DEFAULT_BUFFER_SIZE];
};

class LibArchiveReader::Handle
{
public:
explicit Handle(std::string path_to_archive_, bool lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
{
current_archive = openWithPath(path_to_archive);
}
explicit Handle(std::string path_to_archive_, bool lock_on_reading_, const ReadArchiveFunction & archive_read_function_)
: path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), lock_on_reading(lock_on_reading_)
{
current_archive = open(path_to_archive);
read_stream = std::make_unique<StreamInfo>(archive_read_function());
current_archive = openWithReader(&(*read_stream));
}

Handle(const Handle &) = delete;
Handle(Handle && other) noexcept
: current_archive(other.current_archive)
, current_entry(other.current_entry)
, read_stream(std::move(other.read_stream))
, archive_read_function(std::move(other.archive_read_function))
, lock_on_reading(other.lock_on_reading)

{
other.current_archive = nullptr;
other.current_entry = nullptr;
}

~Handle()
{
close(current_archive);
}
~Handle() { close(current_archive); }

bool locateFile(const std::string & filename)
{
Expand All @@ -64,10 +88,14 @@ class LibArchiveReader::Handle
break;

if (filter(archive_entry_pathname(current_entry)))
{
valid = true;
return true;
}
}

checkError(err);
valid = false;
return false;
}

Expand All @@ -81,28 +109,40 @@ class LibArchiveReader::Handle
} while (err == ARCHIVE_RETRY);

checkError(err);
return err == ARCHIVE_OK;
valid = err == ARCHIVE_OK;
return valid;
}

std::vector<std::string> getAllFiles(NameFilter filter)
{
auto * archive = open(path_to_archive);
struct archive * archive;
std::unique_ptr<LibArchiveReader::StreamInfo> rs;
if(archive_read_function)
{
read_stream = std::make_unique<StreamInfo>(archive_read_function());
archive = openWithReader(&(*rs));
}
else
{
archive = openWithPath(path_to_archive);
}

SCOPE_EXIT(
close(archive);
);

struct archive_entry * entry = nullptr;

std::vector<std::string> files;
int error = readNextHeader(archive, &entry);
int error = readNextHeader(current_archive, &entry);
while (error == ARCHIVE_OK || error == ARCHIVE_RETRY)
{
chassert(entry != nullptr);
std::string name = archive_entry_pathname(entry);
if (!filter || filter(name))
files.push_back(std::move(name));

error = readNextHeader(archive, &entry);
error = readNextHeader(current_archive, &entry);
}

checkError(error);
Expand All @@ -112,6 +152,8 @@ class LibArchiveReader::Handle
const String & getFileName() const
{
chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_name)
file_name.emplace(archive_entry_pathname(current_entry));

Expand All @@ -121,6 +163,8 @@ class LibArchiveReader::Handle
const FileInfo & getFileInfo() const
{
chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_info)
{
file_info.emplace();
Expand All @@ -134,6 +178,8 @@ class LibArchiveReader::Handle

struct archive * current_archive;
struct archive_entry * current_entry = nullptr;
bool valid = true;

private:
void checkError(int error) const
{
Expand All @@ -147,7 +193,16 @@ class LibArchiveReader::Handle
file_info.reset();
}

static struct archive * open(const String & path_to_archive)
static struct archive * openWithReader(StreamInfo * read_stream_)
{
auto * a = archive_read_new();
archive_read_support_filter_all(a);
archive_read_support_format_all(a);
archive_read_open(a, read_stream_, nullptr, StreamInfo::read, nullptr);
return a;
}

static struct archive * openWithPath(const String & path_to_archive)
{
auto * archive = archive_read_new();
try
Expand Down Expand Up @@ -185,6 +240,8 @@ class LibArchiveReader::Handle
}

const String path_to_archive;
std::unique_ptr<StreamInfo> read_stream = nullptr;
const IArchiveReader::ReadArchiveFunction archive_read_function;

/// for some archive types when we are reading headers static variables are used
/// which are not thread-safe
Expand All @@ -198,14 +255,15 @@ class LibArchiveReader::Handle
class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator
{
public:
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {}
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) { }

const String & getFileName() const override { return handle.getFileName(); }
const FileInfo & getFileInfo() const override { return handle.getFileInfo(); }
bool nextFile() override { return handle.nextFile(); }

/// Releases owned handle to pass it to a read buffer.
Handle releaseHandle() && { return std::move(handle); }

private:
Handle handle;
};
Expand All @@ -217,13 +275,13 @@ class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_))
, path_to_archive(std::move(path_to_archive_))
{}
{
}

off_t seek(off_t /* off */, int /* whence */) override
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive");
}

bool checkIfActuallySeekable() override { return false; }

off_t getPosition() override
Expand All @@ -236,14 +294,13 @@ class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getFileOffsetOfBufferEnd is not supported when reading from archive");
}

off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }

String getFileName() const override { return handle.getFileName(); }

size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }

Handle releaseHandle() &&
{
return std::move(handle);
}
Handle releaseHandle() && { return std::move(handle); }

private:
bool nextImpl() override
Expand All @@ -270,7 +327,17 @@ class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase

LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_)
: archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_))
{}
{
}

LibArchiveReader::LibArchiveReader(
std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_)
: archive_name(std::move(archive_name_))
, lock_on_reading(lock_on_reading_)
, path_to_archive(std::move(path_to_archive_))
, archive_read_function(archive_read_function_)
{
}

LibArchiveReader::~LibArchiveReader() = default;

Expand All @@ -281,21 +348,21 @@ const std::string & LibArchiveReader::getPath() const

bool LibArchiveReader::fileExists(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
return handle.locateFile(filename);
}

LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.locateFile(filename))
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive);
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename));
return handle.getFileInfo();
}

std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.nextFile())
return nullptr;

Expand All @@ -304,17 +371,25 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()

std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename, bool throw_on_not_found)
{
return readFile([&](const std::string & file) { return file == filename; }, throw_on_not_found);
Handle handle = acquireHandle();
if (!handle.locateFile(filename))
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename));
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
}

std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.locateFile(filter))
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: No file satisfying filter in archive", path_to_archive);
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
Expand All @@ -333,7 +408,8 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::nextFile(std
{
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
auto read_buffer_from_libarchive = std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto read_buffer_from_libarchive
= std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_libarchive).releaseHandle();
if (!handle.nextFile())
return nullptr;
Expand All @@ -347,13 +423,23 @@ std::vector<std::string> LibArchiveReader::getAllFiles()

std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
return handle.getAllFiles(filter);
}

void LibArchiveReader::setPassword(const String & /*password_*/)
void LibArchiveReader::setPassword([[maybe_unused]] const String & password_)
{
if (password_ != "")
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
}

LibArchiveReader::Handle LibArchiveReader::acquireHandle()
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
std::lock_guard lock{mutex};
if (archive_read_function)
return Handle{path_to_archive, lock_on_reading, archive_read_function};
else
return Handle{path_to_archive, lock_on_reading};
}

#endif
Expand Down

0 comments on commit d7aa0eb

Please sign in to comment.