Skip to content

Commit

Permalink
[+] add contexts to task and logging functions
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Jun 19, 2020
1 parent 2c458a3 commit 56f00f0
Show file tree
Hide file tree
Showing 20 changed files with 175 additions and 163 deletions.
22 changes: 11 additions & 11 deletions internal/pgengine/access.go
Expand Up @@ -29,33 +29,33 @@ func FixSchedulerCrash(ctx context.Context) {
GROUP BY 1
HAVING count(*) < 2 ) AS abc`, ClientName)
if err != nil {
LogToDB("ERROR", "Error occurred during reverting from the scheduler crash: ", err)
LogToDB(ctx, "ERROR", "Error occurred during reverting from the scheduler crash: ", err)
}
}

// CanProceedChainExecution checks if particular chain can be exeuted in parallel
func CanProceedChainExecution(ctx context.Context, chainConfigID int, maxInstances int) bool {
const sqlProcCount = "SELECT count(*) FROM timetable.get_running_jobs($1) AS (id BIGINT, status BIGINT) GROUP BY id"
var procCount int
LogToDB("DEBUG", fmt.Sprintf("Checking if can proceed with chaing config ID: %d", chainConfigID))
LogToDB(ctx, "DEBUG", fmt.Sprintf("Checking if can proceed with chaing config ID: %d", chainConfigID))
err := ConfigDb.GetContext(ctx, &procCount, sqlProcCount, chainConfigID)
switch {
case err == sql.ErrNoRows:
return true
case err == nil:
return procCount < maxInstances
default:
LogToDB("ERROR", "Cannot read information about concurrent running jobs: ", err)
LogToDB(ctx, "ERROR", "Cannot read information about concurrent running jobs: ", err)
return false
}
}

// DeleteChainConfig delete chaing configuration for self destructive chains
func DeleteChainConfig(ctx context.Context, chainConfigID int) bool {
LogToDB("LOG", "Deleting self destructive chain configuration ID: ", chainConfigID)
LogToDB(ctx, "LOG", "Deleting self destructive chain configuration ID: ", chainConfigID)
res, err := ConfigDb.ExecContext(ctx, "DELETE FROM timetable.chain_execution_config WHERE chain_execution_config = $1 ", chainConfigID)
if err != nil {
LogToDB("ERROR", "Error occurred during deleting self destructive chains: ", err)
LogToDB(ctx, "ERROR", "Error occurred during deleting self destructive chains: ", err)
return false
}
rowsDeleted, err := res.RowsAffected()
Expand All @@ -68,19 +68,19 @@ func TryLockClientName(ctx context.Context) (res bool) {
adler32Int := adler32.Checksum([]byte(ClientName))
res = false
for !res {
LogToDB("DEBUG", fmt.Sprintf("Trying to get advisory lock for '%s' with hash 0x%x", ClientName, adler32Int))
LogToDB(ctx, "DEBUG", fmt.Sprintf("Trying to get advisory lock for '%s' with hash 0x%x", ClientName, adler32Int))
err := ConfigDb.GetContext(ctx, &res, "SELECT pg_try_advisory_lock($1, $2)", AppID, adler32Int)
if err != nil {
LogToDB("ERROR", "Error occurred during client name locking: ", err)
LogToDB(ctx, "ERROR", "Error occurred during client name locking: ", err)
}
if !res {
LogToDB("ERROR", "Another client is already connected to server with name: ", ClientName)
LogToDB(ctx, "ERROR", "Another client is already connected to server with name: ", ClientName)
}
select {
case <-time.After(time.Duration(wt) * time.Second):
case <-ctx.Done():
// If the request gets cancelled, log it
LogToDB("ERROR", "request cancelled\n")
LogToDB(ctx, "ERROR", "request cancelled\n")
return false
}
if wt < maxWaitTime {
Expand Down Expand Up @@ -118,7 +118,7 @@ RETURNING run_status`
var id int
err := ConfigDb.GetContext(ctx, &id, sqlInsertRunStatus, chainID, chainConfigID, ClientName)
if err != nil {
LogToDB("ERROR", "Cannot save information about the chain run status: ", err)
LogToDB(ctx, "ERROR", "Cannot save information about the chain run status: ", err)
}
return id
}
Expand All @@ -134,6 +134,6 @@ VALUES
_, err = ConfigDb.ExecContext(ctx, sqlInsertFinishStatus, chainElemExec.ChainID, status, chainElemExec.TaskID,
runStatusID, chainElemExec.ChainConfig, ClientName)
if err != nil {
LogToDB("ERROR", "Update Chain Status failed: ", err)
LogToDB(ctx, "ERROR", "Update Chain Status failed: ", err)
}
}
24 changes: 12 additions & 12 deletions internal/pgengine/bootstrap.go
Expand Up @@ -42,7 +42,7 @@ func InitAndTestConfigDBConnection(ctx context.Context, cmdOpts cmdparser.CmdOpt
ClientName = cmdOpts.ClientName
NoShellTasks = cmdOpts.NoShellTasks
VerboseLogLevel = cmdOpts.Verbose
LogToDB("DEBUG", fmt.Sprintf("Starting new session... %s", &cmdOpts))
LogToDB(ctx, "DEBUG", fmt.Sprintf("Starting new session... %s", &cmdOpts))
var wt int = WaitTime
var err error
connstr := fmt.Sprintf("application_name='pg_timetable' host='%s' port='%s' dbname='%s' sslmode='%s' user='%s' password='%s'",
Expand All @@ -54,29 +54,29 @@ func InitAndTestConfigDBConnection(ctx context.Context, cmdOpts cmdparser.CmdOpt
}
// Wrap the connector to simply print out the message
connector := pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) {
LogToDB("USER", "Severity: ", notice.Severity, "; Message: ", notice.Message)
LogToDB(ctx, "USER", "Severity: ", notice.Severity, "; Message: ", notice.Message)
})
db := OpenDB(connector)

