Skip to content

Commit

Permalink
Merge pull request #382 from cybertec-postgresql/381_active_chain_han…
Browse files Browse the repository at this point in the history
…dling

Rewrite active chain handling
  • Loading branch information
pashagolub committed Feb 1, 2022
2 parents fe2f499 + 81b1f3b commit 7b385d7
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 150 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v2
with:
go-version: '1.16'
go-version: '1.17'

# despite the fact docker will build binary internally
# we want to stop workflow in case of any error before pushing to registry
Expand Down
50 changes: 13 additions & 37 deletions internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@ import (
"os"

"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgx/v4"
)

// InvalidOid specifies value for non-existent objects
const InvalidOid = 0

/*FixSchedulerCrash make sure that task chains which are not complete due to a scheduler crash are "fixed"
and marked as stopped at a certain point */
func (pge *PgEngine) FixSchedulerCrash(ctx context.Context) {
_, err := pge.ConfigDb.Exec(ctx, `SELECT timetable.health_check($1)`, pge.ClientName)
if err != nil {
pge.l.WithError(err).Error("Failed to perform health check")
}
}

// DeleteChainConfig delete chain configuration for self destructive chains
func (pge *PgEngine) DeleteChainConfig(ctx context.Context, chainID int) bool {
pge.l.WithField("chain", chainID).Info("Deleting self destructive chain configuration")
Expand Down Expand Up @@ -51,40 +41,26 @@ VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $
}

// InsertChainRunStatus inits the execution run log, which will be use to effectively control scheduler concurrency
func (pge *PgEngine) InsertChainRunStatus(ctx context.Context, chainID int) int {
const sqlInsertRunStatus = `INSERT INTO timetable.run_status
(execution_status, chain_id, client_name)
SELECT 'CHAIN_STARTED', c.chain_id, $2
FROM timetable.chain c
WHERE
c.chain_id = $1 AND
func (pge *PgEngine) InsertChainRunStatus(ctx context.Context, chainID int, maxInstances int) bool {
const sqlInsertRunStatus = `INSERT INTO timetable.active_chain (chain_id, client_name)
SELECT $1, $2 WHERE
(
SELECT COALESCE(count(*) < c.max_instances, TRUE)
FROM timetable.get_chain_running_statuses(c.chain_id)
)
RETURNING run_status_id;`
id := -1
err := pge.ConfigDb.QueryRow(ctx, sqlInsertRunStatus, chainID, pge.ClientName).Scan(&id)
SELECT COALESCE(count(*) < $3, TRUE)
FROM timetable.active_chain ac WHERE ac.chain_id = $1
)`
res, err := pge.ConfigDb.Exec(ctx, sqlInsertRunStatus, chainID, pge.ClientName, maxInstances)
if err != nil {
if err == pgx.ErrNoRows {
return -1
}
pge.l.WithError(err).Error("Cannot save information about the chain run status")
return false
}
return id
return res.RowsAffected() == 1
}

// AddChainRunStatus inserts status information about running chain elements
func (pge *PgEngine) AddChainRunStatus(ctx context.Context, task *ChainTask, runStatusID int, status string) {
const sqlInsertFinishStatus = `INSERT INTO timetable.run_status
(task_id, execution_status, start_status_id, chain_id, client_name)
VALUES
($1, $2, $3, $4, $5)`
var err error
_, err = pge.ConfigDb.Exec(ctx, sqlInsertFinishStatus,
task.TaskID, status, runStatusID, task.ChainID, pge.ClientName)
func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) {
const sqlRemoveRunStatus = `DELETE FROM timetable.active_chain WHERE chain_id = $1 and client_name = $2`
_, err := pge.ConfigDb.Exec(ctx, sqlRemoveRunStatus, chainID, pge.ClientName)
if err != nil {
pge.l.WithError(err).Error("Update chain status failed")
pge.l.WithError(err).Error("Cannot save information about the chain run status")
}
}

Expand Down
39 changes: 11 additions & 28 deletions internal/pgengine/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,30 @@ func TestDeleteChainConfig(t *testing.T) {
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
}

