Skip to content

Commit

Permalink
Fix issue with not updating statusLogs (long lived tx, when we agress…
Browse files Browse the repository at this point in the history
…ively prune connections)
  • Loading branch information
Lee Hambley committed Dec 14, 2017
1 parent 8228d48 commit 1560c2b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 38 deletions.
2 changes: 1 addition & 1 deletion api/src/github.com/harrowio/harrow/cmd/runner/main.go
Expand Up @@ -47,7 +47,7 @@ func Main() {
// Get configuration from ENV (see config package)
config := config.GetConfig()

// we use this to emit some activitiyes when things happen
// we use this to emit some activities when things happen
activityBus := activity.NewAMQPTransport(config.AmqpConnectionString(), fmt.Sprintf("runner-%s", connStr))
defer activityBus.Close()

Expand Down
53 changes: 20 additions & 33 deletions api/src/github.com/harrowio/harrow/cmd/runner/runnable.go
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"database/sql"
"fmt"
"regexp"
"time"

"github.com/harrowio/harrow/cast"
Expand Down Expand Up @@ -95,19 +96,33 @@ func (ofdob *OperationFromDbOrBus) Next() (*domain.Operation, error) {
if err != nil {
return nil, errors.Wrap(err, "could not start database transaction")
}
defer tx.Commit()
defer tx.Rollback()

ofdob.log.Info().Msg("getting next unstarted operation from database")

var ops []domain.Operation = []domain.Operation{}
var op *domain.Operation = &domain.Operation{}
var opStore *stores.DbOperationStore = stores.NewDbOperationStore(tx)

// started_at is our only "start" field
// and the other five are "stop" fields
// we're looking for anything unstarted that hasn't been stopped
// for any reason.
query := ` SELECT * FROM operations WHERE (started_at IS NULL) AND (canceled_at IS NULL AND timed_out_at IS NULL AND failed_at IS NULL AND finished_at IS NULL AND archived_at IS NULL) ORDER BY created_at ASC;`
err = tx.Select(&ops, query)
query := `
SELECT *
FROM operations
WHERE (started_at IS NULL)
AND (canceled_at IS NULL
AND timed_out_at IS NULL
AND failed_at IS NULL
AND finished_at IS NULL
AND archived_at IS NULL)
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
`
re := regexp.MustCompile("[\n\t]")
ofdob.log.Debug().Msg(re.ReplaceAllString(query, " "))
err = tx.Get(op, query)
if err == sql.ErrNoRows {
ofdob.log.Debug().Msg("no rows found, but no errors")
return nil, nil
Expand All @@ -116,35 +131,6 @@ func (ofdob *OperationFromDbOrBus) Next() (*domain.Operation, error) {
return nil, errors.Wrap(err, "could not select next unstarted operation from database")
}

ofdob.log.Info().Int("unstarted_operations", len(ops)).Msg("unstarted operations")

// The operations are sorted by the time at which they were created
// which means we should start with the oldest and try and lock it
var op *domain.Operation
for _, o := range ops {
var gotLock bool
var query string = fmt.Sprintf(`SELECT pg_try_advisory_lock(('x' || lpad('%s', 8, '0'))::bit(32)::int);`, o.Uuid)
err := tx.Get(&gotLock, query)
if err != nil {
return nil, errors.Wrap(err, "error attempting lock")
}
if gotLock == true {
ofdob.log.Debug().Msgf("got lock, %#v", o)
op = &o
break
} else {
ofdob.log.Debug().Msg("failed to get lock, trying again")
}
}

// Did we exit the loop because of a successful lock or
if op == nil {
ofdob.log.Info().Msg("failed to lock a record, returning for another shot on the next tick")
return nil, nil
} else {
ofdob.log.Info().Msgf("locked operation uuid %s, proceeding", op.Uuid)
}

// TODO: risky, pointer dereference for a possibly nil field? (ttl calc and age to domain.Operation)
ofdob.log.Info().Msgf("found operation %s (age: %s), checking ttl", op.Uuid, time.Now().UTC().Sub(*op.CreatedAt))

Expand All @@ -164,6 +150,7 @@ func (ofdob *OperationFromDbOrBus) Next() (*domain.Operation, error) {
if err := appendStatusLog(ofdob.log, tx, op.Uuid, "vm.reserved", fmt.Sprintf("Reserved, will be started (wait time %s)", time.Now().UTC().Sub(*op.CreatedAt))); err != nil {
return nil, errors.Wrap(err, "could not append vm.reserved message to operation status logs")
}
tx.Commit()

ofdob.log.Info().Str("runnable", "Next()").Msg("returning")
return op, nil
Expand Down
6 changes: 2 additions & 4 deletions api/src/github.com/harrowio/harrow/cmd/runner/runner.go
Expand Up @@ -205,9 +205,6 @@ func (r *Runner) runOperation(op *domain.Operation) {
if err != nil {
r.errs <- errors.Wrap(err, "could not start database transaction")
}
defer tx.Commit()

var opStore *stores.DbOperationStore = stores.NewDbOperationStore(tx)

r.log.Info().Msgf("operation to run is: %s", op.Uuid)
o := Operation{
Expand All @@ -219,7 +216,8 @@ func (r *Runner) runOperation(op *domain.Operation) {
}

r.log.Info().Msg("marking operation as proceeding to run it")
opStore.MarkAsStarted(op.Uuid)
stores.NewDbOperationStore(tx).MarkAsStarted(op.Uuid)
tx.Commit()

switch op.IsUserJob() {
case true:
Expand Down

0 comments on commit 1560c2b

Please sign in to comment.