
Commit

[+] add context to scheduler.Run()
[+] add context to pgengine.ReconnectDbAndFixLeftovers() and use channels
[+] pass derived contexts to chain workers goroutines
[-] fix "return" to "continue" in scheduler.intervalChainWorker()
[*] make pgengine.WaitTime public const
pashagolub committed Apr 13, 2020
1 parent 828f8f2 commit 032ab3e
Showing 7 changed files with 95 additions and 70 deletions.
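The diff below applies one pattern throughout: database calls switch to their context-aware variants (ExecContext, GetContext, SelectContext, PingContext), and blocking time.Sleep calls become a select over time.After and ctx.Done so a cancelled context interrupts the wait. A minimal, self-contained sketch of that waiting shape (the helper name is illustrative, not part of the commit):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // sleepOrDone waits for d, but gives up early if ctx is cancelled.
    // This is the shape every time.Sleep in this commit is rewritten into.
    func sleepOrDone(ctx context.Context, d time.Duration) bool {
        select {
        case <-time.After(d):
            return true // waited the full duration
        case <-ctx.Done():
            return false // caller cancelled, stop waiting
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        fmt.Println(sleepOrDone(ctx, 5*time.Second)) // false: cancelled after ~1s
    }
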
16 changes: 10 additions & 6 deletions internal/pgengine/access.go
@@ -1,6 +1,7 @@
package pgengine

import (
"context"
"database/sql"
"fmt"
"hash/adler32"
@@ -17,8 +18,8 @@ const AppID = 0x204F04EE

/*FixSchedulerCrash makes sure that task chains which are not complete due to a scheduler crash are "fixed"
and marked as stopped at a certain point */
func FixSchedulerCrash() {
_, err := ConfigDb.Exec(`
func FixSchedulerCrash(ctx context.Context) {
_, err := ConfigDb.ExecContext(ctx, `
INSERT INTO timetable.run_status (execution_status, started, last_status_update, start_status, chain_execution_config, client_name)
SELECT 'DEAD', now(), now(), start_status, 0, $1 FROM (
SELECT start_status
@@ -32,11 +33,11 @@ func FixSchedulerCrash() {
}

// CanProceedChainExecution checks if a particular chain can be executed in parallel
func CanProceedChainExecution(chainConfigID int, maxInstances int) bool {
func CanProceedChainExecution(ctx context.Context, chainConfigID int, maxInstances int) bool {
const sqlProcCount = "SELECT count(*) FROM timetable.get_running_jobs($1) AS (id BIGINT, status BIGINT) GROUP BY id"
var procCount int
LogToDB("DEBUG", fmt.Sprintf("Checking if can proceed with chaing config ID: %d", chainConfigID))
err := ConfigDb.Get(&procCount, sqlProcCount, chainConfigID)
err := ConfigDb.GetContext(ctx, &procCount, sqlProcCount, chainConfigID)
switch {
case err == sql.ErrNoRows:
return true
@@ -49,9 +50,9 @@ func CanProceedChainExecution(chainConfigID int, maxInstances int) bool {
}

// DeleteChainConfig deletes chain configuration for self-destructive chains
func DeleteChainConfig(chainConfigID int) bool {
func DeleteChainConfig(ctx context.Context, chainConfigID int) bool {
LogToDB("LOG", "Deleting self destructive chain configuration ID: ", chainConfigID)
res, err := ConfigDb.Exec("DELETE FROM timetable.chain_execution_config WHERE chain_execution_config = $1 ", chainConfigID)
res, err := ConfigDb.ExecContext(ctx, "DELETE FROM timetable.chain_execution_config WHERE chain_execution_config = $1 ", chainConfigID)
if err != nil {
LogToDB("ERROR", "Error occurred during deleting self destructive chains: ", err)
}
@@ -67,6 +68,9 @@ func TryLockClientName() (res bool) {
if err != nil {
LogToDB("ERROR", "Error occurred during client name locking: ", err)
}
if !res {
LogToDB("ERROR", "Another client is already connected to server with name: ", ClientName)
}
return
}

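The access.go changes only thread a context through to ExecContext/GetContext; bounding those calls is left to the caller. A hedged sketch of one possible caller, written as if it lived inside the pgengine package (the helper name and the 30-second deadline are assumptions, not part of the commit):

    package pgengine

    import (
        "context"
        "time"
    )

    // fixCrashWithDeadline shows how the new signature can be driven by a caller:
    // the ExecContext inside FixSchedulerCrash aborts once the deadline expires.
    func fixCrashWithDeadline(parent context.Context) {
        ctx, cancel := context.WithTimeout(parent, 30*time.Second)
        defer cancel()
        FixSchedulerCrash(ctx)
    }
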
32 changes: 17 additions & 15 deletions internal/pgengine/bootstrap.go
@@ -13,11 +13,11 @@ import (
"github.com/lib/pq"
)

// wait for 5 sec before reconnecting to DB
const waitTime = 5
// WaitTime specifies the amount of time in seconds to wait before reconnecting to DB
const WaitTime = 5

// maximum wait time before reconnect attempts
const maxWaitTime = waitTime * 16
const maxWaitTime = WaitTime * 16

// ConfigDb is the global database object
var ConfigDb *sqlx.DB
@@ -55,7 +55,7 @@ var sqlNames = []string{"DDL", "JSON Schema", "Built-in Tasks", "Job Functions"}

// InitAndTestConfigDBConnection opens connection and creates schema
func InitAndTestConfigDBConnection(ctx context.Context) bool {
var wt int = waitTime
var wt int = WaitTime
var err error
connstr := fmt.Sprintf("application_name=pg_timetable host='%s' port='%s' dbname='%s' sslmode='%s' user='%s' password='%s'",
Host, Port, DbName, SSLMode, User, Password)
@@ -129,17 +129,19 @@ func FinalizeConfigDBConnection() {
}

//ReconnectDbAndFixLeftovers keeps trying to reconnect every `WaitTime` seconds until the connection is established or the context is cancelled
func ReconnectDbAndFixLeftovers() {
var err error
for {
fmt.Printf(GetLogPrefixLn("REPAIR"), fmt.Sprintf("Connection to the server was lost. Waiting for %d sec...", waitTime))
time.Sleep(waitTime * time.Second)
fmt.Printf(GetLogPrefix("REPAIR"), "Reconnecting...\n")
err = ConfigDb.Ping()
if err == nil {
LogToDB("LOG", "Connection reestablished...")
FixSchedulerCrash()
break
func ReconnectDbAndFixLeftovers(ctx context.Context) bool {
for ConfigDb.PingContext(ctx) != nil {
fmt.Printf(GetLogPrefixLn("REPAIR"),
fmt.Sprintf("Connection to the server was lost. Waiting for %d sec...", WaitTime))
select {
case <-time.After(WaitTime * time.Second):
fmt.Printf(GetLogPrefix("REPAIR"), "Reconnecting...\n")
case <-ctx.Done():
fmt.Printf(GetLogPrefixLn("ERROR"), fmt.Sprintf("request cancelled: %v", ctx.Err()))
return false
}
}
LogToDB("LOG", "Connection reestablished...")
FixSchedulerCrash(ctx)
return true
}
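The reconnect loop above repeats one shape: probe the connection, wait WaitTime seconds between attempts, and give up as soon as the context is done. Abstracted into a standalone helper for clarity (the helper is an illustration, not part of the commit); with it, ReconnectDbAndFixLeftovers is essentially retryUntil(ctx, ConfigDb.PingContext) followed by FixSchedulerCrash(ctx):

    package pgengine

    import (
        "context"
        "time"
    )

    // retryUntil calls probe until it succeeds or ctx is cancelled, pausing
    // WaitTime seconds between attempts; it reports whether probe ever succeeded.
    func retryUntil(ctx context.Context, probe func(context.Context) error) bool {
        for probe(ctx) != nil {
            select {
            case <-time.After(WaitTime * time.Second):
            case <-ctx.Done():
                return false
            }
        }
        return true
    }
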
10 changes: 5 additions & 5 deletions internal/pgengine/log.go
@@ -1,6 +1,7 @@
package pgengine

import (
"context"
"fmt"
"os"
"time"
@@ -58,11 +59,10 @@ func LogToDB(level string, msg ...interface{}) {
fmt.Println(s)
if ConfigDb != nil {
_, err := ConfigDb.Exec(logTemplate, os.Getpid(), ClientName, level, fmt.Sprint(msg...))
for err != nil && ConfigDb.Ping() != nil {
// If there is DB outage, reconnect and write missing log
ReconnectDbAndFixLeftovers()
_, err = ConfigDb.Exec(logTemplate, os.Getpid(), ClientName, level, fmt.Sprint(msg...))
level = "ERROR" //we don't want panic in case of disconnect
if err != nil && ConfigDb.Ping() != nil {
if ReconnectDbAndFixLeftovers(context.TODO()) {
_, _ = ConfigDb.Exec(logTemplate, os.Getpid(), ClientName, level, fmt.Sprint(msg...))
}
}
}
}
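LogToDB has no context parameter yet, so the reconnect call uses context.TODO() as an explicit placeholder for a context a future caller should supply. The retry branch itself reduces to a small pattern, sketched here as a standalone helper (an illustration, not part of the commit):

    package pgengine

    import "context"

    // writeLogOnce tries insert once; if it failed and the connection is gone,
    // it reconnects and retries a single time, ignoring the second error so a
    // logging failure can never take down the scheduler.
    func writeLogOnce(ctx context.Context, insert func() error) {
        if err := insert(); err != nil && ConfigDb.PingContext(ctx) != nil {
            if ReconnectDbAndFixLeftovers(ctx) {
                _ = insert()
            }
        }
    }
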
11 changes: 7 additions & 4 deletions internal/pgengine/pgengine_test.go
@@ -225,7 +225,8 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
})

t.Run("Check Reconnecting Database", func(t *testing.T) {
assert.NotPanics(t, pgengine.ReconnectDbAndFixLeftovers, "Does not panics")
assert.Equal(t, true, pgengine.ReconnectDbAndFixLeftovers(context.Background()),
"Should succeed for reconnect")
})

t.Run("Check TryLockClientName()", func(t *testing.T) {
@@ -241,16 +242,18 @@ func TestSchedulerFunctions(t *testing.T) {
teardownTestCase := setupTestCase(t)
defer teardownTestCase(t)

ctx := context.Background()

t.Run("Check FixSchedulerCrash function", func(t *testing.T) {
assert.NotPanics(t, pgengine.FixSchedulerCrash, "Fix scheduler crash failed")
assert.NotPanics(t, func() { pgengine.FixSchedulerCrash(ctx) }, "Fix scheduler crash failed")
})

t.Run("Check CanProceedChainExecution funсtion", func(t *testing.T) {
assert.Equal(t, true, pgengine.CanProceedChainExecution(0, 0), "Should proceed with clean database")
assert.Equal(t, true, pgengine.CanProceedChainExecution(ctx, 0, 0), "Should proceed with clean database")
})

t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) {
assert.Equal(t, false, pgengine.DeleteChainConfig(0), "Should not delete in clean database")
assert.Equal(t, false, pgengine.DeleteChainConfig(ctx, 0), "Should not delete in clean database")
})

t.Run("Check GetChainElements funсtion", func(t *testing.T) {
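A natural extra case for the tests above, not part of this commit, would exercise the cancellation path: with an already-cancelled context, ReconnectDbAndFixLeftovers should give up immediately. Sketch, assuming the same imports and database setup as the surrounding test file:

    t.Run("Check Reconnecting with cancelled context", func(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // cancel up front so the select hits ctx.Done() immediately
        assert.Equal(t, false, pgengine.ReconnectDbAndFixLeftovers(ctx),
            "Should report failure when the context is already cancelled")
    })
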
17 changes: 7 additions & 10 deletions internal/scheduler/interval_chain.go
@@ -1,6 +1,7 @@
package scheduler

import (
"context"
"fmt"
"sync"
"time"
@@ -74,15 +75,13 @@ func retriveIntervalChainsAndRun(sql string) {
mutex.Unlock()
}

func intervalChainWorker(ichains <-chan IntervalChain) {
func intervalChainWorker(ctx context.Context, ichains <-chan IntervalChain) {

for ichain := range ichains {
pgengine.LogToDB("DEBUG", fmt.Sprintf("Calling process interval chain for %s", ichain))

if !ichain.isValid() { // chain not in the list of active chains
return
continue
}

pgengine.LogToDB("DEBUG", fmt.Sprintf("Calling process interval chain for %s", ichain))
if !ichain.RepeatAfter {
go func() {
pgengine.LogToDB("DEBUG", fmt.Sprintf("Sleeping before next execution in %ds for chain %s", ichain.Interval, ichain))
@@ -92,16 +91,14 @@ }()
}
}()
}

if !pgengine.CanProceedChainExecution(ichain.ChainExecutionConfigID, ichain.MaxInstances) {
if !pgengine.CanProceedChainExecution(ctx, ichain.ChainExecutionConfigID, ichain.MaxInstances) {
pgengine.LogToDB("LOG", fmt.Sprintf("Cannot proceed with chain ID: %d; configuration ID: %d",
ichain.ChainID, ichain.ChainExecutionConfigID))
return
}

executeChain(ichain.ChainExecutionConfigID, ichain.ChainID)
executeChain(ctx, ichain.ChainExecutionConfigID, ichain.ChainID)
if ichain.SelfDestruct {
pgengine.DeleteChainConfig(ichain.ChainExecutionConfigID)
pgengine.DeleteChainConfig(ctx, ichain.ChainExecutionConfigID)
} else if ichain.RepeatAfter {
go func() {
pgengine.LogToDB("DEBUG", fmt.Sprintf("Sleeping before next execution in %ds for chain %s", ichain.Interval, ichain))
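The "[-] fix "return" to "continue"" item from the commit message is the change above: inside a for ... range over the jobs channel, return ends the worker goroutine on the first invalid chain and silently starves the channel, while continue merely skips that chain. A minimal standalone illustration (not project code):

    package main

    import "fmt"

    // worker skips bad items with continue; a return there would end the goroutine
    // and every later item sent on the channel would go unprocessed.
    func worker(jobs <-chan int, done chan<- struct{}) {
        for j := range jobs {
            if j < 0 { // stand-in for "chain not in the list of active chains"
                continue
            }
            fmt.Println("processed", j)
        }
        close(done)
    }

    func main() {
        jobs := make(chan int, 4)
        done := make(chan struct{})
        go worker(jobs, done)
        for _, j := range []int{1, -1, 2, 3} {
            jobs <- j
        }
        close(jobs)
        <-done // prints 1, 2, 3; with return instead of continue only 1 would print
    }
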
77 changes: 48 additions & 29 deletions internal/scheduler/scheduler.go
@@ -1,6 +1,7 @@
package scheduler

import (
"context"
"encoding/json"
"fmt"
"strings"
@@ -54,69 +55,87 @@ func (chain Chain) String() string {
}

//Run executes jobs
func Run() {
func Run(ctx context.Context) bool {
for !pgengine.TryLockClientName() {
pgengine.LogToDB("ERROR", "Another client is already connected to server with name: ", pgengine.ClientName)
time.Sleep(refetchTimeout * time.Second)
select {
case <-time.After(refetchTimeout * time.Second):
case <-ctx.Done():
// If the request gets cancelled, log it
pgengine.LogToDB("ERROR", "request cancelled\n")
return false
}
}
// create sleeping workers waiting for data on the channels
for w := 1; w <= workersNumber; w++ {
go chainWorker(chains)
go intervalChainWorker(intervalChainsChan)
chainCtx, cancel := context.WithCancel(ctx)
defer cancel()
go chainWorker(chainCtx, chains)
chainCtx, cancel = context.WithCancel(ctx)
defer cancel()
go intervalChainWorker(chainCtx, intervalChainsChan)
}
/* set the maximum number of open connections to workersNumber + 1 for system calls */
pgengine.ConfigDb.SetMaxOpenConns(workersNumber + 1)
/* cleanup potential database leftovers */
pgengine.FixSchedulerCrash()
pgengine.FixSchedulerCrash(ctx)
pgengine.LogToDB("LOG", "Checking for @reboot task chains...")
retriveChainsAndRun(sqlSelectRebootChains)
retriveChainsAndRun(ctx, sqlSelectRebootChains)
/* loop forever or until we ask it to stop */
for {
pgengine.LogToDB("LOG", "Checking for task chains...")
retriveChainsAndRun(sqlSelectChains)
retriveChainsAndRun(ctx, sqlSelectChains)
pgengine.LogToDB("LOG", "Checking for interval task chains...")
retriveIntervalChainsAndRun(sqlSelectIntervalChains)
/* wait for the next full minute to show up */
time.Sleep(refetchTimeout * time.Second)
select {
case <-time.After(refetchTimeout * time.Second):
case <-ctx.Done():
// If the request gets cancelled, log it
pgengine.LogToDB("ERROR", "request cancelled\n")
return false
}
}
}

func retriveChainsAndRun(sql string) {
func retriveChainsAndRun(ctx context.Context, sql string) {
headChains := []Chain{}
err := pgengine.ConfigDb.Select(&headChains, sql, pgengine.ClientName)
err := pgengine.ConfigDb.SelectContext(ctx, &headChains, sql, pgengine.ClientName)
if err != nil {
pgengine.LogToDB("ERROR", "Could not query pending tasks: ", err)
} else {
headChainsCount := len(headChains)
pgengine.LogToDB("LOG", "Number of chains to be executed: ", headChainsCount)
/* now we can loop through those chains */
for _, headChain := range headChains {
if headChainsCount > maxChainsThreshold {
time.Sleep(time.Duration(refetchTimeout*1000/headChainsCount) * time.Millisecond)
}
pgengine.LogToDB("DEBUG", fmt.Sprintf("Putting head chain %s to the execution channel", headChain))
chains <- headChain
return
}
headChainsCount := len(headChains)
pgengine.LogToDB("LOG", "Number of chains to be executed: ", headChainsCount)
/* now we can loop through those chains */
for _, headChain := range headChains {
if headChainsCount > maxChainsThreshold {
time.Sleep(time.Duration(refetchTimeout*1000/headChainsCount) * time.Millisecond)
}
pgengine.LogToDB("DEBUG", fmt.Sprintf("Putting head chain %s to the execution channel", headChain))
chains <- headChain
}
}

func chainWorker(chains <-chan Chain) {
func chainWorker(ctx context.Context, chains <-chan Chain) {
for chain := range chains {
pgengine.LogToDB("DEBUG", fmt.Sprintf("Calling process chain for %s", chain))
for !pgengine.CanProceedChainExecution(chain.ChainExecutionConfigID, chain.MaxInstances) {
for !pgengine.CanProceedChainExecution(ctx, chain.ChainExecutionConfigID, chain.MaxInstances) {
pgengine.LogToDB("DEBUG", fmt.Sprintf("Cannot proceed with chain %s. Sleeping...", chain))
time.Sleep(3 * time.Second)
select {
case <-time.After(time.Duration(pgengine.WaitTime) * time.Second):
case <-ctx.Done():
pgengine.LogToDB("ERROR", "request cancelled\n")
return
}
}

executeChain(chain.ChainExecutionConfigID, chain.ChainID)
executeChain(ctx, chain.ChainExecutionConfigID, chain.ChainID)
if chain.SelfDestruct {
pgengine.DeleteChainConfig(chain.ChainExecutionConfigID)
pgengine.DeleteChainConfig(ctx, chain.ChainExecutionConfigID)
}
}
}

/* execute a chain of tasks */
func executeChain(chainConfigID int, chainID int) {
func executeChain(ctx context.Context, chainConfigID int, chainID int) {
var ChainElements []pgengine.ChainElementExecution

tx := pgengine.StartTransaction()
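Run now derives a cancellable context for each worker goroutine and defers the cancels, so every worker is told to stop as soon as Run itself returns (for example when the parent context is cancelled in one of the select blocks above). The start-up pattern in isolation (illustrative, not project code):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // runWorkers launches n workers, each on a context derived from the parent;
    // the deferred cancels fire when runWorkers returns and stop every worker.
    func runWorkers(ctx context.Context, n int, jobs <-chan int) {
        for i := 1; i <= n; i++ {
            wctx, cancel := context.WithCancel(ctx)
            defer cancel()
            go func(id int) {
                for {
                    select {
                    case j := <-jobs:
                        fmt.Println("worker", id, "got job", j)
                    case <-wctx.Done():
                        return
                    }
                }
            }(i)
        }
        <-ctx.Done() // stand-in for Run's polling loop
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        jobs := make(chan int, 3)
        for j := 1; j <= 3; j++ {
            jobs <- j
        }
        runWorkers(ctx, 2, jobs)
    }
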
2 changes: 1 addition & 1 deletion main.go
@@ -35,5 +35,5 @@ func main() {
}
defer pgengine.FinalizeConfigDBConnection()
pgengine.SetupCloseHandler()
scheduler.Run()
scheduler.Run(context.Background())
}
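main still passes context.Background(), so nothing cancels Run yet. A possible follow-up, purely hypothetical and not part of this commit, is to hand Run a context that is cancelled on SIGINT/SIGTERM so the select blocks in the scheduler unwind cleanly on shutdown:

    package main

    import (
        "context"
        "os"
        "os/signal"
        "syscall"
    )

    // signalContext returns a context cancelled when the process receives SIGINT
    // or SIGTERM; main could pass it to scheduler.Run instead of Background().
    func signalContext() (context.Context, context.CancelFunc) {
        ctx, cancel := context.WithCancel(context.Background())
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        go func() {
            <-c
            cancel()
        }()
        return ctx, cancel
    }
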