err = db.PingContext(ctx)
for err != nil {
LogToDB("ERROR", err)
LogToDB("LOG", "Reconnecting in ", wt, " sec...")
LogToDB(ctx, "ERROR", err)
LogToDB(ctx, "LOG", "Reconnecting in ", wt, " sec...")
select {
case <-time.After(time.Duration(wt) * time.Second):
err = db.PingContext(ctx)
case <-ctx.Done():
LogToDB("ERROR", "Connection request cancelled: ", ctx.Err())
LogToDB(ctx, "ERROR", "Connection request cancelled: ", ctx.Err())
return false
}
if wt < maxWaitTime {
wt = wt * 2
}
}

LogToDB("DEBUG", "Connection string: ", connstr)
LogToDB("LOG", "Connection established...")
LogToDB("LOG", fmt.Sprintf("Proceeding as '%s' with client PID %d", ClientName, os.Getpid()))
LogToDB(ctx, "DEBUG", "Connection string: ", connstr)
LogToDB(ctx, "LOG", "Connection established...")
LogToDB(ctx, "LOG", fmt.Sprintf("Proceeding as '%s' with client PID %d", ClientName, os.Getpid()))
ConfigDb = sqlx.NewDb(db, "postgres")

if !executeSchemaScripts(ctx) {
Expand All @@ -103,7 +103,7 @@ func ExecuteCustomScripts(ctx context.Context, filename ...string) bool {
fmt.Printf(GetLogPrefixLn("PANIC"), err)
return false
}
LogToDB("LOG", "Script file executed: "+f)
LogToDB(ctx, "LOG", "Script file executed: "+f)
}
return true
}
Expand All @@ -127,9 +127,9 @@ func executeSchemaScripts(ctx context.Context) bool {
}
return false
}
LogToDB("LOG", "Schema file executed: "+sqlName)
LogToDB(ctx, "LOG", "Schema file executed: "+sqlName)
}
LogToDB("LOG", "Configuration schema created...")
LogToDB(ctx, "LOG", "Configuration schema created...")
}
return true
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func ReconnectDbAndFixLeftovers(ctx context.Context) bool {
return false
}
}
LogToDB("LOG", "Connection reestablished...")
LogToDB(ctx, "LOG", "Connection reestablished...")
FixSchedulerCrash(ctx)
return true
}
11 changes: 6 additions & 5 deletions internal/pgengine/log.go
@@ -1,6 +1,7 @@
package pgengine

