
Merge pull request #55455 from ClickHouse/cherrypick/23.3/666c690b4f4356d283b48eed91adb749cdeb9366

Cherry pick #55309 to 23.3: Fix bug with inability to drop detached partition in replicated merge tree on top of S3 without zero copy
CheSema committed Oct 13, 2023
2 parents 36a0e53 + 3b40cd6 commit acf0f3d
Showing 5 changed files with 165 additions and 5 deletions.
29 changes: 25 additions & 4 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
@@ -633,10 +633,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile

     if (autocommit)
     {
-        create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name] (size_t count)
+        create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name](size_t count)
         {
             if (mode == WriteMode::Rewrite)
+            {
+                /// Otherwise we will produce lost blobs which nobody points to
+                /// WriteOnce storages are not affected by the issue
+                if (!tx->object_storage.isWriteOnce() && tx->metadata_storage.exists(path))
+                    tx->object_storage.removeObjectsIfExist(tx->metadata_storage.getStorageObjects(path));
+
                 tx->metadata_transaction->createMetadataFile(path, blob_name, count);
+            }
             else
                 tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
@@ -645,7 +652,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
     }
     else
     {
-        create_metadata_callback = [write_op = write_operation.get(), mode, path, blob_name] (size_t count)
+        create_metadata_callback = [object_storage_tx = shared_from_this(), write_op = write_operation.get(), mode, path, blob_name](size_t count)
         {
             /// This callback called in WriteBuffer finalize method -- only there we actually know
             /// how many bytes were written. We don't control when this finalize method will be called
@@ -657,15 +664,24 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
             /// ...
             /// buf1->finalize() // shouldn't do anything with metadata operations, just memoize what to do
             /// tx->commit()
-            write_op->setOnExecute([mode, path, blob_name, count](MetadataTransactionPtr tx)
+            write_op->setOnExecute([object_storage_tx, mode, path, blob_name, count](MetadataTransactionPtr tx)
             {
                 if (mode == WriteMode::Rewrite)
+                {
+                    /// Otherwise we will produce lost blobs which nobody points to
+                    /// WriteOnce storages are not affected by the issue
+                    if (!object_storage_tx->object_storage.isWriteOnce() && object_storage_tx->metadata_storage.exists(path))
+                    {
+                        object_storage_tx->object_storage.removeObjectsIfExist(
+                            object_storage_tx->metadata_storage.getStorageObjects(path));
+                    }
+
                     tx->createMetadataFile(path, blob_name, count);
+                }
                 else
                     tx->addBlobToMetadata(path, blob_name, count);
             });
         };
     }

     operations_to_execute.emplace_back(std::move(write_operation));
@@ -711,7 +727,12 @@ void DiskObjectStorageTransaction::writeFileUsingCustomWriteObject(

     /// Create metadata (see create_metadata_callback in DiskObjectStorageTransaction::writeFile()).
     if (mode == WriteMode::Rewrite)
+    {
+        if (!object_storage.isWriteOnce() && metadata_storage.exists(path))
+            object_storage.removeObjectsIfExist(metadata_storage.getStorageObjects(path));
+
         metadata_transaction->createMetadataFile(path, blob_name, object_size);
+    }
     else
         metadata_transaction->addBlobToMetadata(path, blob_name, object_size);

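All the hunks in this file apply the same guard: on WriteMode::Rewrite, the blobs referenced by the path's previous metadata are deleted before the new metadata file is written, since after the rewrite nothing would point to them; write-once storages are skipped because, as the comment notes, they are not affected by the issue. Below is a minimal sketch of the idea in plain Python, with hypothetical names that model, rather than mirror, the ClickHouse C++ API:

# Hypothetical in-memory model of the fix; names do not match ClickHouse's API.
class ObjectStorage:
    def __init__(self, is_write_once=False):
        self.blobs = set()           # blob names physically stored (e.g. S3 keys)
        self.is_write_once = is_write_once

    def remove_objects_if_exist(self, names):
        self.blobs -= set(names)


class MetadataStorage:
    def __init__(self):
        self.files = {}              # path -> list of blob names it references

    def exists(self, path):
        return path in self.files

    def storage_objects(self, path):
        return self.files[path]


def write_file_rewrite(obj, meta, path, blob_name):
    """Rewrite `path` so it references a freshly uploaded blob."""
    obj.blobs.add(blob_name)         # the new blob has just been uploaded
    # The fix: first drop the blobs the old metadata pointed to, otherwise
    # they become unreachable. WriteOnce storages are skipped (isWriteOnce()).
    if not obj.is_write_once and meta.exists(path):
        obj.remove_objects_if_exist(meta.storage_objects(path))
    meta.files[path] = [blob_name]   # createMetadataFile


obj, meta = ObjectStorage(), MetadataStorage()
write_file_rewrite(obj, meta, "store/abc/data.bin", "blob_v1")
write_file_rewrite(obj, meta, "store/abc/data.bin", "blob_v2")
assert obj.blobs == {"blob_v2"}      # without the guard, blob_v1 would leak
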
3 changes: 2 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
@@ -8988,7 +8988,8 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(

 bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
 {
-    if (disk->supportZeroCopyReplication())
+    auto settings_ptr = getSettings();
+    if (disk->supportZeroCopyReplication() && settings_ptr->allow_remote_fs_zero_copy_replication)
     {
         String table_id = getTableSharedID();
         return removeSharedDetachedPart(disk, path, part_name, table_id, replica_name, zookeeper_path, getContext(), current_zookeeper);
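The change above narrows when the zero-copy cleanup path is taken: previously, the disk's capability alone routed removeDetachedPart through removeSharedDetachedPart, even for tables created with allow_remote_fs_zero_copy_replication=0, so DROP DETACHED PARTITION never actually deleted the objects. A rough sketch of the corrected decision, again with hypothetical Python names:

# Hypothetical sketch of the routing decision; not the real C++ signatures.
def remove_detached_part(disk_supports_zero_copy: bool,
                         zero_copy_enabled_for_table: bool) -> str:
    # After the fix, the disk capability AND the table-level setting
    # (allow_remote_fs_zero_copy_replication) must both hold before the
    # shared, ZooKeeper-coordinated path is used.
    if disk_supports_zero_copy and zero_copy_enabled_for_table:
        return "removeSharedDetachedPart"  # may keep blobs other replicas use
    return "plain remove"                  # deletes the part's objects directly


# S3 disk, but the table disabled zero copy: take the plain path (the fix).
assert remove_detached_part(True, False) == "plain remove"
# Zero copy genuinely enabled: shared cleanup.
assert remove_detached_part(True, True) == "removeSharedDetachedPart"
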
0 changes: 0 additions & 0 deletions tests/integration/test_replicated_s3_zero_copy_drop_partition/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions tests/integration/test_replicated_s3_zero_copy_drop_partition/configs/storage_conf.xml
@@ -0,0 +1,32 @@
<clickhouse>
    <logger>
        <level>test</level>
    </logger>

    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s3>
        </disks>
        <policies>
            <default>
                <default>
                    <disk>default</disk>
                </default>
            </default>
            <s3>
                <volumes>
                    <main>
                        <disk>s3</disk>
                        <prefer_not_to_merge>False</prefer_not_to_merge>
                        <perform_ttl_move_on_insert>True</perform_ttl_move_on_insert>
                    </main>
                </volumes>
            </s3>
        </policies>
    </storage_configuration>
</clickhouse>
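This new file is the configs/storage_conf.xml referenced by the test below. It defines two policies: default keeps data on the local disk, while s3 places it on the MinIO-backed disk. As a quick sanity check that the policy was picked up — a hedged snippet assuming the node1 instance from the test is already running — one can query system.storage_policies:

# Assumes the started `node1` instance from the integration test below.
policies = node1.query(
    "SELECT policy_name, volume_name, disks FROM system.storage_policies"
)
print(policies)  # expect a row: policy 's3', volume 'main', disks ['s3']
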
106 changes: 106 additions & 0 deletions tests/integration/test_replicated_s3_zero_copy_drop_partition/test.py
@@ -0,0 +1,106 @@
#!/usr/bin/env python3

import logging
import random
import string
import time

import pytest
from helpers.cluster import ClickHouseCluster
import minio


cluster = ClickHouseCluster(__file__)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.add_instance(
            "node1",
            main_configs=["configs/storage_conf.xml"],
            with_minio=True,
            with_zookeeper=True,
        )
        cluster.start()

        yield cluster
    finally:
        cluster.shutdown()


def get_objects_in_data_path():
    minio = cluster.minio_client
    objects = minio.list_objects(cluster.minio_bucket, "data/", recursive=True)
    return [obj.object_name for obj in objects]


def test_drop_after_fetch(started_cluster):
    node1 = cluster.instances["node1"]

    node1.query(
        """
CREATE TABLE test_local(c1 Int8, c2 Date) ENGINE = ReplicatedMergeTree('/test/tables/shard/test_local', '1') PARTITION BY c2 ORDER BY c2
"""
    )
    node1.query(
        """
CREATE TABLE test_s3(c1 Int8, c2 Date) ENGINE = ReplicatedMergeTree('/test/tables/shard/test_s3', '1') PARTITION BY c2 ORDER BY c2 SETTINGS storage_policy = 's3', allow_remote_fs_zero_copy_replication=0
"""
    )

    node1.query("INSERT INTO test_local VALUES (1, '2023-10-04'), (2, '2023-10-04')")

    assert node1.query("SELECT count() FROM test_local") == "2\n"

    objects_before = get_objects_in_data_path()
    node1.query(
        "ALTER TABLE test_s3 FETCH PARTITION '2023-10-04' FROM '/test/tables/shard/test_local'"
    )

    node1.query(
        "ALTER TABLE test_s3 DROP DETACHED PARTITION '2023-10-04' SETTINGS allow_drop_detached = 1"
    )

    objects_after = get_objects_in_data_path()

    assert objects_before == objects_after
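The before/after listing is the core assertion: with zero copy disabled, dropping the fetched detached partition must physically remove every object it created under data/. If this assertion ever fails, a small helper like the following (hypothetical, not part of the committed test) shows exactly which objects leaked:

# Hypothetical debugging aid, not part of the committed test.
def leaked_objects(before, after):
    """Objects present after the drop that were not there before it."""
    return sorted(set(after) - set(before))

# On failure one might print: leaked_objects(objects_before, objects_after)
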


def test_drop_complex_columns(started_cluster):
    start_objects = get_objects_in_data_path()
    print("Objects before", start_objects)
    node1 = cluster.instances["node1"]
    node1.query(
        """
CREATE TABLE test_s3_complex_types(
    c1 Int8,
    c2 Date,
    `c3.k` Array(String),
    `c3.v1` Array(Int64),
    `c3.v3` Array(Int64),
    `c3.v4` Array(Int64)
) ENGINE = MergeTree
ORDER BY (c1, c2) SETTINGS storage_policy = 's3',
min_bytes_for_wide_part=1,
vertical_merge_algorithm_min_rows_to_activate=1,
vertical_merge_algorithm_min_columns_to_activate=1;"""
    )

    node1.query(
        "INSERT INTO test_s3_complex_types VALUES (1, toDate('2020-10-01'), ['a','b'], [1,2], [3,4], [5,6])"
    )

    node1.query(
        "INSERT INTO test_s3_complex_types VALUES (1, toDate('2020-10-01'), ['a','b'], [7,8], [9,10], [11,12])"
    )

    print("Objects after insert", get_objects_in_data_path())
    node1.query("OPTIMIZE TABLE test_s3_complex_types FINAL")

    print("Objects after optimize", get_objects_in_data_path())

    node1.query("DROP TABLE test_s3_complex_types SYNC")
    end_objects = get_objects_in_data_path()
    print("Objects after drop", end_objects)
    assert start_objects == end_objects
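This second test forces wide parts (min_bytes_for_wide_part=1) and vertical merges over nested Array columns, so each part maps to many S3 objects, then checks that DROP TABLE ... SYNC leaves none of them behind. While debugging, the remote objects a table's parts currently reference can be inspected via system.remote_data_paths — a hedged snippet assuming the same node1 instance:

# Assumes the started `node1` instance; system.remote_data_paths maps local
# metadata files to the remote (S3) keys they reference.
rows = node1.query(
    "SELECT local_path, remote_path FROM system.remote_data_paths WHERE disk_name = 's3'"
)
print(rows)
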
