Skip to content

Commit

Permalink
Merge pull request #1966 from digitallyinduced/recover-crashed-jobs
Browse files Browse the repository at this point in the history
Job: Restart jobs that should have timed out, usually when the worker…
  • Loading branch information
mpscholten committed Jun 8, 2024
2 parents 9dc3b9f + c33034c commit ff34aa3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
28 changes: 16 additions & 12 deletions IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ fetchNextJob :: forall job.
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, Table job
) => BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob backoffStrategy workerId = do
let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id")
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)
) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob timeoutInMicroseconds backoffStrategy workerId = do
let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id")
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds)

result :: [PG.Only (Id job)] <- sqlQuery query params
case result of
Expand All @@ -68,12 +68,12 @@ fetchNextJob backoffStrategy workerId = do
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
let tableNameBS = cs tableName
sqlExec (createNotificationTrigger tableNameBS) ()

poller <- pollForJob tableName pollInterval backoffStrategy onNewJob
poller <- pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))

pure (subscription, poller)
Expand All @@ -86,10 +86,10 @@ watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do
--
-- This function returns an Async. Call 'cancel' on the async to stop polling the database.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval backoffStrategy onNewJob = do
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1")
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " LIMIT 1")
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds)
Async.asyncBound do
forever do
count :: Int <- sqlQueryScalar query params
Expand Down Expand Up @@ -258,4 +258,8 @@ instance IHP.Controller.Param.ParamReader JobStatus where

-- | SQL predicate that decides whether a job waiting for a retry has rested long
-- enough to be picked up again.
--
-- The returned fragment is spliced into the @WHERE@ clause of the job queries and
-- contains exactly one @?@ placeholder. The visible callers bind
-- @backoffStrategy.delayInSeconds@ to it.
--
-- * 'LinearBackoff': the job is eligible once @updated_at@ lies more than the delay
--   (in seconds) in the past.
-- * 'ExponentialBackoff': same, but the delay is multiplied by @2 ^ attempts_count@.
retryQuery :: BackoffStrategy -> ByteString
-- The delay has to be subtracted from NOW(). With @NOW() + delay@ the comparison
-- holds for every row whose updated_at is not in the future, so the configured
-- delay would never take effect.
retryQuery LinearBackoff {} = "updated_at < NOW() - (interval '1 second' * ?)"
retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)"

-- | SQL fragment appended after the parenthesised "job is ready" condition of the
-- job queries.
--
-- Both variants contain exactly one @?@ placeholder, so the same parameter tuple
-- can be used regardless of whether a timeout is configured:
--
-- * With a timeout, the fragment is an additional @OR@ branch matching jobs that
--   are still marked as running and locked, but whose @locked_at@ plus the timeout
--   already lies in the past.
-- * Without a timeout, the fragment is an @AND (? IS NULL)@ check that only exists
--   to consume the placeholder.
--
-- The timeout value itself is not inspected here; it is supplied to the query as a
-- parameter by the caller.
timeoutCondition :: Maybe Int -> ByteString
timeoutCondition configuredTimeout =
    case configuredTimeout of
        Nothing -> "AND (? IS NULL)"
        -- 1000000 microseconds are added inside the SQL to avoid a race condition
        -- with the Haskell based timeout mechanism.
        Just _ -> "OR (status = 'job_status_running' AND locked_by IS NOT NULL AND locked_at + ((? + 1000000) || 'microseconds')::interval < NOW())"
4 changes: 2 additions & 2 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId
maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)
Expand All @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

loop

(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action
(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action


pure JobWorkerProcess { runners, subscription, poller, action }
2 changes: 1 addition & 1 deletion IHP/Job/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Job job where
maxAttempts :: (?job :: job) => Int
maxAttempts = 10

timeoutInMicroseconds :: (?job :: job) => Maybe Int
timeoutInMicroseconds :: Maybe Int
timeoutInMicroseconds = Nothing

-- | While jobs are typically fetched using pg_notify, we have to poll the queue table
Expand Down

0 comments on commit ff34aa3

Please sign in to comment.