Skip to content

Commit 2a80a1a

Browse files
Kacper KaweckiAndersson007
andauthored
Fixes ansible-collections#609 Publication with columns (ansible-collections#763)
* Add initial code for creating publication with specified columns * Add integration test * Refactor and fix publication with columns * Add more test for publication with columns * Code clean-up * Add changelog fragment * Ensure that test assertion for publication with columns are run on postgres 15 or newer * Update changelogs fragment Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru> * Add missing check for postgresql version --------- Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru>
1 parent ea9d620 commit 2a80a1a

File tree

3 files changed

+528
-58
lines changed

3 files changed

+528
-58
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
minor_changes:
2+
- postgresql_publication - add possibility of creating publication with column list (https://github.com/ansible-collections/community.postgresql/pull/763).

plugins/modules/postgresql_publication.py

Lines changed: 179 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@
2828
the publication state will be changed.
2929
aliases: [ login_db ]
3030
type: str
31+
columns:
32+
description:
33+
- List of tables and its columns to add to the publication.
34+
- If no columns are passed for table, it will be published as a whole.
35+
- Mutually exclusive with I(tables) and I(tables_in_schema).
36+
type: dict
37+
version_added: '3.8.0'
3138
tables:
3239
description:
3340
- List of tables to add to the publication.
@@ -36,7 +43,7 @@
3643
nothing will be changed.
3744
- If you need to add all tables to the publication with the same name,
3845
drop existent and create new without passing I(tables).
39-
- Mutually exclusive with I(tables_in_schema).
46+
- Mutually exclusive with I(tables_in_schema) and I(columns).
4047
type: list
4148
elements: str
4249
tables_in_schema:
@@ -45,7 +52,7 @@
4552
for all tables in those schemas.
4653
- If you want to remove all schemas, explicitly pass an empty list C([]).
4754
- Supported since PostgreSQL 15.
48-
- Mutually exclusive with I(tables).
55+
- Mutually exclusive with I(tables) and I(columns).
4956
type: list
5057
elements: str
5158
version_added: '3.5.0'
@@ -131,6 +138,15 @@
131138
- prices
132139
- vehicles
133140
141+
- name: Create publication "acme" publishing only prices table and id and named from vehicles tables
142+
community.postgresql.postgresql_publication:
143+
name: acme
144+
columns:
145+
prices:
146+
vehicles:
147+
- id
148+
- name
149+
134150
- name: Create a new publication "acme" for tables in schema "myschema"
135151
community.postgresql.postgresql_publication:
136152
db: test
@@ -210,19 +226,11 @@
210226
from ansible.module_utils.basic import AnsibleModule
211227
from ansible.module_utils.six import iteritems
212228
from ansible_collections.community.postgresql.plugins.module_utils.database import (
213-
check_input,
214-
pg_quote_identifier,
215-
)
229+
check_input, pg_quote_identifier)
216230
from ansible_collections.community.postgresql.plugins.module_utils.postgres import (
217-
connect_to_db,
218-
ensure_required_libs,
219-
exec_sql,
220-
get_conn_params,
221-
get_server_version,
222-
pg_cursor_args,
223-
postgres_common_argument_spec,
224-
set_comment,
225-
)
231+
connect_to_db, ensure_required_libs, exec_sql, get_conn_params,
232+
get_server_version, pg_cursor_args, postgres_common_argument_spec,
233+
set_comment)
226234

227235
SUPPORTED_PG_VERSION = 10000
228236

@@ -231,6 +239,22 @@
231239
# Module functions and classes #
232240
################################
233241

242+
def normalize_table_name(table):
243+
"""Add 'public.' to name of table where a schema identifier is absent
244+
and add quotes to each element.
245+
246+
Args:
247+
table (str): Table name.
248+
249+
Returns:
250+
str: Normalized table name.
251+
"""
252+
if '.' not in table:
253+
return pg_quote_identifier('public.%s' % table.strip(), 'table')
254+
else:
255+
return pg_quote_identifier(table.strip(), 'table')
256+
257+
234258
def transform_tables_representation(tbl_list):
235259
"""Add 'public.' to names of tables where a schema identifier is absent
236260
and add quotes to each element.
@@ -242,14 +266,47 @@ def transform_tables_representation(tbl_list):
242266
tbl_list (list): Changed list.
243267
"""
244268
for i, table in enumerate(tbl_list):
245-
if '.' not in table:
246-
tbl_list[i] = pg_quote_identifier('public.%s' % table.strip(), 'table')
247-
else:
248-
tbl_list[i] = pg_quote_identifier(table.strip(), 'table')
269+
tbl_list[i] = normalize_table_name(table)
249270

250271
return tbl_list
251272

252273

