Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix deadlock during waiting for apply worker stop (issue #258) #260

Open
wants to merge 11 commits into
base: REL2_x_STABLE
Choose a base branch
from
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ DATA = pglogical--1.0.0.sql pglogical--1.0.0--1.0.1.sql \
pglogical--2.3.2--2.3.3.sql \
pglogical--2.3.3.sql \
pglogical--2.3.3--2.3.4.sql \
pglogical--2.3.4.sql
pglogical--2.3.4.sql \
pglogical--2.3.4-1.sql \
pglogical--2.3.4--2.3.4.1.sql

OBJS = pglogical_apply.o pglogical_conflict.o pglogical_manager.o \
pglogical.o pglogical_node.o pglogical_relcache.o \
Expand Down
34 changes: 34 additions & 0 deletions pglogical--2.3.4--2.3.4.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
ALTER TABLE pglogical.subscription ADD COLUMN sub_data_replace boolean NOT NULL DEFAULT true;
ALTER TABLE pglogical.subscription ADD COLUMN sub_after_sync_queries text[];

DROP FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
replication_sets text[], synchronize_structure boolean,
synchronize_data boolean, forward_origins text[], apply_delay interval,
force_text_transfer boolean);
CREATE FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = false,
synchronize_data boolean = true, data_replace boolean = true, after_sync_queries text[] = '{}',
forward_origins text[] = '{all}', apply_delay interval DEFAULT '0',
force_text_transfer boolean = false)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_subscription';

ALTER TABLE pglogical.replication_set_table ADD COLUMN set_sync_clear_filter text;

DROP FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean,
columns text[], row_filter text);
CREATE FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean DEFAULT false,
columns text[] DEFAULT NULL, row_filter text DEFAULT NULL, sync_clear_filter text DEFAULT NULL)
RETURNS boolean CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_table';

TRUNCATE pglogical.queue;
ALTER TABLE pglogical.queue ADD COLUMN node_id oid REFERENCES node(node_id);
ALTER TABLE pglogical.queue ADD COLUMN original_node_id oid REFERENCES node(node_id);


DROP FUNCTION pglogical.wait_for_subscription_sync_complete(subscription_name name);
CREATE FUNCTION pglogical.wait_for_subscription_sync_complete(subscription_name name)
RETURNS boolean RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_wait_for_subscription_sync_complete';

DROP FUNCTION pglogical.wait_for_table_sync_complete(subscription_name name, relation regclass);
CREATE FUNCTION pglogical.wait_for_table_sync_complete(subscription_name name, relation regclass)
RETURNS boolean RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_wait_for_table_sync_complete';
255 changes: 255 additions & 0 deletions pglogical--2.3.4-1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
\echo Use "CREATE EXTENSION pglogical" to load this file. \quit

CREATE TABLE pglogical.node (
node_id oid NOT NULL PRIMARY KEY,
node_name name NOT NULL UNIQUE
) WITH (user_catalog_table=true);

CREATE TABLE pglogical.node_interface (
if_id oid NOT NULL PRIMARY KEY,
if_name name NOT NULL, -- default same as node name
if_nodeid oid REFERENCES node(node_id),
if_dsn text NOT NULL,
UNIQUE (if_nodeid, if_name)
);

CREATE TABLE pglogical.local_node (
node_id oid PRIMARY KEY REFERENCES node(node_id),
node_local_interface oid NOT NULL REFERENCES node_interface(if_id)
);

CREATE TABLE pglogical.subscription (
sub_id oid NOT NULL PRIMARY KEY,
sub_name name NOT NULL UNIQUE,
sub_origin oid NOT NULL REFERENCES node(node_id),
sub_target oid NOT NULL REFERENCES node(node_id),
sub_origin_if oid NOT NULL REFERENCES node_interface(if_id),
sub_target_if oid NOT NULL REFERENCES node_interface(if_id),
sub_enabled boolean NOT NULL DEFAULT true,
sub_data_replace boolean NOT NULL DEFAULT true,
sub_slot_name name NOT NULL,
sub_replication_sets text[],
sub_forward_origins text[],
sub_apply_delay interval NOT NULL DEFAULT '0',
sub_force_text_transfer boolean NOT NULL DEFAULT 'f',
sub_after_sync_queries text[]
);

