From 162da8811e6af918b3ad48f0e244fa110f4961e2 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 2 Jun 2023 18:13:30 +0200 Subject: [PATCH] [!] Add chain error handling, closes #575 --- internal/pgengine/access.go | 15 +++---- internal/pgengine/access_test.go | 6 +-- internal/pgengine/migration.go | 6 +++ internal/pgengine/pgengine_test.go | 4 +- internal/pgengine/sql/ddl.sql | 3 +- internal/pgengine/sql/init.sql | 5 ++- internal/pgengine/sql/job_functions.sql | 7 ++-- internal/pgengine/sql/migrations/00575.sql | 2 + internal/pgengine/transaction.go | 13 +++--- internal/scheduler/chain.go | 48 ++++++++++++++++------ internal/scheduler/interval_chain.go | 2 +- samples/ErrorHandling.sql | 11 +++++ 12 files changed, 84 insertions(+), 38 deletions(-) create mode 100644 internal/pgengine/sql/migrations/00575.sql create mode 100644 samples/ErrorHandling.sql diff --git a/internal/pgengine/access.go b/internal/pgengine/access.go index 1cb5c8ce..b9d5e8de 100644 --- a/internal/pgengine/access.go +++ b/internal/pgengine/access.go @@ -11,8 +11,8 @@ import ( // InvalidOid specifies value for non-existent objects const InvalidOid = 0 -// DeleteChainConfig delete chain configuration for self destructive chains -func (pge *PgEngine) DeleteChainConfig(ctx context.Context, chainID int) bool { +// DeleteChain delete chain configuration for self destructive chains +func (pge *PgEngine) DeleteChain(ctx context.Context, chainID int) bool { pge.l.WithField("chain", chainID).Info("Deleting self destructive chain configuration") res, err := pge.ConfigDb.Exec(ctx, "DELETE FROM timetable.chain WHERE chain_id = $1", chainID) if err != nil { @@ -27,8 +27,8 @@ func (pge *PgEngine) IsAlive() bool { return pge.ConfigDb != nil && pge.ConfigDb.Ping(context.Background()) == nil } -// LogChainElementExecution will log current chain element execution status including retcode -func (pge *PgEngine) LogChainElementExecution(ctx context.Context, task *ChainTask, retCode int, output string) { +// LogTaskExecution will log current chain element execution status including retcode +func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string) { _, err := pge.ConfigDb.Exec(ctx, `INSERT INTO timetable.execution_log ( chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid) VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10)`, @@ -66,7 +66,7 @@ func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) { // Select live chains with proper client_name value const sqlSelectLiveChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, -COALESCE(max_instances, 16) as max_instances, COALESCE(timeout, 0) as timeout +COALESCE(max_instances, 16) as max_instances, COALESCE(timeout, 0) as timeout, on_error FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL)` // SelectRebootChains returns a list of chains should be executed after reboot @@ -94,7 +94,7 @@ func (pge *PgEngine) SelectChains(ctx context.Context, dest *[]Chain) error { // SelectIntervalChains returns list of interval chains to be executed func (pge *PgEngine) SelectIntervalChains(ctx context.Context, dest *[]IntervalChain) error { const sqlSelectIntervalChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, -COALESCE(max_instances, 16), COALESCE(timeout, 0), +COALESCE(max_instances, 16), COALESCE(timeout, 0), on_error, EXTRACT(EPOCH FROM (substr(run_at, 7) :: interval)) :: int4 as interval_seconds, starts_with(run_at, '@after') as repeat_after FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL) AND substr(run_at, 1, 6) IN ('@every', '@after')` @@ -109,7 +109,8 @@ FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL) AN // SelectChain returns the chain with the specified ID func (pge *PgEngine) SelectChain(ctx context.Context, dest *Chain, chainID int) error { // we accept not only live chains here because we want to run them in debug mode - const sqlSelectSingleChain = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances + const sqlSelectSingleChain = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, +COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances, on_error FROM timetable.chain WHERE (client_name = $1 OR client_name IS NULL) AND chain_id = $2` rows, err := pge.ConfigDb.Query(ctx, sqlSelectSingleChain, pge.ClientName, chainID) if err != nil { diff --git a/internal/pgengine/access_test.go b/internal/pgengine/access_test.go index e076b0d5..804bab97 100644 --- a/internal/pgengine/access_test.go +++ b/internal/pgengine/access_test.go @@ -19,14 +19,14 @@ func TestDeleteChainConfig(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime+2) defer cancel() mockPool.ExpectExec("DELETE FROM timetable\\.chain").WithArgs(pgxmock.AnyArg()).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) - assert.True(t, pge.DeleteChainConfig(ctx, 0)) + assert.True(t, pge.DeleteChain(ctx, 0)) }) t.Run("Check DeleteChainConfig if sql fails", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime+2) defer cancel() mockPool.ExpectExec("DELETE FROM timetable\\.chain").WithArgs(pgxmock.AnyArg()).WillReturnError(errors.New("error")) - assert.False(t, pge.DeleteChainConfig(ctx, 0)) + assert.False(t, pge.DeleteChain(ctx, 0)) }) assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations") @@ -104,7 +104,7 @@ func TestLogChainElementExecution(t *testing.T) { pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()). WillReturnError(errors.New("Failed to log chain element execution status")) - pge.LogChainElementExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS") + pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS") }) assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations") diff --git a/internal/pgengine/migration.go b/internal/pgengine/migration.go index f3785f4f..46f8e844 100644 --- a/internal/pgengine/migration.go +++ b/internal/pgengine/migration.go @@ -129,6 +129,12 @@ var Migrations func() migrator.Option = func() migrator.Option { return ExecuteMigrationScript(ctx, tx, "00573.sql") }, }, + &migrator.Migration{ + Name: "00575 Add on_error handling", + Func: func(ctx context.Context, tx pgx.Tx) error { + return ExecuteMigrationScript(ctx, tx, "00575.sql") + }, + }, // adding new migration here, update "timetable"."migration" in "sql/init.sql" // and "dbapi" variable in main.go! diff --git a/internal/pgengine/pgengine_test.go b/internal/pgengine/pgengine_test.go index d366510f..63e899f8 100644 --- a/internal/pgengine/pgengine_test.go +++ b/internal/pgengine/pgengine_test.go @@ -135,7 +135,7 @@ func TestSchedulerFunctions(t *testing.T) { ctx := context.Background() t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) { - assert.Equal(t, false, pge.DeleteChainConfig(ctx, 0), "Should not delete in clean database") + assert.Equal(t, false, pge.DeleteChain(ctx, 0), "Should not delete in clean database") }) t.Run("Check GetChainElements funсtion", func(t *testing.T) { @@ -170,7 +170,7 @@ func TestSchedulerFunctions(t *testing.T) { t.Run("Check ExecuteSQLCommand function", func(t *testing.T) { tx, txid, err := pge.StartTransaction(ctx, 0) assert.NoError(t, err, "Should start transaction") - assert.Greater(t, txid, int64(0) , "Should return transaction id") + assert.Greater(t, txid, int64(0), "Should return transaction id") f := func(sql string, params []string) error { _, err := pge.ExecuteSQLCommand(ctx, tx, sql, params) return err diff --git a/internal/pgengine/sql/ddl.sql b/internal/pgengine/sql/ddl.sql index 06ad93f1..5ff288f0 100644 --- a/internal/pgengine/sql/ddl.sql +++ b/internal/pgengine/sql/ddl.sql @@ -7,7 +7,8 @@ CREATE TABLE timetable.chain ( live BOOLEAN DEFAULT FALSE, self_destruct BOOLEAN DEFAULT FALSE, exclusive_execution BOOLEAN DEFAULT FALSE, - client_name TEXT + client_name TEXT, + on_error TEXT ); COMMENT ON TABLE timetable.chain IS diff --git a/internal/pgengine/sql/init.sql b/internal/pgengine/sql/init.sql index 5b3d3ed4..3a6657f4 100644 --- a/internal/pgengine/sql/init.sql +++ b/internal/pgengine/sql/init.sql @@ -1,7 +1,7 @@ CREATE SCHEMA timetable; -- define migrations you need to apply --- every change to this file should populate this table. +-- every change to the database schema should populate this table. -- Version value should contain issue number zero padded followed by -- short description of the issue\feature\bug implemented\resolved CREATE TABLE timetable.migration( @@ -24,4 +24,5 @@ VALUES (8, '00436 Add txid column to timetable.execution_log'), (9, '00534 Use cron_split_to_arrays() in cron domain check'), (10, '00560 Alter txid column to bigint'), - (11, '00573 Add ability to start a chain with delay'); \ No newline at end of file + (11, '00573 Add ability to start a chain with delay'), + (12, '00575 Add on_error handling'); \ No newline at end of file diff --git a/internal/pgengine/sql/job_functions.sql b/internal/pgengine/sql/job_functions.sql index fe442761..b8fa821e 100644 --- a/internal/pgengine/sql/job_functions.sql +++ b/internal/pgengine/sql/job_functions.sql @@ -24,12 +24,13 @@ CREATE OR REPLACE FUNCTION timetable.add_job( job_live BOOLEAN DEFAULT TRUE, job_self_destruct BOOLEAN DEFAULT FALSE, job_ignore_errors BOOLEAN DEFAULT TRUE, - job_exclusive BOOLEAN DEFAULT FALSE + job_exclusive BOOLEAN DEFAULT FALSE, + job_on_error TEXT DEFAULT NULL ) RETURNS BIGINT AS $$ WITH cte_chain (v_chain_id) AS ( - INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution) - VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive) + INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error) + VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error) RETURNING chain_id ), cte_task(v_task_id) AS ( diff --git a/internal/pgengine/sql/migrations/00575.sql b/internal/pgengine/sql/migrations/00575.sql new file mode 100644 index 00000000..0f44b652 --- /dev/null +++ b/internal/pgengine/sql/migrations/00575.sql @@ -0,0 +1,2 @@ +ALTER TABLE timetable.chain + ADD COLUMN on_error TEXT; \ No newline at end of file diff --git a/internal/pgengine/transaction.go b/internal/pgengine/transaction.go index eb214cbb..08d6b7eb 100644 --- a/internal/pgengine/transaction.go +++ b/internal/pgengine/transaction.go @@ -17,12 +17,13 @@ import ( // Chain structure used to represent tasks chains type Chain struct { - ChainID int `db:"chain_id"` - ChainName string `db:"chain_name"` - SelfDestruct bool `db:"self_destruct"` - ExclusiveExecution bool `db:"exclusive_execution"` - MaxInstances int `db:"max_instances"` - Timeout int `db:"timeout"` + ChainID int `db:"chain_id"` + ChainName string `db:"chain_name"` + SelfDestruct bool `db:"self_destruct"` + ExclusiveExecution bool `db:"exclusive_execution"` + MaxInstances int `db:"max_instances"` + Timeout int `db:"timeout"` + OnErrorSQL pgtype.Text `db:"on_error"` } // IntervalChain structure used to represent repeated chains. diff --git a/internal/scheduler/chain.go b/internal/scheduler/chain.go index 83eb5a1b..1ebeb733 100644 --- a/internal/scheduler/chain.go +++ b/internal/scheduler/chain.go @@ -174,6 +174,28 @@ func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, co return ctx, nil } +func (sch *Scheduler) executeOnErrorHandler(ctx context.Context, chain Chain) { + if ctx.Err() != nil || !chain.OnErrorSQL.Valid { + return + } + l := sch.l.WithField("chain", chain.ChainID) + tx, txid, err := sch.pgengine.StartTransaction(ctx, chain.ChainID) + if err != nil { + l.WithError(err).Error("Cannot start error handling transaction") + return + } + l = l.WithField("txid", txid) + l.Info("Starting error handling") + ctx = log.WithLogger(ctx, l) + if _, err := tx.Exec(ctx, chain.OnErrorSQL.String); err != nil { + sch.pgengine.RollbackTransaction(ctx, tx) + l.Info("Error handler failed") + return + } + sch.pgengine.CommitTransaction(ctx, tx) + l.Info("Error handler executed successfully") +} + /* execute a chain of tasks */ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { var ChainTasks []pgengine.ChainTask @@ -181,24 +203,23 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { var cancel context.CancelFunc var txid int64 - ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout) + chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout) if cancel != nil { defer cancel() } chainL := sch.l.WithField("chain", chain.ChainID) - - tx, txid, err := sch.pgengine.StartTransaction(ctx, chain.ChainID) + tx, txid, err := sch.pgengine.StartTransaction(chainCtx, chain.ChainID) if err != nil { chainL.WithError(err).Error("Cannot start transaction") return } chainL = chainL.WithField("txid", txid) - err = sch.pgengine.GetChainElements(ctx, &ChainTasks, chain.ChainID) + err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID) if err != nil { chainL.WithError(err).Error("Failed to retrieve chain elements") - sch.pgengine.RollbackTransaction(ctx, tx) + sch.pgengine.RollbackTransaction(chainCtx, tx) return } @@ -208,30 +229,32 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { task.Txid = txid l := chainL.WithField("task", task.TaskID) l.Info("Starting task") - ctx = log.WithLogger(ctx, l) - retCode := sch.executeСhainElement(ctx, tx, &task) + taskCtx := log.WithLogger(chainCtx, l) + retCode := sch.executeСhainElement(taskCtx, tx, &task) - // we use background context here because current one (ctx) might be cancelled - bctx = log.WithLogger(context.Background(), l) + // we use background context here because current one (chainCtx) might be cancelled + bctx = log.WithLogger(ctx, l) if retCode != 0 { if !task.IgnoreError { chainL.Error("Chain failed") sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID) sch.pgengine.RollbackTransaction(bctx, tx) + sch.executeOnErrorHandler(bctx, chain) return } l.Info("Ignoring task failure") } } - bctx = log.WithLogger(context.Background(), chainL) + bctx = log.WithLogger(chainCtx, chainL) sch.pgengine.CommitTransaction(bctx, tx) chainL.Info("Chain executed successfully") sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID) if chain.SelfDestruct { - sch.pgengine.DeleteChainConfig(bctx, chain.ChainID) + sch.pgengine.DeleteChain(bctx, chain.ChainID) } } +/* execute a task */ func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int { var ( paramValues []string @@ -242,7 +265,6 @@ func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task ) l := log.GetLogger(ctx) - err = sch.pgengine.GetChainParamValues(ctx, ¶mValues, task) if err != nil { l.WithError(err).Error("cannot fetch parameters values for chain: ", err) @@ -278,6 +300,6 @@ func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task } else { l.Info("Task executed successfully") } - sch.pgengine.LogChainElementExecution(context.Background(), task, retCode, out) + sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out) return retCode } diff --git a/internal/scheduler/interval_chain.go b/internal/scheduler/interval_chain.go index ef3084d6..6f1ae842 100644 --- a/internal/scheduler/interval_chain.go +++ b/internal/scheduler/interval_chain.go @@ -28,7 +28,7 @@ func (sch *Scheduler) isValid(ichain IntervalChain) bool { func (sch *Scheduler) reschedule(ctx context.Context, ichain IntervalChain) { if ichain.SelfDestruct { - sch.pgengine.DeleteChainConfig(ctx, ichain.ChainID) + sch.pgengine.DeleteChain(ctx, ichain.ChainID) return } log.GetLogger(ctx).Debug("Sleeping before next execution of interval chain") diff --git a/samples/ErrorHandling.sql b/samples/ErrorHandling.sql new file mode 100644 index 00000000..fe23ca1b --- /dev/null +++ b/samples/ErrorHandling.sql @@ -0,0 +1,11 @@ +SELECT timetable.add_job( + job_name => 'fail', + job_schedule => '* * * * *', + job_command => 'SELECT 42/0', + job_kind => 'SQL'::timetable.command_kind, + job_live => TRUE, + job_ignore_errors => FALSE, + job_on_error => $$SELECT pg_notify('monitoring', + format('{"ConfigID": %s, "Message": "Something bad happened"}', + current_setting('pg_timetable.current_chain_id')::bigint))$$ + ) \ No newline at end of file