From f5dd2c0979780da61d7ff4357b9dc8d3ee95064d Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 12:02:44 +0200 Subject: [PATCH 1/7] Add archive procedure --- .../v0.5.13.add-archive-procedure.sql | 156 ++++++++++++++++++ .../v0.5.13.add-archive-procedure.yml | 25 +++ .../liquibase/v0.5.13.add-archive-tables.sql | 51 ++++++ .../liquibase/v0.5.13.add-archive-tables.yml | 25 +++ 4 files changed, 257 insertions(+) create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.sql create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.yml diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql new file mode 100644 index 000000000..b210d44f3 --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql @@ -0,0 +1,156 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE PROCEDURE archive_dag_instances( + IN i_from_ts timestamp default to_timestamp('1970-01-01', 'yyyy-mm-dd'), + IN i_to_ts timestamp default now() - interval '6 months', + IN i_max_records int default 50000000, + IN i_chunk_size int default 100000 +) +AS $$ +------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances(4) +-- Copies dag_instances with a final status from started timestamp i_from_ts +-- to started timestamp i_to_ts to archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- copied to the archive_job_instance and archive_event tables, respectively. +-- The copied dag_instances, job_instances and events are then deleted. +-- +-- Archiving takes place in chunks of i_chunk_size. The transaction is +-- committed after each chunk completes. +-- The total number of archived dag_instances can be limited by i_max_records. +-- +-- Parameters: +-- i_from_ts - Lower bound for dag instance started timestamp to archive. +-- Default: 1970-01-01 +-- i_to_ts - Upper bound for dag instance started timestamp to archive. +-- Default: now minus 6 months +-- i_max_records - Maximum number of dag instances to archive. Default: 50M +-- i_chunk_size - Chunk size of transaction. Default: 100K +-- +------------------------------------------------------------------------------- +DECLARE + _min_id BIGINT; + _max_id BIGINT; + _current_max BIGINT; + _max_records_id BIGINT; +BEGIN + SELECT max(di.id), min(di.id) + INTO _max_id, _min_id + FROM dag_instance di + WHERE di.started >= i_from_ts AND di.started <= i_to_ts + AND di.status NOT IN ('Running', 'InQueue'); + + _max_records_id := LEAST(_max_id, _min_id + i_max_records); + RAISE NOTICE 'Going to archive dag instances from % to %, approx. %', _min_id, _max_records_id, _max_records_id - _min_id; + + FOR j IN _min_id.._max_records_id BY i_chunk_size LOOP + _current_max := LEAST(_max_records_id, j + i_chunk_size); + CALL archive_dag_instances_chunk(j, _current_max); + COMMIT; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk( + IN i_min_id BIGINT, + IN i_max_id BIGINT +) +AS $$ +------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances_chunk(2) +-- Copies dag_instances with a final status from i_min_id to i_max_id to the +-- archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- archived to the archive_job_instance and archive_event tables, respectively. +-- This method should not be called directly. Instead, use archive_dag_instances +-- +-- Parameters: +-- i_min_id - Minimum dag instance id to archive +-- i_max_id - Maximum dag instance id to archive +-- +------------------------------------------------------------------------------- +DECLARE + _cnt INT; +BEGIN + RAISE NOTICE '============='; + RAISE NOTICE ' START BATCH'; + RAISE NOTICE '============='; + + CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS + SELECT di.id + FROM dag_instance di + WHERE di.status NOT IN ('Running', 'InQueue') + AND di.id >= i_min_id + AND di.id <= i_max_id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) + SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by + FROM dag_instance di + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id + FROM job_instance ji + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % job instances', _cnt; + + INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) + SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload + FROM "event" e + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % events', _cnt; + + RAISE NOTICE 'Going to delete dag instances'; + + DELETE FROM job_instance ji + USING dag_instance_ids_to_archive diita + WHERE ji.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % job instances', _cnt; + + DELETE FROM "event" e + USING dag_instance_ids_to_archive diita + WHERE e.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % events', _cnt; + + DELETE FROM dag_instance di + USING dag_instance_ids_to_archive diita + WHERE di.id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % dag instances', _cnt; + + DROP TABLE dag_instance_ids_to_archive; + + RAISE NOTICE '============='; + RAISE NOTICE ' END BATCH'; + RAISE NOTICE '============='; +END; +$$ LANGUAGE plpgsql; + + +CREATE INDEX IF NOT EXISTS event_dag_instance_idx ON "event" (dag_instance_id); diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml new file mode 100644 index 000000000..9d7268cdd --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml @@ -0,0 +1,25 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +databaseChangeLog: + - changeSet: + id: v0.5.13.add-archive-procedure + logicalFilePath: v0.5.13.add-archive-procedure + author: HyperdriveDevTeam@absa.africa + context: default + changes: + - sqlFile: + relativeToChangelogFile: true + path: v0.5.13.add-archive-procedure.sql diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.sql b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.sql new file mode 100644 index 000000000..506413bb5 --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.sql @@ -0,0 +1,51 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +create table if not exists archive_dag_instance +( + status varchar not null, + workflow_id bigint not null, + id bigint primary key, + started timestamp default now() not null, + finished timestamp, + triggered_by varchar default 'unknown'::character varying not null +); + +create table if not exists archive_job_instance +( + job_name varchar not null, + job_status varchar not null, + executor_job_id varchar, + created timestamp not null, + updated timestamp, + "order" integer not null, + dag_instance_id bigint not null + constraint archive_job_instance_archive_dag_instance_idx + references archive_dag_instance, + id bigint primary key, + application_id varchar, + step_id varchar +); + +create table if not exists archive_event +( + sensor_event_id varchar(70) not null unique, + sensor_id bigint not null, + dag_instance_id bigint + constraint archive_event_archive_dag_instance_fk + references archive_dag_instance, + id bigint primary key, + payload jsonb not null +); diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.yml b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.yml new file mode 100644 index 000000000..89a6784b5 --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-tables.yml @@ -0,0 +1,25 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +databaseChangeLog: + - changeSet: + id: v0.5.13.add-archive-tables + logicalFilePath: v0.5.13.add-archive-tables + author: HyperdriveDevTeam@absa.africa + context: default + changes: + - sqlFile: + relativeToChangelogFile: true + path: v0.5.13.add-archive-tables.sql From bdb9ffda7e5496faac6e603edf9d10914f0a9485 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 12:03:38 +0200 Subject: [PATCH 2/7] Update db changelog --- src/main/resources/db_scripts/liquibase/db.changelog.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/resources/db_scripts/liquibase/db.changelog.yml b/src/main/resources/db_scripts/liquibase/db.changelog.yml index f6ccb1e6e..1aa7be012 100644 --- a/src/main/resources/db_scripts/liquibase/db.changelog.yml +++ b/src/main/resources/db_scripts/liquibase/db.changelog.yml @@ -86,4 +86,10 @@ databaseChangeLog: - include: relativeToChangelogFile: true file: v0.5.11.additional-spark-config-map-to-array.yml - \ No newline at end of file + - include: + relativeToChangelogFile: true + file: v0.5.13.add-archive-tables.yml + - include: + relativeToChangelogFile: true + file: v0.5.13.add-archive-procedure.yml + From 75ed14a1582d1b298e471a7b2d7520b599fe04af Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 12:06:16 +0200 Subject: [PATCH 3/7] Update db_script_latest --- .../resources/db_scripts/db_script_latest.sql | 178 ++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index 995193354..94609106a 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -130,6 +130,43 @@ create table "notification_rule_history" ( "notification_rule" JSONB NOT NULL ); +create table archive_dag_instance +( + status varchar not null, + workflow_id bigint not null, + id bigint primary key, + started timestamp default now() not null, + finished timestamp, + triggered_by varchar default 'unknown'::character varying not null +); + +create table archive_job_instance +( + job_name varchar not null, + job_status varchar not null, + executor_job_id varchar, + created timestamp not null, + updated timestamp, + "order" integer not null, + dag_instance_id bigint not null + constraint archive_job_instance_archive_dag_instance_idx + references archive_dag_instance, + id bigint primary key, + application_id varchar, + step_id varchar +); + +create table archive_event +( + sensor_event_id varchar(70) not null unique, + sensor_id bigint not null, + dag_instance_id bigint + constraint archive_event_archive_dag_instance_fk + references archive_dag_instance, + id bigint primary key, + payload jsonb not null +); + alter table "job_instance" add constraint "job_instance_dag_instance_fk" foreign key("dag_instance_id") @@ -188,3 +225,144 @@ CREATE INDEX job_instance_dag_instance_idx ON job_instance (dag_instance_id); CREATE INDEX dag_instance_workflow_id_idx ON dag_instance (workflow_id); CREATE INDEX dag_instance_started_idx ON dag_instance (started); CREATE INDEX workflow_scheduler_inst_id_idx ON workflow (scheduler_instance_id); +CREATE INDEX event_dag_instance_idx ON "event" (dag_instance_id); + + +CREATE OR REPLACE PROCEDURE archive_dag_instances( + IN i_from_ts timestamp default to_timestamp('1970-01-01', 'yyyy-mm-dd'), + IN i_to_ts timestamp default now() - interval '6 months', + IN i_max_records int default 50000000, + IN i_chunk_size int default 100000 +) +AS $$ +------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances(4) +-- Copies dag_instances with a final status from started timestamp i_from_ts +-- to started timestamp i_to_ts to archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- copied to the archive_job_instance and archive_event tables, respectively. +-- The copied dag_instances, job_instances and events are then deleted. +-- +-- Archiving takes place in chunks of i_chunk_size. The transaction is +-- committed after each chunk completes. +-- The total number of archived dag_instances can be limited by i_max_records. +-- +-- Parameters: +-- i_from_ts - Lower bound for dag instance started timestamp to archive. +-- Default: 1970-01-01 +-- i_to_ts - Upper bound for dag instance started timestamp to archive. +-- Default: now minus 6 months +-- i_max_records - Maximum number of dag instances to archive. Default: 50M +-- i_chunk_size - Chunk size of transaction. Default: 100K +-- +------------------------------------------------------------------------------- +DECLARE + _min_id BIGINT; + _max_id BIGINT; + _current_max BIGINT; + _max_records_id BIGINT; +BEGIN + SELECT max(di.id), min(di.id) + INTO _max_id, _min_id + FROM dag_instance di + WHERE di.started >= i_from_ts AND di.started <= i_to_ts + AND di.status NOT IN ('Running', 'InQueue'); + + _max_records_id := LEAST(_max_id, _min_id + i_max_records); + RAISE NOTICE 'Going to archive dag instances from % to %, approx. %', _min_id, _max_records_id, _max_records_id - _min_id; + + FOR j IN _min_id.._max_records_id BY i_chunk_size LOOP + _current_max := LEAST(_max_records_id, j + i_chunk_size); + CALL archive_dag_instances_chunk(j, _current_max); + COMMIT; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk( + IN i_min_id BIGINT, + IN i_max_id BIGINT +) +AS $$ +------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances_chunk(2) +-- Copies dag_instances with a final status from i_min_id to i_max_id to the +-- archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- archived to the archive_job_instance and archive_event tables, respectively. +-- This method should not be called directly. Instead, use archive_dag_instances +-- +-- Parameters: +-- i_min_id - Minimum dag instance id to archive +-- i_max_id - Maximum dag instance id to archive +-- +------------------------------------------------------------------------------- +DECLARE + _cnt INT; +BEGIN + RAISE NOTICE '============='; + RAISE NOTICE ' START BATCH'; + RAISE NOTICE '============='; + + CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS + SELECT di.id + FROM dag_instance di + WHERE di.status NOT IN ('Running', 'InQueue') + AND di.id >= i_min_id + AND di.id <= i_max_id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) + SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by + FROM dag_instance di + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id + FROM job_instance ji + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % job instances', _cnt; + + INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) + SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload + FROM "event" e + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % events', _cnt; + + RAISE NOTICE 'Going to delete dag instances'; + + DELETE FROM job_instance ji + USING dag_instance_ids_to_archive diita + WHERE ji.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % job instances', _cnt; + + DELETE FROM "event" e + USING dag_instance_ids_to_archive diita + WHERE e.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % events', _cnt; + + DELETE FROM dag_instance di + USING dag_instance_ids_to_archive diita + WHERE di.id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % dag instances', _cnt; + + DROP TABLE dag_instance_ids_to_archive; + + RAISE NOTICE '============='; + RAISE NOTICE ' END BATCH'; + RAISE NOTICE '============='; +END; +$$ LANGUAGE plpgsql; From 0905529354cbb558963cbb9cf4aea33f2a13f95c Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 12:58:38 +0200 Subject: [PATCH 4/7] Change default chunk size to 50K --- .../db_scripts/liquibase/v0.5.13.add-archive-procedure.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql index b210d44f3..cf06e18e0 100644 --- a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql @@ -17,7 +17,7 @@ CREATE OR REPLACE PROCEDURE archive_dag_instances( IN i_from_ts timestamp default to_timestamp('1970-01-01', 'yyyy-mm-dd'), IN i_to_ts timestamp default now() - interval '6 months', IN i_max_records int default 50000000, - IN i_chunk_size int default 100000 + IN i_chunk_size int default 50000 ) AS $$ ------------------------------------------------------------------------------- @@ -38,8 +38,8 @@ AS $$ -- Default: 1970-01-01 -- i_to_ts - Upper bound for dag instance started timestamp to archive. -- Default: now minus 6 months --- i_max_records - Maximum number of dag instances to archive. Default: 50M --- i_chunk_size - Chunk size of transaction. Default: 100K +-- i_max_records - Maximum number of dag instances to archive. Default: 50M (unlimited) +-- i_chunk_size - Chunk size of transaction. Default: 50K -- ------------------------------------------------------------------------------- DECLARE From 9efec95a7622ee38caee33662df9dd8b11aeaaaf Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 13:00:08 +0200 Subject: [PATCH 5/7] Fix formatting --- .../liquibase/v0.5.13.add-archive-procedure.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql index cf06e18e0..b6221bb2f 100644 --- a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.sql @@ -103,7 +103,7 @@ BEGIN INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by FROM dag_instance di - JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; @@ -111,7 +111,7 @@ BEGIN INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id FROM job_instance ji - JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % job instances', _cnt; @@ -119,7 +119,7 @@ BEGIN INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload FROM "event" e - JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % events', _cnt; @@ -127,13 +127,13 @@ BEGIN RAISE NOTICE 'Going to delete dag instances'; DELETE FROM job_instance ji - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE ji.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % job instances', _cnt; DELETE FROM "event" e - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE e.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % events', _cnt; From 33d203858192f9de2a274a92d647558095e4403a Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 15 Sep 2022 17:27:45 +0200 Subject: [PATCH 6/7] Copy latest changes from procedures script --- .../resources/db_scripts/db_script_latest.sql | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index 94609106a..70d623390 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -227,12 +227,11 @@ CREATE INDEX dag_instance_started_idx ON dag_instance (started); CREATE INDEX workflow_scheduler_inst_id_idx ON workflow (scheduler_instance_id); CREATE INDEX event_dag_instance_idx ON "event" (dag_instance_id); - CREATE OR REPLACE PROCEDURE archive_dag_instances( IN i_from_ts timestamp default to_timestamp('1970-01-01', 'yyyy-mm-dd'), IN i_to_ts timestamp default now() - interval '6 months', IN i_max_records int default 50000000, - IN i_chunk_size int default 100000 + IN i_chunk_size int default 50000 ) AS $$ ------------------------------------------------------------------------------- @@ -253,8 +252,8 @@ AS $$ -- Default: 1970-01-01 -- i_to_ts - Upper bound for dag instance started timestamp to archive. -- Default: now minus 6 months --- i_max_records - Maximum number of dag instances to archive. Default: 50M --- i_chunk_size - Chunk size of transaction. Default: 100K +-- i_max_records - Maximum number of dag instances to archive. Default: 50M (unlimited) +-- i_chunk_size - Chunk size of transaction. Default: 50K -- ------------------------------------------------------------------------------- DECLARE @@ -318,7 +317,7 @@ BEGIN INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by FROM dag_instance di - JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; @@ -326,7 +325,7 @@ BEGIN INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id FROM job_instance ji - JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % job instances', _cnt; @@ -334,7 +333,7 @@ BEGIN INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload FROM "event" e - JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % events', _cnt; @@ -342,13 +341,13 @@ BEGIN RAISE NOTICE 'Going to delete dag instances'; DELETE FROM job_instance ji - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE ji.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % job instances', _cnt; DELETE FROM "event" e - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE e.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % events', _cnt; @@ -366,3 +365,4 @@ BEGIN RAISE NOTICE '============='; END; $$ LANGUAGE plpgsql; + From 4d1d97a9455d9e338b14cb1b9e8dccd122a7850d Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Fri, 16 Sep 2022 08:25:41 +0200 Subject: [PATCH 7/7] Fix liquibase error --- .../db_scripts/liquibase/v0.5.13.add-archive-procedure.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml index 9d7268cdd..3a5521294 100644 --- a/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml +++ b/src/main/resources/db_scripts/liquibase/v0.5.13.add-archive-procedure.yml @@ -23,3 +23,4 @@ databaseChangeLog: - sqlFile: relativeToChangelogFile: true path: v0.5.13.add-archive-procedure.sql + splitStatements: false