CREATE TABLE pglogical.local_sync_status (
sync_kind "char" NOT NULL CHECK (sync_kind IN ('i', 's', 'd', 'f')),
sync_subid oid NOT NULL REFERENCES pglogical.subscription(sub_id),
sync_nspname name,
sync_relname name,
sync_status "char" NOT NULL,
sync_statuslsn pg_lsn NOT NULL,
UNIQUE (sync_subid, sync_nspname, sync_relname)
);


CREATE FUNCTION pglogical.create_node(node_name name, dsn text)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_node';
CREATE FUNCTION pglogical.drop_node(node_name name, ifexists boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_node';

CREATE FUNCTION pglogical.alter_node_add_interface(node_name name, interface_name name, dsn text)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_node_add_interface';
CREATE FUNCTION pglogical.alter_node_drop_interface(node_name name, interface_name name)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_node_drop_interface';

CREATE FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = false,
synchronize_data boolean = true, data_replace boolean = true, after_sync_queries text[] = '{}',
forward_origins text[] = '{all}', apply_delay interval DEFAULT '0',
force_text_transfer boolean = false)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_subscription';
CREATE FUNCTION pglogical.drop_subscription(subscription_name name, ifexists boolean DEFAULT false)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_subscription';

CREATE FUNCTION pglogical.alter_subscription_interface(subscription_name name, interface_name name)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_interface';