func TestFixSchedulerCrash(t *testing.T) {
initmockdb(t)
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
defer mockPool.Close()

t.Run("Check FixSchedulerCrash if sql fails", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
defer cancel()
mockPool.ExpectExec(`SELECT timetable\.health_check`).WillReturnError(errors.New("error"))
pge.FixSchedulerCrash(ctx)
})

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
}

func TestInsertChainRunStatus(t *testing.T) {
initmockdb(t)
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
pge.ClientName = "test_client"
defer mockPool.Close()

t.Run("Check InsertChainRunStatus if sql fails", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
defer cancel()
mockPool.ExpectQuery("INSERT INTO timetable\\.run_status").WillReturnError(errors.New("error"))
pge.InsertChainRunStatus(ctx, 0)
})
mockPool.ExpectExec("INSERT INTO timetable\\.active_chain").
WithArgs(0, pge.ClientName, 1).
WillReturnError(errors.New("error"))
pge.InsertChainRunStatus(context.Background(), 0, 1)

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
}

func TestUpdateChainRunStatus(t *testing.T) {
func TestRemoveChainRunStatus(t *testing.T) {
initmockdb(t)
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
pge.ClientName = "test_client"
defer mockPool.Close()

t.Run("Check UpdateChainRunStatus if sql fails", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
defer cancel()
mockPool.ExpectExec("INSERT INTO timetable\\.run_status").WillReturnError(errors.New("error"))
pge.AddChainRunStatus(ctx, &pgengine.ChainTask{}, 0, "STATUS")
})
mockPool.ExpectExec("DELETE FROM timetable\\.active_chain").
WithArgs(0, pge.ClientName).
WillReturnError(errors.New("error"))
pge.RemoveChainRunStatus(context.Background(), 0)

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
}
Expand Down
9 changes: 5 additions & 4 deletions internal/pgengine/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,18 @@ func (pge *PgEngine) ExecuteSchemaScripts(ctx context.Context) error {
// Finalize closes session
func (pge *PgEngine) Finalize() {
pge.l.Info("Closing session")
_, err := pge.ConfigDb.Exec(context.Background(), "DELETE FROM timetable.active_session WHERE client_pid = $1 AND client_name = $2", os.Getpid(), pge.ClientName)
sql := `WITH del_ch AS (DELETE FROM timetable.active_chain WHERE client_name = $1)
DELETE FROM timetable.active_session WHERE client_name = $1`
_, err := pge.ConfigDb.Exec(context.Background(), sql, pge.ClientName)
if err != nil {
pge.l.WithError(err).Error("Cannot finalize database session")
}
pge.ConfigDb.Close()
pge.ConfigDb = nil
}

//ReconnectAndFixLeftovers keeps trying reconnecting every `waitTime` seconds till connection established
func (pge *PgEngine) ReconnectAndFixLeftovers(ctx context.Context) bool {
//Reconnect keeps trying reconnecting every `waitTime` seconds till connection established
func (pge *PgEngine) Reconnect(ctx context.Context) bool {
for pge.ConfigDb.Ping(ctx) != nil {
pge.l.Info("Connection to the server was lost. Waiting for ", WaitTime, " sec...")
select {
Expand All @@ -273,6 +275,5 @@ func (pge *PgEngine) ReconnectAndFixLeftovers(ctx context.Context) bool {
}
}
pge.l.Info("Connection reestablished...")
pge.FixSchedulerCrash(ctx)
return true
}
5 changes: 2 additions & 3 deletions internal/pgengine/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,15 @@ func TestReconnectAndFixLeftovers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
mockPool.ExpectPing()
mockPool.ExpectExec(`SELECT timetable\.health_check`).WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
assert.True(t, mockpge.ReconnectAndFixLeftovers(ctx))
assert.True(t, mockpge.Reconnect(ctx))
})

t.Run("Check ReconnectAndFixLeftovers if error returned", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), (pgengine.WaitTime+2)*time.Second)
defer cancel()
mockPool.ExpectPing().WillReturnError(errors.New("expected"))
mockPool.ExpectPing().WillDelayFor(pgengine.WaitTime * time.Second * 2)
assert.False(t, mockpge.ReconnectAndFixLeftovers(ctx))
assert.False(t, mockpge.Reconnect(ctx))
})
assert.NoError(t, mockPool.ExpectationsWereMet())
}
Expand Down
6 changes: 6 additions & 0 deletions internal/pgengine/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
return ExecuteMigrationScript(ctx, tx, "00334.sql")
},
},
&migrator.Migration{
Name: "00381 Rewrite active chain handling",
Func: func(ctx context.Context, tx pgx.Tx) error {
return ExecuteMigrationScript(ctx, tx, "00381.sql")
},
},
// adding new migration here, update "timetable"."migration" in "sql/ddl.sql"!

