Fix writing appended files to incremental backups #49725

Merged
46 changes: 26 additions & 20 deletions src/IO/LimitSeekableReadBuffer.cpp
@@ -19,7 +19,7 @@ LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer>
     , in(std::move(in_))
    , min_offset(start_offset_)
    , max_offset(start_offset_ + limit_size_)
-    , need_seek(start_offset_)
+    , need_seek(min_offset) /// We always start reading from `min_offset`.
 {
 }

@@ -31,44 +31,50 @@ bool LimitSeekableReadBuffer::nextImpl()
     if (need_seek)
     {
         /// Do actual seek.
-        if (in->getPosition() != *need_seek)
+        if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
         {
-            if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
-            {
-                /// Failed to seek, maybe because the new seek position is located after EOF.
-                set(in->position(), 0);
-                return false;
-            }
+            /// Failed to seek, maybe because the new seek position is located after EOF.
+            set(in->position(), 0);
+            return false;
         }
         need_seek.reset();
     }

-    if (in->getPosition() >= max_offset)
+    off_t seek_pos = in->getPosition();
+    off_t offset_after_min = seek_pos - min_offset;
+    off_t available_before_max = max_offset - seek_pos;
+
+    if (offset_after_min < 0 || available_before_max <= 0)
     {
         /// Limit reached.
         set(in->position(), 0);
         return false;
     }

-    if (in->eof())
+    if (in->eof()) /// `in->eof()` can call `in->next()`
     {
         /// EOF reached.
         set(in->position(), 0);
         return false;
     }

-    /// Adjust the size of the buffer (we don't allow to read more than `max_offset - min_offset`).
-    off_t size = in->buffer().size();
-    size = std::min(size, max_offset - in->getPosition());
+    /// in->eof() shouldn't change the seek position.
+    chassert(seek_pos == in->getPosition());

-    if (!size || (static_cast<off_t>(in->offset()) >= size))
-    {
-        /// Limit reached.
-        set(in->position(), 0);
-        return false;
-    }
+    /// Adjust the beginning and the end of the working buffer.
+    /// Because we don't want to read before `min_offset` or after `max_offset`.
+    auto * ptr = in->position();
+    auto * begin = in->buffer().begin();
+    auto * end = in->buffer().end();
+
+    if (ptr - begin > offset_after_min)
+        begin = ptr - offset_after_min;
+    if (end - ptr > available_before_max)
+        end = ptr + available_before_max;
+
+    BufferBase::set(begin, end - begin, ptr - begin);
+    chassert(position() == ptr && available());

-    BufferBase::set(in->buffer().begin(), size, in->offset());
Comment by @vitlibar (Member, Author), May 11, 2023:

Here the calculation was wrong. It started reading from the correct position, but neither the beginning nor the end of the buffer was adjusted correctly. So although the correct Content-Length was assigned for reading the appended file after the matched prefix, the read buffer passed along couldn't read anything, and the AWS SDK always failed with a timeout.

     return true;
 }
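To make the corrected arithmetic described in the inline comment above easier to follow, here is a minimal standalone sketch of the same window-clamping logic, stripped of the ClickHouse buffer machinery. `Window` and `clampWindow` are hypothetical names used only for illustration, not ClickHouse's API; the real code manipulates `BufferBase` internals exactly as the diff shows.

    #include <cassert>
    #include <cstdio>

    /// Illustrative sketch only (hypothetical names, not ClickHouse's API):
    /// a working buffer [begin, end) whose first byte corresponds to absolute
    /// file offset `buffer_start_offset`, with the read cursor at `ptr`.
    struct Window
    {
        char * begin;
        char * end;
        char * ptr;
    };

    /// Trim the window so a caller can never read before `min_offset` or at/after
    /// `max_offset`; the cursor itself stays where it is, mirroring the new
    /// LimitSeekableReadBuffer::nextImpl() arithmetic in the diff above.
    Window clampWindow(Window w, long buffer_start_offset, long min_offset, long max_offset)
    {
        long seek_pos = buffer_start_offset + (w.ptr - w.begin); /// absolute offset of the cursor
        long offset_after_min = seek_pos - min_offset;           /// bytes we may expose before the cursor
        long available_before_max = max_offset - seek_pos;       /// bytes we may expose after the cursor
        assert(offset_after_min >= 0 && available_before_max > 0);

        if (w.ptr - w.begin > offset_after_min)
            w.begin = w.ptr - offset_after_min;   /// don't expose bytes before `min_offset`
        if (w.end - w.ptr > available_before_max)
            w.end = w.ptr + available_before_max; /// don't expose bytes at/after `max_offset`
        return w;
    }

    int main()
    {
        char buf[16] = {};                 /// holds absolute file offsets 90..105
        Window w{buf, buf + 16, buf + 10}; /// cursor at absolute offset 100
        w = clampWindow(w, /*buffer_start_offset=*/ 90, /*min_offset=*/ 95, /*max_offset=*/ 103);
        /// begin now maps to offset 95 and end to offset 103; the cursor is untouched.
        std::printf("%td byte(s) before cursor, %td after\n", w.ptr - w.begin, w.end - w.ptr);
    }

For contrast, the removed code computed a single size, `std::min(in->buffer().size(), max_offset - in->getPosition())`, measured from the buffer's beginning while leaving that beginning unchanged; as the diff suggests, whenever the cursor sat deeper in the buffer than the bytes remaining before `max_offset`, the `in->offset() >= size` check reported the limit as reached and the working buffer exposed zero readable bytes — the "couldn't read anything" failure described in the comment above.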

23 changes: 23 additions & 0 deletions tests/integration/test_backup_restore_new/test.py
@@ -473,6 +473,29 @@ def test_incremental_backup_for_log_family():
     assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
 
 
+def test_incremental_backup_append_table_def():
+    backup_name = new_backup_name()
+    create_and_fill_table()
+
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
+    instance.query(f"BACKUP TABLE test.table TO {backup_name}")
+
+    instance.query("ALTER TABLE test.table MODIFY SETTING parts_to_throw_insert=100")
+
+    incremental_backup_name = new_backup_name()
+    instance.query(
+        f"BACKUP TABLE test.table TO {incremental_backup_name} SETTINGS base_backup = {backup_name}"
+    )
+
+    instance.query("DROP TABLE test.table")
+    instance.query(f"RESTORE TABLE test.table FROM {incremental_backup_name}")
+
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
+    assert "parts_to_throw_insert = 100" in instance.query(
+        "SHOW CREATE TABLE test.table"
+    )
+
+
 def test_backup_not_found_or_already_exists():
     backup_name = new_backup_name()
 
27 changes: 27 additions & 0 deletions tests/integration/test_backup_restore_s3/test.py
@@ -172,3 +172,30 @@ def test_backup_to_s3_native_copy_multipart():
     assert node.contains_in_log(
         f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
     )
+
+
+def test_incremental_backup_append_table_def():
+    backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
+
+    node.query(
+        "CREATE TABLE data (x UInt32, y String) Engine=MergeTree() ORDER BY y PARTITION BY x%10 SETTINGS storage_policy='policy_s3'"
+    )
+
+    node.query("INSERT INTO data SELECT number, toString(number) FROM numbers(100)")
+    assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
+
+    node.query(f"BACKUP TABLE data TO {backup_name}")
+
+    node.query("ALTER TABLE data MODIFY SETTING parts_to_throw_insert=100")
+
+    incremental_backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
+
+    node.query(
+        f"BACKUP TABLE data TO {incremental_backup_name} SETTINGS base_backup = {backup_name}"
+    )
+
+    node.query("DROP TABLE data")
+    node.query(f"RESTORE TABLE data FROM {incremental_backup_name}")
+
+    assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
+    assert "parts_to_throw_insert = 100" in node.query("SHOW CREATE TABLE data")