Change the order in which the locks are acquired #7542

Merged: 7 commits, Mar 10, 2024

Changes from 2 commits
14 changes: 7 additions & 7 deletions src/backend/distributed/utils/resource_lock.c
@@ -707,13 +707,10 @@ SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode)
 	}

 	List *replicatedShardList = NIL;
-	if (AnyTableReplicated(shardIntervalList, &replicatedShardList))
-	{
-		if (ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode())
-		{
-			LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
-		}
+	bool anyTableReplicated = AnyTableReplicated(shardIntervalList, &replicatedShardList);

+	if (anyTableReplicated)
+	{
 		ShardInterval *firstShardInterval =
 			(ShardInterval *) linitial(replicatedShardList);
 		if (ReferenceTableShardId(firstShardInterval->shardId))
@@ -728,7 +725,10 @@ SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode)
 			LockReferencedReferenceShardResources(firstShardInterval->shardId, lockMode);
 		}
 	}

+	if (anyTableReplicated && ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode())
@onderkalaci (Member) commented on Mar 4, 2024:
note that with this change we are diverging from the behavior of Postgres by first locking the referenced tables/shards when there is a foreign key. Postgres acquires the locks on the main table first, then cascades into the referenced tables.

So could this change introduce new classes of concurrency issues where there are concurrent modifications to the referenced tables (e.g., distributed table) while there are modifications to the main table (e.g., reference table)?
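(To make the hazard concrete: a minimal two-session sketch, not from this PR, using hypothetical tables t1 and t2 -- two code paths that take the same pair of locks in opposite orders are exactly what trips Postgres's deadlock detector.)

-- session 1:
BEGIN;
LOCK TABLE t1 IN ROW EXCLUSIVE MODE;

-- session 2:
BEGIN;
LOCK TABLE t2 IN ROW EXCLUSIVE MODE;

-- session 1: blocks, waiting for session 2's lock on t2
LOCK TABLE t2 IN ROW EXCLUSIVE MODE;

-- session 2: fails after deadlock_timeout with "ERROR: deadlock detected"
LOCK TABLE t1 IN ROW EXCLUSIVE MODE;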

Also, this code doesn't match what the PR description says; it seems to do the opposite.

It feels like the code should be much more explicit about what we are doing. For example, the following code block is how I think this logic should look -- though probably the code could be nicer:

bool acquiredLocally = false;

/*
 * Simplest case: no querying from any node.
 * First acquire the locks on the modified table,
 * then cascade into the referenced tables.
 */
if (!ClusterHasKnownMetadataWorkers())
{
	LockShardListResources(shardIntervalList, lockMode);
	for each replicatedShard in replicatedShards:
		LockReferencedReferenceShardResourcesLocally(replicatedTable->firstShardInterval->shardId, lockMode);

	acquiredLocally = true;
}
else
{
	/*
	 * Moderate case: we are the first worker node.
	 * Acquire the locks locally, same as the simplest case.
	 */
	if (IsFirstWorkerNode() && anyTableReplicated)
	{
		LockShardListResources(shardIntervalList, lockMode);
		for each replicatedShard in replicatedShards:
			LockReferencedReferenceShardResourcesLocally(replicatedTable->firstShardInterval->shardId, lockMode);

		acquiredLocally = true;
	}
	/*
	 * Otherwise we are on a worker that is not the first worker node,
	 * or we are on the coordinator.
	 */
	else
	{
		LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
		for each replicatedShard in replicatedShards:
			LockReferencedReferenceShardResourcesOnTheFirstNode(..);
	}
}

/*
 * Above, we serialized all the modifications on the relevant tables across
 * nodes by acquiring the locks on the first worker node, so no two
 * concurrent modifications across multiple nodes can interleave.
 *
 * Now, always acquire the locks locally as well, as there are several other
 * concurrency scenarios where we expect shard resource locks to be held
 * locally.
 */
if (!acquiredLocally)
{
	LockShardListResources(shardIntervalList, lockMode);
	for each replicatedShard in replicatedShards:
		LockReferencedReferenceShardResourcesLocally(replicatedTable->firstShardInterval->shardId, lockMode);
}

note that I think it is even OK to drop all the code relevant to !ClusterHasKnownMetadataWorkers(), given that with Citus 11.0 we expect all clusters to have metadata synced. It is a safeguard in case old clusters that upgraded to Citus 11.0 could not sync the metadata.
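(As an aside, whether the workers have synced metadata -- the ClusterHasKnownMetadataWorkers() condition discussed above -- can be inspected from the coordinator; a small sketch against the standard Citus metadata catalog:)

-- Hedged sketch: check metadata sync state per node from the coordinator.
SELECT nodename, nodeport, hasmetadata, metadatasynced
FROM pg_dist_node;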

@eaydingol (Contributor, Author) commented on Mar 4, 2024:
Thanks for the lead regarding the lock order.

I still think the PR description is right; please let me know if I'm missing anything. Prior to the changes, "LockShardListResources(shardIntervalList, lockMode);" was the last statement, which acquires the locks for the modified table locally. It is also the last lock acquired when the request is initiated from the first worker node. In particular, the PR changes the order in which the locks are acquired on the first worker node relative to the node that received the request.
The first version acquired locks on the reference tables and then the modified table.

As you suggested, I changed the order. With the last commit, independent of the node that received the request, the locks are acquired for the modified table and then the reference tables on the first node.

I noticed your suggestion to acquire remote locks before local ones. In the current version, locks are acquired for the modified table on the first worker, followed by local locks. Similarly, locks for reference tables are obtained on the first worker and then locally. I haven’t incorporated that suggestion yet.

@JelteF (Contributor) commented on Mar 8, 2024:
I'm quite sure the code in this PR is correct.

I do agree with Onder that the flow in this function is somewhat hard to grasp, but I don't think that's the fault of this PR. The main reason is that LockReferencedReferenceShardResources internally does both first-worker and local locking, so that function call hides some of the symmetry between the two types of locks. In Onder's pseudocode that symmetry is more pronounced, and thus the logic feels easier to follow.

Given we likely want to backport this change though, I don't think we should do such a refactor in this PR.

+	{
+		LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
+	}
 	LockShardListResources(shardIntervalList, lockMode);
 }

62 changes: 62 additions & 0 deletions src/test/regress/expected/issue_7477.out
@@ -0,0 +1,62 @@
--- Test for updating a table that has a foreign key reference to another reference table.
--- Issue #7477: Distributed deadlock after issuing a simple UPDATE statement
--- https://github.com/citusdata/citus/issues/7477
CREATE TABLE table1 (id INT PRIMARY KEY);
SELECT create_reference_table('table1');
create_reference_table
---------------------------------------------------------------------

(1 row)

INSERT INTO table1 VALUES (1);
CREATE TABLE table2 (
id INT,
info TEXT,
CONSTRAINT table1_id_fk FOREIGN KEY (id) REFERENCES table1 (id)
);
SELECT create_reference_table('table2');
create_reference_table
---------------------------------------------------------------------

(1 row)

INSERT INTO table2 VALUES (1, 'test');
--- Runs the update command in parallel on workers.
--- Due to bug #7477, before the fix, the result is non-deterministic
--- and has several rows of the form:
--- localhost | 57638 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: canceling the transaction since it was involved in a distributed deadlock
SELECT * FROM master_run_on_worker(
ARRAY['localhost', 'localhost','localhost', 'localhost','localhost',
'localhost','localhost', 'localhost','localhost', 'localhost']::text[],
ARRAY[57638, 57637, 57637, 57638, 57637, 57638, 57637, 57638, 57638, 57637]::int[],
ARRAY['UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1'
]::text[],
true);
node_name | node_port | success | result
---------------------------------------------------------------------
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
(10 rows)

--- cleanup
DROP TABLE table2;
DROP TABLE table1;
2 changes: 1 addition & 1 deletion src/test/regress/multi_schedule
@@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
 test: binary_protocol
 test: alter_table_set_access_method
 test: alter_distributed_table
-test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758
+test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477
 test: object_propagation_debug
 test: undistribute_table
 test: run_command_on_all_nodes
44 changes: 44 additions & 0 deletions src/test/regress/sql/issue_7477.sql
@@ -0,0 +1,44 @@

--- Test for updating a table that has a foreign key reference to another reference table.
--- Issue #7477: Distributed deadlock after issuing a simple UPDATE statement
--- https://github.com/citusdata/citus/issues/7477

CREATE TABLE table1 (id INT PRIMARY KEY);
SELECT create_reference_table('table1');
INSERT INTO table1 VALUES (1);

CREATE TABLE table2 (
id INT,
info TEXT,
CONSTRAINT table1_id_fk FOREIGN KEY (id) REFERENCES table1 (id)
);
SELECT create_reference_table('table2');
INSERT INTO table2 VALUES (1, 'test');

--- Runs the update command in parallel on workers.
--- Due to bug #7477, before the fix, the result is non-deterministic
--- and has several rows of the form:
--- localhost | 57638 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: canceling the transaction since it was involved in a distributed deadlock

SELECT * FROM master_run_on_worker(
ARRAY['localhost', 'localhost','localhost', 'localhost','localhost',
'localhost','localhost', 'localhost','localhost', 'localhost']::text[],
ARRAY[57638, 57637, 57637, 57638, 57637, 57638, 57637, 57638, 57638, 57637]::int[],
ARRAY['UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1'
]::text[],
true);

--- cleanup
DROP TABLE table2;
DROP TABLE table1;
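
(A side note, not part of the PR: one way to watch these locks while the concurrent updates run is to query pg_locks from a separate session on a worker; Citus takes shard resource locks as advisory locks, so a hedged diagnostic sketch looks like this:)

-- Hedged diagnostic: list advisory locks (how shard resource locks surface)
-- while the updates are in flight.
SELECT locktype, classid, objid, mode, granted, pid
FROM pg_locks
WHERE locktype = 'advisory'
ORDER BY pid, objid;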