Skip to content

Commit

Permalink
Introduce fixed schedules for background jobs
Browse files Browse the repository at this point in the history
Currently, the next start of a scheduled background job is
calculated by adding the `schedule_interval` to its finish
time. This does not allow scheduling jobs to execute at fixed
times, as the next execution is "shifted" by the job duration.

This commit introduces the option to execute a job on a fixed
schedule instead. Users are expected to provide an initial_start
parameter on which subsequent job executions are aligned. The next
start is calculated by computing the next time_bucket of the finish
time with initial_start origin.
An `initial_start` parameter is added to the compression, retention,
reorder and continuous aggregate `add_policy` signatures. By passing
that upon policy creation users indicate the policy will execute on
a fixed schedule, or drifting schedule if `initial_start` is not
provided.
To allow users to pick a drifting schedule when registering a UDA,
an additional parameter `fixed_schedule` is added to `add_job`
to allow users to specify the old behavior by setting it to false.

Additionally, an optional TEXT parameter, `timezone`, is added to both
add_job and add_policy signatures, to address the 1-hour shift in
execution time caused by DST switches. As internally the next start of
a fixed schedule job is calculated using time_bucket, the timezone
parameter allows using timezone-aware buckets to calculate
the next start.
  • Loading branch information
konskov committed Oct 18, 2022
1 parent 8950abe commit 54ed0d5
Show file tree
Hide file tree
Showing 53 changed files with 3,816 additions and 292 deletions.
4 changes: 2 additions & 2 deletions .github/gh_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def build_debug_config(overrides):
"build_type": "Debug",
"pg_build_args": "--enable-debug --enable-cassert",
"tsdb_build_args": "-DCODECOVERAGE=ON -DWARNINGS_AS_ERRORS=ON -DREQUIRE_ALL_TESTS=ON",
"installcheck_args": "IGNORES='bgw_db_scheduler'",
"installcheck_args": "IGNORES='bgw_db_scheduler bgw_db_scheduler_fixed'",
"coverage": True,
"extra_packages": "clang-9 llvm-9 llvm-9-dev llvm-9-tools",
"llvm_config": "llvm-config-9",
Expand Down Expand Up @@ -105,7 +105,7 @@ def macos_config(overrides):
"tsdb_build_args": "-DASSERTIONS=ON -DREQUIRE_ALL_TESTS=ON -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl",
"llvm_config": "/usr/local/opt/llvm/bin/llvm-config",
"coverage": False,
"installcheck_args": "IGNORES='bgw_db_scheduler bgw_launcher pg_dump remote_connection compressed_collation'",
"installcheck_args": "IGNORES='bgw_db_scheduler bgw_db_scheduler_fixed bgw_launcher pg_dump remote_connection compressed_collation'",
"extra_packages": "",
})
base_config.update(overrides)
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sanitizer-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
# test seems to fail due to a PostgreSQL bug where AbortStartTime in
# postmaster.c is not atomic but read/written across signal handlers
# and ServerLoop.
make -k -C build installcheck SKIPS='remote_txn' IGNORES='bgw_db_scheduler debug_notice' | tee installcheck.log
make -k -C build installcheck SKIPS='remote_txn' IGNORES='bgw_db_scheduler bgw_db_scheduler_fixed debug_notice' | tee installcheck.log
- name: Show regression diffs
if: always()
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
build_type: ${{ fromJson(needs.config.outputs.build_type) }}
ignores: ["chunk_adaptive metadata"]
tsl_ignores: ["compression_algos dist_partial_agg remote_connection"]
tsl_skips: ["bgw_db_scheduler cagg_ddl_dist_ht data_fetcher dist_compression dist_move_chunk dist_remote_error remote_txn"]
tsl_skips: ["bgw_db_scheduler bgw_db_scheduler_fixed cagg_ddl_dist_ht data_fetcher dist_compression dist_move_chunk dist_remote_error remote_txn"]
pg_config: ["-cfsync=off -cstatement_timeout=60s"]
include:
- pg: 12
Expand Down
2 changes: 1 addition & 1 deletion scripts/test_sanitizers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ echo "Testing"
# postmaster.c is not atomic but read/written across signal handlers
# and ServerLoop.
docker exec -i -u postgres -w /tsdb_build/timescaledb/build timescaledb-san /bin/bash <<EOF
make -k regresscheck regresscheck-t SKIPS='remote_txn' IGNORES='bgw_db_scheduler bgw_launcher cluster-11 continuous_aggs_ddl-11'
make -k regresscheck regresscheck-t SKIPS='remote_txn' IGNORES='bgw_db_scheduler bgw_db_scheduler_fixed bgw_launcher cluster-11 continuous_aggs_ddl-11'
EOF
7 changes: 5 additions & 2 deletions sql/job_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ CREATE OR REPLACE FUNCTION @extschema@.add_job(
config JSONB DEFAULT NULL,
initial_start TIMESTAMPTZ DEFAULT NULL,
scheduled BOOL DEFAULT true,
check_config REGPROC DEFAULT NULL
check_config REGPROC DEFAULT NULL,
fixed_schedule BOOL DEFAULT TRUE,
timezone TEXT DEFAULT NULL
) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_job_add' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION @extschema@.delete_job(job_id INTEGER) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_job_delete' LANGUAGE C VOLATILE STRICT;
Expand All @@ -27,7 +29,8 @@ CREATE OR REPLACE FUNCTION @extschema@.alter_job(
if_exists BOOL = FALSE,
check_config REGPROC = NULL
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB, next_start TIMESTAMPTZ, check_config TEXT)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB,
next_start TIMESTAMPTZ, check_config TEXT)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;

