-
Notifications
You must be signed in to change notification settings - Fork 642
/
distributed_procedure.out
229 lines (197 loc) · 9.09 KB
/
distributed_procedure.out
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
SET citus.next_shard_id TO 20030000;
CREATE USER procedureuser;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT run_command_on_workers($$CREATE USER procedureuser;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
CREATE SCHEMA procedure_tests AUTHORIZATION procedureuser;
CREATE SCHEMA procedure_tests2 AUTHORIZATION procedureuser;
SET search_path TO procedure_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE OR REPLACE PROCEDURE raise_info(text)
LANGUAGE PLPGSQL AS $proc$
BEGIN
RAISE INFO 'information message %', $1;
END;
$proc$;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER PROCEDURE raise_info(text) SECURITY INVOKER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- Test SET/RESET for alter procedure
ALTER PROCEDURE raise_info(text) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER PROCEDURE raise_info(text) RENAME TO raise_info2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info2(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info2('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
ALTER PROCEDURE raise_info2(text) RENAME TO raise_info;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER PROCEDURE raise_info(text) OWNER TO procedureuser;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'raise_info';
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"(procedureuser,procedure_tests,raise_info)")
(localhost,57638,t,"(procedureuser,procedure_tests,raise_info)")
(2 rows)
-- change the schema of the procedure and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER PROCEDURE raise_info(text) SET SCHEMA procedure_tests2;
SELECT public.verify_function_is_same_on_workers('procedure_tests2.raise_info(text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$CALL procedure_tests2.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
ALTER PROCEDURE procedure_tests2.raise_info(text) SET SCHEMA procedure_tests;
DROP PROCEDURE raise_info(text);
-- call should fail as procedure should have been dropped
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests CASCADE;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
DROP SCHEMA procedure_tests2 CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
DROP USER procedureuser;
SELECT run_command_on_workers($$DROP USER procedureuser;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP ROLE")
(localhost,57638,t,"DROP ROLE")
(2 rows)
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)