Skip to content

Commit

Permalink
[!] Add chain error handling, closes #575
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Jun 2, 2023
1 parent 1a29e8d commit 162da88
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 38 deletions.
15 changes: 8 additions & 7 deletions internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
// InvalidOid specifies value for non-existent objects
const InvalidOid = 0

// DeleteChainConfig delete chain configuration for self destructive chains
func (pge *PgEngine) DeleteChainConfig(ctx context.Context, chainID int) bool {
// DeleteChain deletes the configuration of a self-destructive chain
func (pge *PgEngine) DeleteChain(ctx context.Context, chainID int) bool {
pge.l.WithField("chain", chainID).Info("Deleting self destructive chain configuration")
res, err := pge.ConfigDb.Exec(ctx, "DELETE FROM timetable.chain WHERE chain_id = $1", chainID)
if err != nil {
Expand All @@ -27,8 +27,8 @@ func (pge *PgEngine) IsAlive() bool {
return pge.ConfigDb != nil && pge.ConfigDb.Ping(context.Background()) == nil
}

// LogChainElementExecution will log current chain element execution status including retcode
func (pge *PgEngine) LogChainElementExecution(ctx context.Context, task *ChainTask, retCode int, output string) {
// LogTaskExecution logs the execution status of the current chain task, including its return code
func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string) {
_, err := pge.ConfigDb.Exec(ctx, `INSERT INTO timetable.execution_log (
chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid)
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10)`,
Expand Down Expand Up @@ -66,7 +66,7 @@ func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) {

// Select live chains with proper client_name value
const sqlSelectLiveChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(max_instances, 16) as max_instances, COALESCE(timeout, 0) as timeout
COALESCE(max_instances, 16) as max_instances, COALESCE(timeout, 0) as timeout, on_error
FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL)`

// SelectRebootChains returns a list of chains that should be executed after reboot
Expand Down Expand Up @@ -94,7 +94,7 @@ func (pge *PgEngine) SelectChains(ctx context.Context, dest *[]Chain) error {
// SelectIntervalChains returns list of interval chains to be executed
func (pge *PgEngine) SelectIntervalChains(ctx context.Context, dest *[]IntervalChain) error {
const sqlSelectIntervalChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(max_instances, 16), COALESCE(timeout, 0),
COALESCE(max_instances, 16), COALESCE(timeout, 0), on_error,
EXTRACT(EPOCH FROM (substr(run_at, 7) :: interval)) :: int4 as interval_seconds,
starts_with(run_at, '@after') as repeat_after
FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL) AND substr(run_at, 1, 6) IN ('@every', '@after')`
Expand All @@ -109,7 +109,8 @@ FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL) AN
// SelectChain returns the chain with the specified ID
func (pge *PgEngine) SelectChain(ctx context.Context, dest *Chain, chainID int) error {
// we accept not only live chains here because we want to run them in debug mode
const sqlSelectSingleChain = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances
const sqlSelectSingleChain = `SELECT chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances, on_error
FROM timetable.chain WHERE (client_name = $1 OR client_name IS NULL) AND chain_id = $2`
rows, err := pge.ConfigDb.Query(ctx, sqlSelectSingleChain, pge.ClientName, chainID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/pgengine/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func TestDeleteChainConfig(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime+2)
defer cancel()
mockPool.ExpectExec("DELETE FROM timetable\\.chain").WithArgs(pgxmock.AnyArg()).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
assert.True(t, pge.DeleteChainConfig(ctx, 0))
assert.True(t, pge.DeleteChain(ctx, 0))
})

t.Run("Check DeleteChainConfig if sql fails", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime+2)
defer cancel()
mockPool.ExpectExec("DELETE FROM timetable\\.chain").WithArgs(pgxmock.AnyArg()).WillReturnError(errors.New("error"))
assert.False(t, pge.DeleteChainConfig(ctx, 0))
assert.False(t, pge.DeleteChain(ctx, 0))
})

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestLogChainElementExecution(t *testing.T) {
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(),
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()).
WillReturnError(errors.New("Failed to log chain element execution status"))
pge.LogChainElementExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS")
pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS")
})

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
Expand Down
6 changes: 6 additions & 0 deletions internal/pgengine/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
return ExecuteMigrationScript(ctx, tx, "00573.sql")
},
},
&migrator.Migration{
Name: "00575 Add on_error handling",
Func: func(ctx context.Context, tx pgx.Tx) error {
return ExecuteMigrationScript(ctx, tx, "00575.sql")
},
},
// adding new migration here, update "timetable"."migration" in "sql/init.sql"
// and "dbapi" variable in main.go!

