Commit
Merge pull request #49725 from vitlibar/incremental-backup-append-table-def

Fix writing appended files to incremental backups
nikitamikhaylov committed May 12, 2023
2 parents 9224204 + 478140f commit 66badcd
Showing 3 changed files with 76 additions and 20 deletions.
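
For context (a reading aid, not part of the commit): LimitSeekableReadBuffer wraps a SeekableReadBuffer and exposes only the byte range [start_offset_, start_offset_ + limit_size_) of it. Incremental backups use such range reads to store only the portion of a file that was appended since the base backup, which is why the window-clamping logic in the diff below matters. A minimal sketch of those range-reading semantics, assuming a plain FILE* in place of the wrapped buffer; the name readRange is illustrative, not from the codebase:

#include <cassert>
#include <cstdio>
#include <string>

/// Read the byte range [min_offset, max_offset) of a file; a simplified model
/// of what LimitSeekableReadBuffer provides, not the actual implementation.
std::string readRange(std::FILE * file, long min_offset, long max_offset)
{
    assert(min_offset <= max_offset);

    /// We always start reading from `min_offset`.
    if (std::fseek(file, min_offset, SEEK_SET) != 0)
        return {}; /// Seek failed (e.g. an unseekable stream); nothing to read.

    std::string result(static_cast<size_t>(max_offset - min_offset), '\0');
    /// Never read past `max_offset`; EOF may cut the range short.
    result.resize(std::fread(result.data(), 1, result.size(), file));
    return result;
}

An incremental backup can then store just readRange(file, base_size, new_size) for a file that grew from base_size to new_size bytes between backups.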
46 changes: 26 additions & 20 deletions src/IO/LimitSeekableReadBuffer.cpp
@@ -19,7 +19,7 @@ LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuf
     , in(std::move(in_))
     , min_offset(start_offset_)
     , max_offset(start_offset_ + limit_size_)
-    , need_seek(start_offset_)
+    , need_seek(min_offset) /// We always start reading from `min_offset`.
 {
 }

@@ -31,44 +31,50 @@ bool LimitSeekableReadBuffer::nextImpl()
     if (need_seek)
     {
         /// Do actual seek.
-        if (in->getPosition() != *need_seek)
+        if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
         {
-            if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
-            {
-                /// Failed to seek, maybe because the new seek position is located after EOF.
-                set(in->position(), 0);
-                return false;
-            }
+            /// Failed to seek, maybe because the new seek position is located after EOF.
+            set(in->position(), 0);
+            return false;
         }
         need_seek.reset();
     }
 
-    if (in->getPosition() >= max_offset)
+    off_t seek_pos = in->getPosition();
+    off_t offset_after_min = seek_pos - min_offset;
+    off_t available_before_max = max_offset - seek_pos;
+
+    if (offset_after_min < 0 || available_before_max <= 0)
     {
         /// Limit reached.
         set(in->position(), 0);
         return false;
     }
 
-    if (in->eof())
+    if (in->eof()) /// `in->eof()` can call `in->next()`
     {
         /// EOF reached.
         set(in->position(), 0);
         return false;
     }
 
-    /// Adjust the size of the buffer (we don't allow to read more than `max_offset - min_offset`).
-    off_t size = in->buffer().size();
-    size = std::min(size, max_offset - in->getPosition());
+    /// in->eof() shouldn't change the seek position.
+    chassert(seek_pos == in->getPosition());
 
-    if (!size || (static_cast<off_t>(in->offset()) >= size))
-    {
-        /// Limit reached.
-        set(in->position(), 0);
-        return false;
-    }
+    /// Adjust the beginning and the end of the working buffer.
+    /// Because we don't want to read before `min_offset` or after `max_offset`.
+    auto * ptr = in->position();
+    auto * begin = in->buffer().begin();
+    auto * end = in->buffer().end();
+
+    if (ptr - begin > offset_after_min)
+        begin = ptr - offset_after_min;
+    if (end - ptr > available_before_max)
+        end = ptr + available_before_max;
+
+    BufferBase::set(begin, end - begin, ptr - begin);
+    chassert(position() == ptr && available());
 
-    BufferBase::set(in->buffer().begin(), size, in->offset());
     return true;
 }
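
A worked example of the clamping above (not part of the commit): suppose the wrapped buffer holds 64 bytes starting at file offset 120, the current position is at offset 130, and the allowed window is [100, 150). Then offset_after_min = 30 and available_before_max = 20, so `begin` stays put (only 10 buffered bytes precede the position) while `end` is pulled in to stop at file offset 150. Note the asymmetric boundary check: a position exactly at max_offset means the limit is reached (available_before_max <= 0), while a position exactly at min_offset is still valid (offset_after_min == 0). A standalone sketch with assumed values and plain assert in place of chassert:

#include <cassert>

int main()
{
    char data[64] = {};       /// Pretend `in` buffered the file's bytes [120, 184).
    char * begin = data;
    char * end = data + 64;
    char * ptr = data + 10;   /// Current position: file offset 130.

    long seek_pos = 130;
    long min_offset = 100, max_offset = 150;
    long offset_after_min = seek_pos - min_offset;      /// 30 bytes readable behind the position.
    long available_before_max = max_offset - seek_pos;  /// 20 bytes still allowed ahead.

    if (ptr - begin > offset_after_min)
        begin = ptr - offset_after_min;   /// Not taken here: only 10 bytes precede `ptr`.
    if (end - ptr > available_before_max)
        end = ptr + available_before_max; /// Taken: the window now ends at offset 150.

    assert(begin == data && end == data + 30 && ptr == data + 10);
    return 0;
}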

23 changes: 23 additions & 0 deletions tests/integration/test_backup_restore_new/test.py
@@ -473,6 +473,29 @@ def test_incremental_backup_for_log_family():
     assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
 
 
+def test_incremental_backup_append_table_def():
+    backup_name = new_backup_name()
+    create_and_fill_table()
+
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
+    instance.query(f"BACKUP TABLE test.table TO {backup_name}")
+
+    instance.query("ALTER TABLE test.table MODIFY SETTING parts_to_throw_insert=100")
+
+    incremental_backup_name = new_backup_name()
+    instance.query(
+        f"BACKUP TABLE test.table TO {incremental_backup_name} SETTINGS base_backup = {backup_name}"
+    )
+
+    instance.query("DROP TABLE test.table")
+    instance.query(f"RESTORE TABLE test.table FROM {incremental_backup_name}")
+
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
+    assert "parts_to_throw_insert = 100" in instance.query(
+        "SHOW CREATE TABLE test.table"
+    )
+
+
 def test_backup_not_found_or_already_exists():
     backup_name = new_backup_name()
27 changes: 27 additions & 0 deletions tests/integration/test_backup_restore_s3/test.py
@@ -172,3 +172,30 @@ def test_backup_to_s3_native_copy_multipart():
     assert node.contains_in_log(
         f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
     )
+
+
+def test_incremental_backup_append_table_def():
+    backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
+
+    node.query(
+        "CREATE TABLE data (x UInt32, y String) Engine=MergeTree() ORDER BY y PARTITION BY x%10 SETTINGS storage_policy='policy_s3'"
+    )
+
+    node.query("INSERT INTO data SELECT number, toString(number) FROM numbers(100)")
+    assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
+
+    node.query(f"BACKUP TABLE data TO {backup_name}")
+
+    node.query("ALTER TABLE data MODIFY SETTING parts_to_throw_insert=100")
+
+    incremental_backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
+
+    node.query(
+        f"BACKUP TABLE data TO {incremental_backup_name} SETTINGS base_backup = {backup_name}"
+    )
+
+    node.query("DROP TABLE data")
+    node.query(f"RESTORE TABLE data FROM {incremental_backup_name}")
+
+    assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
+    assert "parts_to_throw_insert = 100" in node.query("SHOW CREATE TABLE data")