274+
def transform_columns_keys(columns):
275+
"""Add quotes to each element of the columns list.
276+
277+
Args:
278+
columns (dict): Dict with tables and columns.
279+
280+
Returns:
281+
columns (dict): Changed dict.
282+
"""
283+
revmap_columns = {}
284+
for table in columns:
285+
revmap_columns[normalize_table_name(table)] = set(c.strip() for c in columns[table]) if columns[table] else None
286+
287+
return revmap_columns
288+
289+
290+
def pg_quote_column_list(table, columns):
291+
"""Convert a list of columns to a string.
292+
293+
Args:
294+
table (str): Table name.
295+
columns (list): List of columns.
296+
297+
Returns:
298+
str: String with columns.
299+
"""
300+
table = normalize_table_name(table)
301+
302+
if not columns:
303+
return table
304+
305+
quoted_columns = [pg_quote_identifier(col, 'column') for col in columns]
306+
quoted_sql = "%s (%s)" % (table, ', '.join(quoted_columns))
307+
return quoted_sql
308+
309+
253310
class PgPublication():
254311
"""Class to work with PostgreSQL publication.
255312
@@ -279,6 +336,7 @@ def __init__(self, module, cursor, name, pg_srv_ver):
279336
'parameters': {},
280337
'owner': '',
281338
'schemas': [],
339+
'columns': {}
282340
}
283341
self.exists = self.check_pub()
284342

@@ -326,13 +384,18 @@ def check_pub(self):
326384
# FOR TABLES IN SCHEMA statement is supported since PostgreSQL 15
327385
if self.pg_srv_ver >= 150000:
328386
self.attrs['schemas'] = self.__get_schema_pub_info()
387+
column_info = self.__get_columns_pub_info()
388+
columns = {}
389+
for row in column_info:
390+
columns[normalize_table_name(row["schema_dot_table"])] = set(row['columns'])
391+
self.attrs['columns'] = columns
329392
else:
330393
self.attrs['alltables'] = True
331394

332395
# Publication exists:
333396
return True
334397

335-
def create(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
398+
def create(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
336399
"""Create the publication.
337400
338401
Args:
@@ -353,7 +416,12 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
353416