Expand Down
4 changes: 2 additions & 2 deletions internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestSchedulerFunctions(t *testing.T) {
ctx := context.Background()

t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) {
assert.Equal(t, false, pge.DeleteChainConfig(ctx, 0), "Should not delete in clean database")
assert.Equal(t, false, pge.DeleteChain(ctx, 0), "Should not delete in clean database")
})

t.Run("Check GetChainElements funсtion", func(t *testing.T) {
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestSchedulerFunctions(t *testing.T) {
t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
tx, txid, err := pge.StartTransaction(ctx, 0)
assert.NoError(t, err, "Should start transaction")
assert.Greater(t, txid, int64(0) , "Should return transaction id")
assert.Greater(t, txid, int64(0), "Should return transaction id")
f := func(sql string, params []string) error {
_, err := pge.ExecuteSQLCommand(ctx, tx, sql, params)
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/pgengine/sql/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ CREATE TABLE timetable.chain (
live BOOLEAN DEFAULT FALSE,
self_destruct BOOLEAN DEFAULT FALSE,
exclusive_execution BOOLEAN DEFAULT FALSE,
client_name TEXT
client_name TEXT,
on_error TEXT
);

COMMENT ON TABLE timetable.chain IS
Expand Down
5 changes: 3 additions & 2 deletions internal/pgengine/sql/init.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE SCHEMA timetable;

-- define migrations you need to apply
-- every change to this file should populate this table.
-- every change to the database schema should populate this table.
-- Version value should contain issue number zero padded followed by
-- short description of the issue\feature\bug implemented\resolved
CREATE TABLE timetable.migration(
Expand All @@ -24,4 +24,5 @@ VALUES
(8, '00436 Add txid column to timetable.execution_log'),
(9, '00534 Use cron_split_to_arrays() in cron domain check'),
(10, '00560 Alter txid column to bigint'),
(11, '00573 Add ability to start a chain with delay');
(11, '00573 Add ability to start a chain with delay'),
(12, '00575 Add on_error handling');
7 changes: 4 additions & 3 deletions internal/pgengine/sql/job_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ CREATE OR REPLACE FUNCTION timetable.add_job(
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive)
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error)
RETURNING chain_id
),
cte_task(v_task_id) AS (
Expand Down
2 changes: 2 additions & 0 deletions internal/pgengine/sql/migrations/00575.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE timetable.chain
ADD COLUMN on_error TEXT;
13 changes: 7 additions & 6 deletions internal/pgengine/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (

// Chain structure used to represent tasks chains
type Chain struct {
ChainID int `db:"chain_id"`
ChainName string `db:"chain_name"`
SelfDestruct bool `db:"self_destruct"`
ExclusiveExecution bool `db:"exclusive_execution"`
MaxInstances int `db:"max_instances"`
Timeout int `db:"timeout"`
ChainID int `db:"chain_id"`
ChainName string `db:"chain_name"`
SelfDestruct bool `db:"self_destruct"`
ExclusiveExecution bool `db:"exclusive_execution"`
MaxInstances int `db:"max_instances"`
Timeout int `db:"timeout"`
OnErrorSQL pgtype.Text `db:"on_error"`
}

// IntervalChain structure used to represent repeated chains.
Expand Down
48 changes: 35 additions & 13 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,31 +174,52 @@ func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, co
return ctx, nil
}

// executeOnErrorHandler runs the chain's on_error SQL statement in a
// transaction of its own.
//
// It returns immediately when ctx is already done or when the chain has no
// on_error statement (OnErrorSQL is not valid). The transaction is committed
// when the statement succeeds and rolled back when it fails. Failures are
// logged and not returned to the caller.
func (sch *Scheduler) executeOnErrorHandler(ctx context.Context, chain Chain) {
	if ctx.Err() != nil || !chain.OnErrorSQL.Valid {
		return
	}
	l := sch.l.WithField("chain", chain.ChainID)
	tx, txid, err := sch.pgengine.StartTransaction(ctx, chain.ChainID)
	if err != nil {
		l.WithError(err).Error("Cannot start error handling transaction")
		return
	}
	l = l.WithField("txid", txid)
	l.Info("Starting error handling")
	// make the enriched logger available to everything called with this context
	ctx = log.WithLogger(ctx, l)
	if _, err := tx.Exec(ctx, chain.OnErrorSQL.String); err != nil {
		sch.pgengine.RollbackTransaction(ctx, tx)
		// report the cause at error level instead of silently dropping it
		l.WithError(err).Error("Error handler failed")
		return
	}
	sch.pgengine.CommitTransaction(ctx, tx)
	l.Info("Error handler executed successfully")
}

/* execute a chain of tasks */
func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
var ChainTasks []pgengine.ChainTask
var bctx context.Context
var cancel context.CancelFunc
var txid int64

ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
if cancel != nil {
defer cancel()
}

chainL := sch.l.WithField("chain", chain.ChainID)

tx, txid, err := sch.pgengine.StartTransaction(ctx, chain.ChainID)
tx, txid, err := sch.pgengine.StartTransaction(chainCtx, chain.ChainID)
if err != nil {
chainL.WithError(err).Error("Cannot start transaction")
return
}
chainL = chainL.WithField("txid", txid)

err = sch.pgengine.GetChainElements(ctx, &ChainTasks, chain.ChainID)
err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID)
if err != nil {
chainL.WithError(err).Error("Failed to retrieve chain elements")
sch.pgengine.RollbackTransaction(ctx, tx)
sch.pgengine.RollbackTransaction(chainCtx, tx)
return
}

Expand All @@ -208,30 +229,32 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
task.Txid = txid
l := chainL.WithField("task", task.TaskID)
l.Info("Starting task")
ctx = log.WithLogger(ctx, l)
retCode := sch.executeСhainElement(ctx, tx, &task)
taskCtx := log.WithLogger(chainCtx, l)
retCode := sch.executeСhainElement(taskCtx, tx, &task)

// we use background context here because current one (ctx) might be cancelled
bctx = log.WithLogger(context.Background(), l)
// we use the parent context (ctx) here because the chain context (chainCtx) might be cancelled
bctx = log.WithLogger(ctx, l)
if retCode != 0 {
if !task.IgnoreError {
chainL.Error("Chain failed")
sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
sch.pgengine.RollbackTransaction(bctx, tx)
sch.executeOnErrorHandler(bctx, chain)
return
}
l.Info("Ignoring task failure")
}
}
bctx = log.WithLogger(context.Background(), chainL)
bctx = log.WithLogger(chainCtx, chainL)
sch.pgengine.CommitTransaction(bctx, tx)
chainL.Info("Chain executed successfully")
sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
if chain.SelfDestruct {
sch.pgengine.DeleteChainConfig(bctx, chain.ChainID)
sch.pgengine.DeleteChain(bctx, chain.ChainID)
}
}

/* execute a task */
func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int {
var (
paramValues []string
Expand All @@ -242,7 +265,6 @@ func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task
)

l := log.GetLogger(ctx)

err = sch.pgengine.GetChainParamValues(ctx, &paramValues, task)
if err != nil {
l.WithError(err).Error("cannot fetch parameters values for chain: ", err)
Expand Down Expand Up @@ -278,6 +300,6 @@ func (sch *Scheduler) executeСhainElement(ctx context.Context, tx pgx.Tx, task
} else {
l.Info("Task executed successfully")
}
sch.pgengine.LogChainElementExecution(context.Background(), task, retCode, out)
sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out)
return retCode
}
2 changes: 1 addition & 1 deletion internal/scheduler/interval_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (sch *Scheduler) isValid(ichain IntervalChain) bool {

func (sch *Scheduler) reschedule(ctx context.Context, ichain IntervalChain) {
if ichain.SelfDestruct {
sch.pgengine.DeleteChainConfig(ctx, ichain.ChainID)
sch.pgengine.DeleteChain(ctx, ichain.ChainID)
return
}
log.GetLogger(ctx).Debug("Sleeping before next execution of interval chain")
Expand Down
11 changes: 11 additions & 0 deletions samples/ErrorHandling.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Sample: a job whose only task always fails, with an on_error handler attached.
-- The job is named 'fail' and uses the cron schedule '* * * * *'.
-- Its command 'SELECT 42/0' raises a division-by-zero error, and
-- job_ignore_errors is FALSE, so the failure is not ignored.
-- job_on_error is a SQL statement that sends a JSON payload carrying the
-- pg_timetable.current_chain_id setting to the 'monitoring' notification channel.
SELECT timetable.add_job(
job_name => 'fail',
job_schedule => '* * * * *',
job_command => 'SELECT 42/0',
job_kind => 'SQL'::timetable.command_kind,
job_live => TRUE,
job_ignore_errors => FALSE,
job_on_error => $$SELECT pg_notify('monitoring',
format('{"ConfigID": %s, "Message": "Something bad happened"}',
current_setting('pg_timetable.current_chain_id')::bigint))$$
)

0 comments on commit 162da88

Please sign in to comment.