Fix inconsistent parts after clone replica #6523

Merged · 3 commits · Aug 19, 2019
31 changes: 29 additions & 2 deletions dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1956,10 +1956,37 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     }

     /// Add to the queue jobs to receive all the active parts that the reference/master replica has.
-    Strings parts = zookeeper->getChildren(source_path + "/parts");
-    ActiveDataPartSet active_parts_set(format_version, parts);
+    Strings source_replica_parts = zookeeper->getChildren(source_path + "/parts");
+    ActiveDataPartSet active_parts_set(format_version, source_replica_parts);

     Strings active_parts = active_parts_set.getParts();
+
+    /// Remove local parts if source replica does not have them, because such parts will never be fetched by other replicas.
+    Strings local_parts_in_zk = zookeeper->getChildren(replica_path + "/parts");
+    Strings parts_to_remove_from_zk;
+    for (const auto & part : local_parts_in_zk)
+    {
+        if (active_parts_set.getContainingPart(part).empty())
+        {
+            queue.remove(zookeeper, part);
+            parts_to_remove_from_zk.emplace_back(part);
+            LOG_WARNING(log, "Source replica does not have part " << part << ". Removing it from ZooKeeper.");
+        }
+    }
+    tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);
Review comment (Member), on tryRemovePartsFromZooKeeperWithRetries: Ok. But all usages of this function look suspicious because the result is not checked.
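A minimal sketch of such a check at this call site, assuming tryRemovePartsFromZooKeeperWithRetries returns a bool success flag (an assumption; the signature is not shown in this diff):

    /// Hypothetical: the bool return value is an assumption, not confirmed by this diff.
    if (!tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk))
        LOG_WARNING(log, "Could not remove all obsolete parts from ZooKeeper after retries.");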


+    auto local_active_parts = getDataParts();
+    DataPartsVector parts_to_remove_from_working_set;
+    for (const auto & part : local_active_parts)
+    {
+        if (active_parts_set.getContainingPart(part->name).empty())
+        {
+            parts_to_remove_from_working_set.emplace_back(part);
+            LOG_WARNING(log, "Source replica does not have part " << part->name << ". Removing it from working set.");
+        }
+    }
+    removePartsFromWorkingSet(parts_to_remove_from_working_set, true);

     for (const String & name : active_parts)
     {
         LogEntry log_entry;
configs/remote_servers.xml (new file)
@@ -0,0 +1,19 @@
<yandex>
    <remote_servers>
        <test_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <default_database>shard_0</default_database>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <default_database>shard_0</default_database>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</yandex>
New file (the integration test):
@@ -0,0 +1,60 @@
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry


def fill_nodes(nodes, shard):
    for node in nodes:
        node.query(
            '''
            CREATE DATABASE test;
            CREATE TABLE test_table(date Date, id UInt32)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}')
            ORDER BY id PARTITION BY toYYYYMM(date)
            SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
            '''.format(shard=shard, replica=node.name))
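# The aggressive retention settings above (min_replicated_logs_to_keep=3,
# max_replicated_logs_to_keep=5, cleanup_delay_period=0) make the replication
# log rotate quickly, so a replica that stays disconnected falls behind the
# log and has to be cloned from a healthy replica, which this test relies on.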


cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)

@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        fill_nodes([node1, node2], 1)
        yield cluster
    except Exception as ex:
        print(ex)
    finally:
        cluster.shutdown()


def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
    with PartitionManager() as pm:
        # insert into all replicas
        for i in range(50):
            node1.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(i))
        assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))

        # disable network on the first replica
        pm.partition_instances(node1, node2)
        pm.drop_instance_zk_connections(node1)

        # drop all parts on the second replica
        node2.query_with_retry("ALTER TABLE test_table DROP PARTITION 201908")
        assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", "0")

        # insert into the second replica
        # DROP_RANGE will be removed from the replication log and the first replica will be lost
        for i in range(50):
            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(50 + i))

        # the first replica will be cloned from the second
        pm.heal_all()
        assert_eq_with_retry(node1, "SELECT count(*) FROM test_table", node2.query("SELECT count(*) FROM test_table"))
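A possible extra assertion, not part of this PR: right before pm.heal_all(), the first replica should already be flagged as lost in ZooKeeper. A minimal sketch, assuming the ZooKeeper path layout implied by the ReplicatedMergeTree prefix in fill_nodes and the is_lost flag (both assumptions, not shown in this diff):

    # Hypothetical check, placed just before pm.heal_all():
    zk = cluster.get_kazoo_client('zoo1')  # assumes the default ZooKeeper instance name
    is_lost, _stat = zk.get('/clickhouse/tables/test1/replicated/replicas/node1/is_lost')
    assert is_lost == b'1'  # node1 fell behind the replication log while partitioned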