354417
query_fragments = ["CREATE PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]
355418

356-
if tables:
419+
if columns:
420+
table_strings = []
421+
for table in columns:
422+
table_strings.append(pg_quote_column_list(table, columns[table]))
423+
query_fragments.append("FOR TABLE %s" % ', '.join(table_strings))
424+
elif tables:
357425
query_fragments.append("FOR TABLE %s" % ', '.join(tables))
358426
elif tables_in_schema:
359427
tables_in_schema = [pg_quote_identifier(schema, 'schema') for schema in tables_in_schema]
@@ -383,7 +451,7 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
383451

384452
return changed
385453

386-
def update(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
454+
def update(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
387455
"""Update the publication.
388456
389457
Args:
@@ -403,6 +471,34 @@ def update(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
403471
changed = False
404472

405473
# Add or drop tables from published tables suit:
474+
if columns and not self.attrs['alltables']:
475+
need_set_columns = False
476+
for table in columns:
477+
if table not in self.attrs['tables']:
478+
continue
479+
elif not columns[table]:
480+
all_columns = self.__get_table_columns(table)
481+
if all_columns != self.attrs['columns'][table]:
482+
need_set_columns = True
483+
break
484+
elif self.attrs['columns'][table] != columns[table]:
485+
need_set_columns = True
486+
break
487+
488+
if need_set_columns:
489+
changed = self.__pub_set_columns(columns, check_mode=check_mode)
490+
else:
491+
# Add new tables to the publication:
492+
for table in columns:
493+
if table not in self.attrs['tables']:
494+
changed = self.__pub_add_columns(table, columns[table], check_mode=check_mode)
495+
496+
# Drop redundant tables from the publication:
497+
for table in self.attrs['columns']:
498+
if table not in columns.keys():
499+
changed = self.__pub_drop_table(table, check_mode=check_mode)
500+
elif columns and self.attrs['alltables']:
501+
changed = self.__pub_set_columns(columns, check_mode=check_mode)
406502
if tables and not self.attrs['alltables']:
407503

408504
# 1. If needs to add table to the publication:
@@ -465,9 +561,8 @@ def update(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
465561
changed = self.__pub_set_param(key, val, check_mode=check_mode)
466562

467563
# Update pub owner:
468-
if owner:
469-
if owner != self.attrs['owner']:
470-
changed = self.__pub_set_owner(owner, check_mode=check_mode)
564+
if owner and owner != self.attrs['owner']:
565+
changed = self.__pub_set_owner(owner, check_mode=check_mode)
471566

472567
if comment is not None and comment != self.attrs['comment']:
473568
changed = set_comment(self.cursor, comment, 'publication',
@@ -537,6 +632,16 @@ def __get_tables_pub_info(self):
537632
"FROM pg_publication_tables WHERE pubname = %(pname)s")
538633
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)
539634

635+
def __get_columns_pub_info(self):
636+
"""Get and return columns that are published by the publication.
637+
638+
Returns:
639+
List of dicts with published columns.
640+
"""
641+
query = ("SELECT schemaname || '.' || tablename as schema_dot_table, attnames as columns "
642+
"FROM pg_publication_tables WHERE pubname = %(pname)s")
643+
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)
644+
540645
def __get_schema_pub_info(self):
541646
"""Get and return schemas added to the publication.
542647
@@ -555,6 +660,17 @@ def __get_schema_pub_info(self):
555660
list_of_schemas.extend(d.values())
556661
return list_of_schemas
557662

663+
def __get_table_columns(self, table):
664+
"""Get and return columns names of the table.
665+
666+
Returns:
667+
Set of columns.
668+
"""
669+
query = ("SELECT attname as column_name FROM pg_attribute "
670+
"WHERE attrelid = %(table)s::regclass and attnum > 0 AND NOT attisdropped;")
671+
result = exec_sql(self, query, query_params={'table': table}, add_to_executed=False)
672+
return set([row['column_name'] for row in result])
673+
558674
def __pub_add_table(self, table, check_mode=False):
559675
"""Add a table to the publication.
560676
@@ -607,6 +723,35 @@ def __pub_set_tables(self, tables, check_mode=False):
607723
', '.join(quoted_tables)))
608724
return self.__exec_sql(query, check_mode=check_mode)
609725

726+
def __pub_add_columns(self, table, columns, check_mode=False):
727+
""" Add table with specific columns to the publication.
728+
Args:
729+
table (str): Table name.
730+
columns (list): List of columns.
731+
Kwargs:
732+
check_mode (bool): If True, don't actually change anything,
733+
just make SQL, add it to ``self.executed_queries`` and return True.
734+
Returns:
735+
True if successful, False otherwise.
736+
"""
737+
query = ("ALTER PUBLICATION %s ADD TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
738+
pg_quote_column_list(table, columns)))
739+
return self.__exec_sql(query, check_mode=check_mode)
740+
741+
def __pub_set_columns(self, columns_map, check_mode=False):
742+
"""Set columns that need to be published by the publication.
743+
Args:
744+
columns_map (dict): Dictionary of all tables and list of columns.
745+
Kwargs:
746+
check_mode (bool): If True, don't actually change anything,
747+
just make SQL, add it to ``self.executed_queries`` and return True.
748+
Returns:
749+
True if successful, False otherwise.
750+
"""
751+
table_list = [pg_quote_column_list(table, columns_map[table]) for table in columns_map]
752+
query = ("ALTER PUBLICATION %s SET TABLE %s" % (pg_quote_identifier(self.name, 'publication'), ', '.join(table_list)))
753+
return self.__exec_sql(query, check_mode=check_mode)
754+
610755
def __pub_add_schema(self, schema, check_mode=False):
611756
"""Add a schema to the publication.
612757
@@ -720,11 +865,12 @@ def main():
720865
trust_input=dict(type='bool', default=True),
721866
comment=dict(type='str', default=None),
722867
tables_in_schema=dict(type='list', elements='str', default=None),
868+
columns=dict(type='dict', default=None),
723869
)
724870
module = AnsibleModule(
725871
argument_spec=argument_spec,
726872
supports_check_mode=True,
727-
mutually_exclusive=[('tables', 'tables_in_schema')],
873+
mutually_exclusive=[('tables', 'tables_in_schema', "columns")],
728874
)
729875

730876
# Parameters handling:
@@ -738,6 +884,7 @@ def main():
738884
trust_input = module.params['trust_input']
739885
comment = module.params['comment']
740886
tables_in_schema = module.params['tables_in_schema']
887+
columns = module.params['columns']
741888

742889
if not trust_input:
743890
# Check input for potentially dangerous elements:
@@ -775,6 +922,8 @@ def main():
775922

776923
if tables_in_schema is not None and pg_srv_ver < 150000:
777924
module.fail_json(msg="Publication of tables in schema is supported by PostgreSQL 15 or greater")
925+
if columns and pg_srv_ver < 150000:
926+
module.fail_json(msg="Publication of columns is supported by PostgreSQL 15 or greater")
778927

779928
# Nothing was changed by default:
780929
changed = False
@@ -786,14 +935,17 @@ def main():
786935
if tables:
787936
tables = transform_tables_representation(tables)
788937

938+
if columns:
939+
columns = transform_columns_keys(columns)
940+
789941
# If module.check_mode=True, nothing will be changed:
790942
if state == 'present':
791943
if not publication.exists:
792-
changed = publication.create(tables, tables_in_schema, params, owner, comment,
944+
changed = publication.create(tables, tables_in_schema, columns, params, owner, comment,
793945
check_mode=module.check_mode)
794946

795947
else:
796-
changed = publication.update(tables, tables_in_schema, params, owner, comment,
948+
changed = publication.update(tables, tables_in_schema, columns, params, owner, comment,
797949
check_mode=module.check_mode)
798950

799951
elif state == 'absent':

0 commit comments

Comments
 (0)