// &migrator.Migration{
Expand Down
16 changes: 5 additions & 11 deletions internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {

t.Run("Check timetable tables", func(t *testing.T) {
var oid int
tableNames := []string{"task", "chain", "parameter", "log", "execution_log", "run_status"}
tableNames := []string{"task", "chain", "parameter", "log", "execution_log", "active_session", "active_chain"}
for _, tableName := range tableNames {
err := pge.ConfigDb.QueryRow(ctx, fmt.Sprintf("SELECT COALESCE(to_regclass('timetable.%s'), 0) :: int", tableName)).Scan(&oid)
assert.NoError(t, err, fmt.Sprintf("Query for %s existence failed", tableName))
Expand All @@ -80,8 +80,6 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
var oid int
funcNames := []string{"_validate_json_schema_type(text, jsonb)",
"validate_json_schema(jsonb, jsonb, jsonb)",
"get_chain_running_statuses(bigint)",
"health_check(TEXT)",
"add_task(timetable.command_kind, TEXT, BIGINT, DOUBLE PRECISION)",
"add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN)",
"is_cron_in_time(timetable.cron, timestamptz)"}
Expand Down Expand Up @@ -121,7 +119,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
})

t.Run("Check Reconnecting Database", func(t *testing.T) {
assert.Equal(t, true, pge.ReconnectAndFixLeftovers(ctx),
assert.Equal(t, true, pge.Reconnect(ctx),
"Should succeed for reconnect")
})
}
Expand All @@ -140,10 +138,6 @@ func TestSchedulerFunctions(t *testing.T) {

ctx := context.Background()

t.Run("Check FixSchedulerCrash function", func(t *testing.T) {
assert.NotPanics(t, func() { pge.FixSchedulerCrash(ctx) }, "Fix scheduler crash failed")
})

t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) {
assert.Equal(t, false, pge.DeleteChainConfig(ctx, 0), "Should not delete in clean database")
})
Expand All @@ -169,10 +163,10 @@ func TestSchedulerFunctions(t *testing.T) {
})