CREATE FUNCTION pglogical.alter_subscription_disable(subscription_name name, immediate boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_disable';
CREATE FUNCTION pglogical.alter_subscription_enable(subscription_name name, immediate boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_enable';

CREATE FUNCTION pglogical.alter_subscription_add_replication_set(subscription_name name, replication_set name)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_add_replication_set';
CREATE FUNCTION pglogical.alter_subscription_remove_replication_set(subscription_name name, replication_set name)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_remove_replication_set';

CREATE FUNCTION pglogical.show_subscription_status(subscription_name name DEFAULT NULL,
OUT subscription_name text, OUT status text, OUT provider_node text,
OUT provider_dsn text, OUT slot_name text, OUT replication_sets text[],
OUT forward_origins text[])
RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_status';

CREATE TABLE pglogical.replication_set (
set_id oid NOT NULL PRIMARY KEY,
set_nodeid oid NOT NULL,
set_name name NOT NULL,
replicate_insert boolean NOT NULL DEFAULT true,
replicate_update boolean NOT NULL DEFAULT true,
replicate_delete boolean NOT NULL DEFAULT true,
replicate_truncate boolean NOT NULL DEFAULT true,
UNIQUE (set_nodeid, set_name)
) WITH (user_catalog_table=true);

CREATE TABLE pglogical.replication_set_table (
set_id oid NOT NULL,
set_reloid regclass NOT NULL,
set_att_list text[],
set_row_filter pg_node_tree,
set_sync_clear_filter text,
PRIMARY KEY(set_id, set_reloid)
) WITH (user_catalog_table=true);

CREATE TABLE pglogical.replication_set_seq (
set_id oid NOT NULL,
set_seqoid regclass NOT NULL,
PRIMARY KEY(set_id, set_seqoid)
) WITH (user_catalog_table=true);

CREATE TABLE pglogical.sequence_state (
seqoid oid NOT NULL PRIMARY KEY,
cache_size integer NOT NULL,
last_value bigint NOT NULL
) WITH (user_catalog_table=true);

CREATE TABLE pglogical.depend (
classid oid NOT NULL,
objid oid NOT NULL,
objsubid integer NOT NULL,

refclassid oid NOT NULL,
refobjid oid NOT NULL,
refobjsubid integer NOT NULL,

deptype "char" NOT NULL
) WITH (user_catalog_table=true);

CREATE VIEW pglogical.TABLES AS
WITH set_relations AS (
SELECT s.set_name, r.set_reloid
FROM pglogical.replication_set_table r,
pglogical.replication_set s,
pglogical.local_node n
WHERE s.set_nodeid = n.node_id
AND s.set_id = r.set_id
),
user_tables AS (
SELECT r.oid, n.nspname, r.relname, r.relreplident
FROM pg_catalog.pg_class r,
pg_catalog.pg_namespace n
WHERE r.relkind = 'r'
AND r.relpersistence = 'p'
AND n.oid = r.relnamespace
AND n.nspname !~ '^pg_'
AND n.nspname != 'information_schema'
AND n.nspname != 'pglogical'
)
SELECT r.oid AS relid, n.nspname, r.relname, s.set_name
FROM pg_catalog.pg_namespace n,
pg_catalog.pg_class r,
set_relations s
WHERE r.relkind = 'r'
AND n.oid = r.relnamespace
AND r.oid = s.set_reloid
UNION
SELECT t.oid AS relid, t.nspname, t.relname, NULL
FROM user_tables t
WHERE t.oid NOT IN (SELECT set_reloid FROM set_relations);

CREATE FUNCTION pglogical.create_replication_set(set_name name,
replicate_insert boolean = true, replicate_update boolean = true,
replicate_delete boolean = true, replicate_truncate boolean = true)
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_replication_set';
CREATE FUNCTION pglogical.alter_replication_set(set_name name,
replicate_insert boolean DEFAULT NULL, replicate_update boolean DEFAULT NULL,
replicate_delete boolean DEFAULT NULL, replicate_truncate boolean DEFAULT NULL)
RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_replication_set';
CREATE FUNCTION pglogical.drop_replication_set(set_name name, ifexists boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_replication_set';

CREATE FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean DEFAULT false,
columns text[] DEFAULT NULL, row_filter text DEFAULT NULL, sync_clear_filter text DEFAULT NULL)
RETURNS boolean CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_table';
CREATE FUNCTION pglogical.replication_set_add_all_tables(set_name name, schema_names text[], synchronize_data boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_all_tables';
CREATE FUNCTION pglogical.replication_set_remove_table(set_name name, relation regclass)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_remove_table';

CREATE FUNCTION pglogical.replication_set_add_sequence(set_name name, relation regclass, synchronize_data boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_sequence';
CREATE FUNCTION pglogical.replication_set_add_all_sequences(set_name name, schema_names text[], synchronize_data boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_all_sequences';
CREATE FUNCTION pglogical.replication_set_remove_sequence(set_name name, relation regclass)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_remove_sequence';

CREATE FUNCTION pglogical.alter_subscription_synchronize(subscription_name name, truncate boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_synchronize';

CREATE FUNCTION pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass,
truncate boolean DEFAULT true)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_resynchronize_table';

CREATE FUNCTION pglogical.synchronize_sequence(relation regclass)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_synchronize_sequence';

CREATE FUNCTION pglogical.table_data_filtered(reltyp anyelement, relation regclass, repsets text[])
RETURNS SETOF anyelement CALLED ON NULL INPUT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_table_data_filtered';

CREATE FUNCTION pglogical.show_repset_table_info(relation regclass, repsets text[], OUT relid oid, OUT nspname text,
OUT relname text, OUT att_list text[], OUT has_row_filter boolean, OUT sync_clear_filter text)
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_repset_table_info';

CREATE FUNCTION pglogical.show_subscription_table(subscription_name name, relation regclass, OUT nspname text, OUT relname text, OUT status text)
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_table';

CREATE TABLE pglogical.queue (
node_id oid REFERENCES node(node_id),
original_node_id oid REFERENCES node(node_id),
queued_at timestamp with time zone NOT NULL,
role name NOT NULL,
replication_sets text[],
message_type "char" NOT NULL,
message json NOT NULL
);

CREATE FUNCTION pglogical.replicate_ddl_command(command text, replication_sets text[] DEFAULT '{ddl_sql}')
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replicate_ddl_command';

CREATE OR REPLACE FUNCTION pglogical.queue_truncate()
RETURNS trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_queue_truncate';

CREATE FUNCTION pglogical.pglogical_node_info(OUT node_id oid, OUT node_name text, OUT sysid text, OUT dbname text, OUT replication_sets text)
RETURNS record
STABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION pglogical.pglogical_gen_slot_name(name, name, name)
RETURNS name
IMMUTABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION pglogical_version() RETURNS text
LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION pglogical_version_num() RETURNS integer
LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION pglogical_max_proto_version() RETURNS integer
LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION pglogical_min_proto_version() RETURNS integer
LANGUAGE c AS 'MODULE_PATHNAME';

CREATE FUNCTION
pglogical.wait_slot_confirm_lsn(slotname name, target pg_lsn)
RETURNS void LANGUAGE c AS 'pglogical','pglogical_wait_slot_confirm_lsn';
CREATE FUNCTION pglogical.wait_for_subscription_sync_complete(subscription_name name)
RETURNS boolean RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_wait_for_subscription_sync_complete';

CREATE FUNCTION pglogical.wait_for_table_sync_complete(subscription_name name, relation regclass)
RETURNS boolean RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_wait_for_table_sync_complete';

CREATE FUNCTION pglogical.xact_commit_timestamp_origin("xid" xid, OUT "timestamp" timestamptz, OUT "roident" oid)
RETURNS record RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_xact_commit_timestamp_origin';
1 change: 1 addition & 0 deletions pglogical--2.3.4.sql
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,4 @@ RETURNS void RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME'

CREATE FUNCTION pglogical.xact_commit_timestamp_origin("xid" xid, OUT "timestamp" timestamptz, OUT "roident" oid)
RETURNS record RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_xact_commit_timestamp_origin';

4 changes: 2 additions & 2 deletions pglogical.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

#include "pglogical_compat.h"

#define PGLOGICAL_VERSION "2.3.4"
#define PGLOGICAL_VERSION_NUM 20304
#define PGLOGICAL_VERSION "2.3.4-1"
#define PGLOGICAL_VERSION_NUM 2030401

#define PGLOGICAL_MIN_PROTO_VERSION_NUM 1
#define PGLOGICAL_MAX_PROTO_VERSION_NUM 1
Expand Down
35 changes: 25 additions & 10 deletions pglogical_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static TransactionId remote_xid;

static void multi_insert_finish(void);

static void handle_queued_message(HeapTuple msgtup, bool tx_just_started);
static void handle_queued_message(QueuedMessage *queued_message, bool tx_just_started);
static void handle_startup_param(const char *key, const char *value);
static bool parse_bool_param(const char *key, const char *value);
static void process_syncing_tables(XLogRecPtr end_lsn);
Expand Down Expand Up @@ -576,9 +576,6 @@ handle_insert(StringInfo s)
}
}

/* Normal insert. */
apply_api.do_insert(rel, &newtup);

/* if INSERT was into our queue, process the message. */
if (RelationGetRelid(rel->rel) == QueueRelid)
{
Expand All @@ -593,12 +590,29 @@ handle_insert(StringInfo s)
ht = heap_form_tuple(RelationGetDescr(rel->rel),
newtup.values, newtup.nulls);

QueuedMessage *queued_message = queued_message_from_tuple(ht);
PGLogicalLocalNode *local_node = get_local_node(false, false);

// ignore queue messages forwarded from local node and not original messages
if ( queued_message->node_id == local_node->node->id ||
queued_message->node_id != queued_message->orig_node_id )
{
pglogical_relation_close(rel, NoLock);
return;
}

// change node_id to current
queued_message_tuple_set_local_node_id( &newtup.values, local_node->node->id );

/* Normal insert. */
apply_api.do_insert(rel, &newtup);

LockRelationIdForSession(&lockid, RowExclusiveLock);
pglogical_relation_close(rel, NoLock);

apply_api.on_commit();

handle_queued_message(ht, started_tx);
handle_queued_message(queued_message, started_tx);

heap_freetuple(ht);

Expand All @@ -615,7 +629,11 @@ handle_insert(StringInfo s)
// CommitTransactionCommand();
}
else
pglogical_relation_close(rel, NoLock);
{
/* Normal insert. */
apply_api.do_insert(rel, &newtup);
pglogical_relation_close( rel, NoLock );
}
}

static void
Expand Down Expand Up @@ -1057,16 +1075,13 @@ handle_sql(QueuedMessage *queued_message, bool tx_just_started)
* Handles messages comming from the queue.
*/
static void
handle_queued_message(HeapTuple msgtup, bool tx_just_started)
handle_queued_message(QueuedMessage *queued_message, bool tx_just_started)
{
QueuedMessage *queued_message;
const char *old_action_name;

old_action_name = errcallback_arg.action_name;
errcallback_arg.is_ddl_or_drop = true;

queued_message = queued_message_from_tuple(msgtup);

switch (queued_message->message_type)
{
case QUEUE_COMMAND_TYPE_SQL:
Expand Down
Loading