import (
"context"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -46,7 +47,7 @@ func GetLogPrefixLn(level string) string {
const logTemplate = `INSERT INTO timetable.log(pid, client_name, log_level, message) VALUES ($1, $2, $3, $4)`

// LogToDB performs logging to configuration database ConfigDB initiated during bootstrap
func LogToDB(level string, msg ...interface{}) {
func LogToDB(ctx context.Context, level string, msg ...interface{}) {
if !VerboseLogLevel {
switch level {
case
Expand All @@ -57,16 +58,16 @@ func LogToDB(level string, msg ...interface{}) {
s := fmt.Sprintf(GetLogPrefix(level), fmt.Sprint(msg...))
fmt.Println(s)
if ConfigDb != nil {
_, err := ConfigDb.Exec(logTemplate, os.Getpid(), ClientName, level, fmt.Sprint(msg...))
_, err := ConfigDb.ExecContext(ctx, logTemplate, os.Getpid(), ClientName, level, fmt.Sprint(msg...))
if err != nil {
fmt.Printf(GetLogPrefixLn("ERROR"), fmt.Sprint("Cannot log to the database: ", err))
}
}
}

// LogChainElementExecution will log current chain element execution status including retcode
func LogChainElementExecution(chainElemExec *ChainElementExecution, retCode int, output string) {
_, err := ConfigDb.Exec("INSERT INTO timetable.execution_log (chain_execution_config, chain_id, task_id, name, script, "+
func LogChainElementExecution(ctx context.Context, chainElemExec *ChainElementExecution, retCode int, output string) {
_, err := ConfigDb.ExecContext(ctx, "INSERT INTO timetable.execution_log (chain_execution_config, chain_id, task_id, name, script, "+
"kind, last_run, finished, returncode, pid, output, client_name) "+
"VALUES ($1, $2, $3, $4, $5, $6, clock_timestamp() - $7 :: interval, clock_timestamp(), $8, $9, "+
"NULLIF($10, ''), $11)",
Expand All @@ -75,6 +76,6 @@ func LogChainElementExecution(chainElemExec *ChainElementExecution, retCode int,
fmt.Sprintf("%d microsecond", chainElemExec.Duration),
retCode, os.Getpid(), output, ClientName)
if err != nil {
LogToDB("ERROR", "Error occurred during logging current chain element execution status including retcode: ", err)
LogToDB(ctx, "ERROR", "Error occurred during logging current chain element execution status including retcode: ", err)
}
}
7 changes: 4 additions & 3 deletions internal/pgengine/log_test.go
@@ -1,6 +1,7 @@
package pgengine_test

import (
"context"
"database/sql"
"errors"
"testing"
Expand All @@ -17,14 +18,14 @@ func TestLogToDb(t *testing.T) {

t.Run("Check LogToDB in terse mode", func(t *testing.T) {
pgengine.VerboseLogLevel = false
pgengine.LogToDB("DEBUG", "Test DEBUG message")
pgengine.LogToDB(context.TODO(), "DEBUG", "Test DEBUG message")

})

t.Run("Check LogToDB in verbose mode", func(t *testing.T) {
pgengine.VerboseLogLevel = true
mock.ExpectExec("INSERT INTO timetable\\.log").WillReturnError(sql.ErrConnDone)
pgengine.LogToDB("DEBUG", "Test DEBUG message")
pgengine.LogToDB(context.TODO(), "DEBUG", "Test DEBUG message")
})

assert.NoError(t, mock.ExpectationsWereMet(), "there were unfulfilled expectations")
Expand All @@ -38,7 +39,7 @@ func TestLogChainElementExecution(t *testing.T) {
t.Run("Check LogChainElementExecution if sql fails", func(t *testing.T) {
mock.ExpectExec("INSERT INTO timetable\\.execution_log").WillReturnError(errors.New("error"))
mock.ExpectExec("INSERT INTO timetable\\.log").WillReturnResult(sqlmock.NewResult(0, 1))
pgengine.LogChainElementExecution(&pgengine.ChainElementExecution{}, 0, "STATUS")
pgengine.LogChainElementExecution(context.TODO(), &pgengine.ChainElementExecution{}, 0, "STATUS")
})

assert.NoError(t, mock.ExpectationsWereMet(), "there were unfulfilled expectations")
Expand Down
14 changes: 7 additions & 7 deletions internal/pgengine/migration.go
Expand Up @@ -11,23 +11,23 @@ var m *migrator.Migrator

// MigrateDb upgrades database with all migrations
func MigrateDb(ctx context.Context) bool {
LogToDB("LOG", "Upgrading database...")
LogToDB(ctx, "LOG", "Upgrading database...")
if err := m.Migrate(ctx, ConfigDb.DB); err != nil {
LogToDB("PANIC", err)
LogToDB(ctx, "PANIC", err)
return false
}
return true
}

// CheckNeedMigrateDb checks need of upgrading database and throws error if that's true
func CheckNeedMigrateDb(ctx context.Context) (bool, error) {
LogToDB("DEBUG", "Check need of upgrading database...")
LogToDB(ctx, "DEBUG", "Check need of upgrading database...")
upgrade, err := m.NeedUpgrade(ctx, ConfigDb.DB)
if upgrade {
LogToDB("PANIC", "You need to upgrade your database before proceeding, use --upgrade option")
LogToDB(ctx, "PANIC", "You need to upgrade your database before proceeding, use --upgrade option")
}
if err != nil {
LogToDB("PANIC", err)
LogToDB(ctx, "PANIC", err)
}
return upgrade, err
}
Expand All @@ -37,7 +37,7 @@ func init() {
m, err = migrator.New(
migrator.TableName("timetable.migrations"),
migrator.SetNotice(func(s string) {
LogToDB("LOG", s)
LogToDB(context.TODO(), "LOG", s)
}),
migrator.Migrations(
&migrator.Migration{
Expand Down Expand Up @@ -79,7 +79,7 @@ func init() {
),
)
if err != nil {
LogToDB("ERROR", err)
LogToDB(context.TODO(), "ERROR", err)
}
}

Expand Down
20 changes: 10 additions & 10 deletions internal/pgengine/pgengine_test.go
Expand Up @@ -94,7 +94,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
pgengine.ConfigDb.MustExec("TRUNCATE timetable.log")
for _, logLevel := range logLevels {
assert.NotPanics(t, func() {
pgengine.LogToDB(logLevel, logLevel)
pgengine.LogToDB(ctx, logLevel, logLevel)
}, "LogToDB panicked")

if !pgengine.VerboseLogLevel {
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestSchedulerFunctions(t *testing.T) {
assert.NoError(t, err, "Should start transaction")
assert.True(t, pgengine.GetChainElements(ctx, tx, &chains, 0), "Should no error in clean database")
assert.Empty(t, chains, "Should be empty in clean database")
pgengine.MustCommitTransaction(tx)
pgengine.MustCommitTransaction(ctx, tx)
})

t.Run("Check GetChainParamValues funсtion", func(t *testing.T) {
Expand All @@ -167,7 +167,7 @@ func TestSchedulerFunctions(t *testing.T) {
ChainID: 0,
ChainConfig: 0}), "Should no error in clean database")
assert.Empty(t, paramVals, "Should be empty in clean database")
pgengine.MustCommitTransaction(tx)
pgengine.MustCommitTransaction(ctx, tx)
})

t.Run("Check InsertChainRunStatus funсtion", func(t *testing.T) {
Expand All @@ -181,7 +181,7 @@ func TestSchedulerFunctions(t *testing.T) {
tx, err := pgengine.StartTransaction(ctx)
assert.NoError(t, err, "Should start transaction")
assert.NotNil(t, pgengine.GetConnectionString(ctx, databaseConnection), "Should no error in clean database")
pgengine.MustCommitTransaction(tx)
pgengine.MustCommitTransaction(ctx, tx)
})

t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
Expand All @@ -195,7 +195,7 @@ func TestSchedulerFunctions(t *testing.T) {
assert.NoError(t, pgengine.ExecuteSQLCommand(ctx, tx, "SELECT $1", []string{"[42]", `["hey"]`}), "Simple query with doubled parameters")
assert.NoError(t, pgengine.ExecuteSQLCommand(ctx, tx, "SELECT $1, $2", []string{`[42, "hey"]`}), "Simple query with two parameters")

pgengine.MustCommitTransaction(tx)
pgengine.MustCommitTransaction(ctx, tx)
})

}
Expand All @@ -215,15 +215,15 @@ func TestGetRemoteDBTransaction(t *testing.T) {
teardownTestCase := testutils.SetupTestCase(t)
defer teardownTestCase(t)

ctx := context.Background()

remoteDb, tx, err := setupTestRemoteDBFunc()
defer pgengine.FinalizeRemoteDBConnection(remoteDb)
defer pgengine.FinalizeRemoteDBConnection(ctx, remoteDb)
require.NoError(t, err, "remoteDB should be initialized")
require.NotNil(t, remoteDb, "remoteDB should be initialized")

ctx := context.Background()

t.Run("Check connection closing", func(t *testing.T) {
pgengine.FinalizeRemoteDBConnection(remoteDb)
pgengine.FinalizeRemoteDBConnection(ctx, remoteDb)
assert.NotNil(t, remoteDb, "Connection isn't closed properly")
})

Expand All @@ -237,7 +237,7 @@ func TestGetRemoteDBTransaction(t *testing.T) {
assert.NotPanics(t, func() { pgengine.ResetRole(ctx, tx) }, "Reset Role failed")
})

pgengine.MustCommitTransaction(tx)
pgengine.MustCommitTransaction(ctx, tx)
}

func TestSamplesScripts(t *testing.T) {
Expand Down

0 comments on commit 56f00f0

Please sign in to comment.