t.Run("Check InsertChainRunStatus funсtion", func(t *testing.T) {
var id int
assert.NotPanics(t, func() { id = pge.InsertChainRunStatus(ctx, 0) },
var res bool
assert.NotPanics(t, func() { res = pge.InsertChainRunStatus(ctx, 0, 1) },
"Should no error in clean database")
assert.NotZero(t, id, "Run status id should be greater then 0")
assert.True(t, res, "Active chain should be inserted")
})

t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
Expand Down
33 changes: 19 additions & 14 deletions internal/pgengine/sql/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ VALUES
(1, '00305 Fix timetable.is_cron_in_time'),
(2, '00323 Append timetable.delete_job function'),
(3, '00329 Migration required for some new added functions'),
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies');
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies'),
(5, '00381 Rewrite active chain handling');

CREATE DOMAIN timetable.cron AS TEXT CHECK(
substr(VALUE, 1, 6) IN ('@every', '@after') AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL
Expand Down Expand Up @@ -110,7 +111,6 @@ CREATE UNLOGGED TABLE timetable.active_session(
COMMENT ON TABLE timetable.active_session IS
'Stores information about active sessions';


CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'LOG', 'ERROR', 'PANIC', 'USER');

CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
Expand All @@ -129,7 +129,9 @@ CREATE TABLE timetable.log
message_data jsonb
);

-- log timetable related action
COMMENT ON TABLE timetable.log IS
'Stores log entries of active sessions';

CREATE TABLE timetable.execution_log (
chain_id BIGINT,
task_id BIGINT,
Expand All @@ -143,19 +145,17 @@ CREATE TABLE timetable.execution_log (
client_name TEXT NOT NULL
);

CREATE TYPE timetable.execution_status AS ENUM ('CHAIN_STARTED', 'CHAIN_FAILED', 'CHAIN_DONE', 'TASK_STARTED', 'TASK_DONE', 'DEAD');

CREATE TABLE timetable.run_status (
run_status_id BIGSERIAL PRIMARY KEY,
start_status_id BIGINT REFERENCES timetable.run_status(run_status_id)
ON UPDATE CASCADE ON DELETE CASCADE,
chain_id BIGINT,
task_id BIGINT,
created_at TIMESTAMPTZ DEFAULT clock_timestamp(),
execution_status timetable.execution_status,
client_name TEXT NOT NULL
COMMENT ON TABLE timetable.execution_log IS
'Stores log entries of executed tasks and chains';

CREATE UNLOGGED TABLE timetable.active_chain(
chain_id BIGINT NOT NULL,
client_name TEXT NOT NULL
);

COMMENT ON TABLE timetable.active_chain IS
'Stores information about active chains within session';

CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
RETURNS bool AS
$CODE$
Expand All @@ -172,6 +172,11 @@ BEGIN
FROM pg_catalog.pg_stat_activity
WHERE application_name = 'pg_timetable'
);
DELETE
FROM timetable.active_chain
WHERE client_name NOT IN (
SELECT client_name FROM timetable.active_session
);
-- check if there any active sessions with the client name but different client pid
PERFORM 1
FROM timetable.active_session s
Expand Down
33 changes: 0 additions & 33 deletions internal/pgengine/sql/job_functions.sql
Original file line number Diff line number Diff line change
@@ -1,36 +1,3 @@
CREATE OR REPLACE FUNCTION timetable.get_chain_running_statuses(chain_id BIGINT) RETURNS SETOF BIGINT AS $$
SELECT start_status.run_status_id
FROM timetable.run_status start_status
WHERE start_status.execution_status = 'CHAIN_STARTED'
AND start_status.chain_id = $1
AND NOT EXISTS (
SELECT 1
FROM timetable.run_status finish_status
WHERE start_status.run_status_id = finish_status.start_status_id
AND finish_status.execution_status IN ('CHAIN_FAILED', 'CHAIN_DONE', 'DEAD')
)
ORDER BY 1
$$ LANGUAGE SQL STRICT;

COMMENT ON FUNCTION timetable.get_chain_running_statuses(chain_id BIGINT) IS
'Returns a set of active run status IDs for a given chain';

CREATE OR REPLACE FUNCTION timetable.health_check(client_name TEXT) RETURNS void AS $$
INSERT INTO timetable.run_status
(execution_status, start_status_id, client_name)
SELECT
'DEAD', start_status.run_status_id, $1
FROM timetable.run_status start_status
WHERE start_status.execution_status = 'CHAIN_STARTED'
AND start_status.client_name = $1
AND NOT EXISTS (
SELECT 1
FROM timetable.run_status finish_status
WHERE start_status.run_status_id = finish_status.start_status_id
AND finish_status.execution_status IN ('CHAIN_FAILED', 'CHAIN_DONE', 'DEAD')
)
$$ LANGUAGE SQL STRICT;

-- add_task() will add a task to the same chain as the task with `parent_id`
CREATE OR REPLACE FUNCTION timetable.add_task(
IN kind timetable.command_kind,
Expand Down
Loading

0 comments on commit 7b385d7

Please sign in to comment.