Expand Down
8 changes: 6 additions & 2 deletions sql/job_error_log_retention.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ INSERT INTO _timescaledb_config.bgw_job (
scheduled,
config,
check_schema,
check_name
check_name,
fixed_schedule,
initial_start
)
VALUES
(
Expand All @@ -67,5 +69,7 @@ VALUES
true,
'{"drop_after":"1 month"}',
'_timescaledb_internal',
'policy_job_error_retention_check'
'policy_job_error_retention_check',
true,
'2000-01-01 00:00:00+00'::timestamptz
) ON CONFLICT (id) DO NOTHING;
30 changes: 25 additions & 5 deletions sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy(
relation REGCLASS,
drop_after "any",
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE;
Expand All @@ -27,16 +29,28 @@ AS '@MODULE_PATHNAME@', 'ts_policy_retention_remove'
LANGUAGE C VOLATILE STRICT;

/* reorder policy */
CREATE OR REPLACE FUNCTION @extschema@.add_reorder_policy(hypertable REGCLASS, index_name NAME, if_not_exists BOOL = false) RETURNS INTEGER
CREATE OR REPLACE FUNCTION @extschema@.add_reorder_policy(
hypertable REGCLASS,
index_name NAME,
if_not_exists BOOL = false,
initial_start timestamptz = NULL,
timezone TEXT = NULL
) RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
LANGUAGE C VOLATILE STRICT;
LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION @extschema@.remove_reorder_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_remove'
LANGUAGE C VOLATILE STRICT;

/* compression policy */
CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL = false, schedule_interval INTERVAL = NULL)
CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(
hypertable REGCLASS, compress_after "any",
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE; -- not strict because we need to set different default values for schedule_interval
Expand All @@ -46,7 +60,13 @@ AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove'
LANGUAGE C VOLATILE STRICT;

/* continuous aggregates policy */
CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy(continuous_aggregate REGCLASS, start_offset "any", end_offset "any", schedule_interval INTERVAL, if_not_exists BOOL = false)
CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE;
Expand Down
3 changes: 3 additions & 0 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,13 @@ CREATE TABLE _timescaledb_config.bgw_job (
proc_name name NOT NULL,
owner name NOT NULL DEFAULT CURRENT_ROLE,
scheduled bool NOT NULL DEFAULT TRUE,
fixed_schedule bool not null default true,
initial_start timestamptz,
hypertable_id integer,
config jsonb,
check_schema name,
check_name name,
timezone text,
-- table constraints
CONSTRAINT bgw_job_pkey PRIMARY KEY (id),
CONSTRAINT bgw_job_hypertable_id_fkey FOREIGN KEY (hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
Expand Down
124 changes: 123 additions & 1 deletion sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

-- gapfill with timezone support
CREATE FUNCTION @extschema@.time_bucket_gapfill(bucket_width INTERVAL, ts TIMESTAMPTZ, timezone TEXT, start TIMESTAMPTZ=NULL, finish TIMESTAMPTZ=NULL) RETURNS TIMESTAMPTZ
AS '@MODULE_PATHNAME@', 'ts_gapfill_timestamptz_timezone_bucket' LANGUAGE C VOLATILE PARALLEL SAFE;
Expand Down Expand Up @@ -175,3 +174,126 @@ $BODY$
UNION ALL
SELECT * FROM _chunk_sizes) AS sizes;
$BODY$ SET search_path TO pg_catalog, pg_temp;

DROP FUNCTION IF EXISTS @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL);
DROP FUNCTION IF EXISTS @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL);
DROP FUNCTION IF EXISTS @extschema@.add_continuous_aggregate_policy(REGCLASS, "any", "any", INTERVAL, BOOL);
DROP FUNCTION IF EXISTS @extschema@.add_job(REGPROC, INTERVAL, JSONB, TIMESTAMPTZ, BOOL, REGPROC);
DROP FUNCTION IF EXISTS @extschema@.add_reorder_policy(REGCLASS, NAME, BOOL);

