Skip to content

Commit

Permalink
Merge pull request #62188 from ClickHouse/backport/24.3/61972
Browse files Browse the repository at this point in the history
Backport #61972 to 24.3: Fix storage join files loading order
  • Loading branch information
vdimir committed Apr 3, 2024
2 parents 302c565 + ab5826b commit 4d5cfa1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
1 change: 0 additions & 1 deletion src/Disks/DiskLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ class DiskLocalDirectoryIterator final : public IDirectoryIterator
return dir_path / entry->path().filename();
}


String name() const override { return entry->path().filename(); }

private:
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/StorageSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ void StorageSetOrJoinBase::restore()
static const char * file_suffix = ".bin";
static const auto file_suffix_size = strlen(".bin");

using FilePriority = std::pair<UInt64, String>;
std::priority_queue<FilePriority, std::vector<FilePriority>, std::greater<>> backup_files;
for (auto dir_it{disk->iterateDirectory(path)}; dir_it->isValid(); dir_it->next())
{
const auto & name = dir_it->name();
Expand All @@ -261,9 +263,18 @@ void StorageSetOrJoinBase::restore()
if (file_num > increment)
increment = file_num;

restoreFromFile(dir_it->path());
backup_files.push({file_num, file_path});
}
}

/// Restore in the same order as blocks were written
/// It may be important for storage Join, user expect to get the first row (unless `join_any_take_last_row` setting is set)
/// but after restart we may have different order of blocks in memory.
while (!backup_files.empty())
{
restoreFromFile(backup_files.top().second);
backup_files.pop();
}
}


Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_async_load_databases/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def test_dependent_tables(started_cluster):
)
query("create table system.join (n int, m int) engine=Join(any, left, n)")
query("insert into system.join values (1, 1)")
for i in range(2, 100):
query(f"insert into system.join values (1, {i})")

query(
"create table src (n int, m default joinGet('system.join', 'm', 1::int),"
"t default dictGetOrNull('a.d', 'm', toUInt64(3)),"
Expand Down
14 changes: 11 additions & 3 deletions tests/integration/test_join_set_family_s3/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,18 @@ def test_join_s3(cluster):
"CREATE TABLE testLocalJoin(`id` UInt64, `val` String) ENGINE = Join(ANY, LEFT, id)"
)
node.query(
"CREATE TABLE testS3Join(`id` UInt64, `val` String) ENGINE = Join(ANY, LEFT, id) SETTINGS disk='s3'"
"CREATE TABLE testS3Join(`id` UInt64, `val` String) ENGINE = Join(ANY, LEFT, id) SETTINGS disk='s3', join_any_take_last_row = 1"
)

node.query("INSERT INTO testLocalJoin VALUES (1, 'a')")
for i in range(1, 10):
c = chr(ord("a") + i)
node.query(f"INSERT INTO testLocalJoin VALUES (1, '{c}')")

# because of `join_any_take_last_row = 1` we expect the last row with 'a' value
for i in range(1, 10):
c = chr(ord("a") + i)
node.query(f"INSERT INTO testS3Join VALUES (1, '{c}')")
node.query("INSERT INTO testS3Join VALUES (1, 'a')")

assert (
Expand All @@ -105,7 +113,7 @@ def test_join_s3(cluster):
)
== "\t\na\ta\n\t\n"
)
assert_objects_count(cluster, 1)
assert_objects_count(cluster, 10)

node.query("INSERT INTO testLocalJoin VALUES (2, 'b')")
node.query("INSERT INTO testS3Join VALUES (2, 'b')")
Expand All @@ -116,7 +124,7 @@ def test_join_s3(cluster):
)
== "\t\na\ta\nb\tb\n"
)
assert_objects_count(cluster, 2)
assert_objects_count(cluster, 11)

node.restart_clickhouse()
assert (
Expand Down

0 comments on commit 4d5cfa1

Please sign in to comment.