DROP VIEW IF EXISTS timescaledb_information.jobs;
DROP VIEW IF EXISTS timescaledb_information.job_stats;
DROP VIEW IF EXISTS timescaledb_experimental.policies;

-- rebuild _timescaledb_config.bgw_job
CREATE TABLE _timescaledb_config.bgw_job_tmp AS SELECT * FROM _timescaledb_config.bgw_job;
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_job;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_config.bgw_job_id_seq;
ALTER TABLE _timescaledb_internal.bgw_job_stat DROP CONSTRAINT IF EXISTS bgw_job_stat_job_id_fkey;
ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats DROP CONSTRAINT IF EXISTS bgw_policy_chunk_stats_job_id_fkey;

CREATE TABLE _timescaledb_internal.tmp_bgw_job_seq_value AS SELECT last_value, is_called FROM _timescaledb_config.bgw_job_id_seq;

DROP TABLE _timescaledb_config.bgw_job;

CREATE SEQUENCE _timescaledb_config.bgw_job_id_seq MINVALUE 1000;
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_job_id_seq', '');
SELECT setval('_timescaledb_config.bgw_job_id_seq', last_value, is_called) FROM _timescaledb_internal.tmp_bgw_job_seq_value;
DROP TABLE _timescaledb_internal.tmp_bgw_job_seq_value;

-- new table as we want it
CREATE TABLE _timescaledb_config.bgw_job (
id INTEGER PRIMARY KEY DEFAULT nextval('_timescaledb_config.bgw_job_id_seq'),
application_name name NOT NULL,
schedule_interval interval NOT NULL,
max_runtime interval NOT NULL,
max_retries integer NOT NULL,
retry_period interval NOT NULL,
proc_schema name NOT NULL,
proc_name name NOT NULL,
owner name NOT NULL DEFAULT CURRENT_ROLE,
scheduled bool NOT NULL DEFAULT TRUE,
fixed_schedule bool not null default true,
initial_start timestamptz,
hypertable_id integer REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE,
config jsonb ,
check_schema NAME,
check_name NAME,
timezone TEXT
);

ALTER SEQUENCE _timescaledb_config.bgw_job_id_seq OWNED BY _timescaledb_config.bgw_job.id;
CREATE INDEX bgw_job_proc_hypertable_id_idx ON _timescaledb_config.bgw_job(proc_schema,proc_name,hypertable_id);

INSERT INTO _timescaledb_config.bgw_job(
id, application_name, schedule_interval,
max_runtime, max_retries, retry_period,
proc_schema, proc_name, owner, scheduled,
hypertable_id, config, check_schema, check_name,
fixed_schedule
)
SELECT id, application_name, schedule_interval, max_runtime, max_retries, retry_period,
proc_schema, proc_name, owner, scheduled, hypertable_id, config, check_schema, check_name, FALSE
FROM _timescaledb_config.bgw_job_tmp ORDER BY id;

DROP TABLE _timescaledb_config.bgw_job_tmp;
ALTER TABLE _timescaledb_internal.bgw_job_stat ADD CONSTRAINT bgw_job_stat_job_id_fkey FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE;
ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats ADD CONSTRAINT bgw_policy_chunk_stats_job_id_fkey FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_job', 'WHERE id >= 1000');
GRANT SELECT ON _timescaledb_config.bgw_job TO PUBLIC;
GRANT SELECT ON _timescaledb_config.bgw_job_id_seq TO PUBLIC;

-- do simple CREATE for the functions with modified signatures
CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE;

CREATE FUNCTION @extschema@.add_compression_policy(
hypertable REGCLASS, compress_after "any",
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE;

CREATE FUNCTION @extschema@.add_retention_policy(
relation REGCLASS,
drop_after "any",
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE;

CREATE FUNCTION @extschema@.add_job(
proc REGPROC,
schedule_interval INTERVAL,
config JSONB DEFAULT NULL,
initial_start TIMESTAMPTZ DEFAULT NULL,
scheduled BOOL DEFAULT true,
check_config REGPROC DEFAULT NULL,
fixed_schedule BOOL DEFAULT TRUE,
timezone TEXT DEFAULT NULL
) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_job_add' LANGUAGE C VOLATILE;

CREATE FUNCTION @extschema@.add_reorder_policy(
hypertable REGCLASS,
index_name NAME,
if_not_exists BOOL = false,
initial_start timestamptz = NULL,
timezone TEXT = NULL
) RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
LANGUAGE C VOLATILE;

0 comments on commit 54ed0d5

Please sign in to comment.