Skip to content

Commit

Permalink
Added maxConcurrency option for throttling job worker concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
mpscholten committed Oct 15, 2021
1 parent b56c0ea commit cac85fd
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 34 deletions.
66 changes: 34 additions & 32 deletions IHP/Job/Runner.hs
Expand Up @@ -18,6 +18,7 @@ import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified Control.Concurrent.Async.Pool as Pool

runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = do
Expand All @@ -36,19 +37,19 @@ runJobWorkers jobWorkers = do
modifyIORef exitSignalsCount ((+) 1)
allJobs' <- readIORef allJobs
allJobsCompleted <- allJobs'
|> mapM Async.poll
|> mapM Pool.poll
>>= pure . filter isNothing
>>= pure . null
if allJobsCompleted
then Concurrent.throwTo threadId Exit.ExitSuccess
else if exitSignalsCount' == 0
then do
putStrLn "Waiting for jobs to complete. CTRL+C again to force exit"
forEach allJobs' Async.wait
forEach allJobs' Pool.wait
Concurrent.throwTo threadId Exit.ExitSuccess
else if exitSignalsCount' == 1 then do
putStrLn "Canceling all running jobs. CTRL+C again to force exit"
forEach allJobs' Async.cancel
forEach allJobs' Pool.cancel
Concurrent.throwTo threadId Exit.ExitSuccess
else Concurrent.throwTo threadId Exit.ExitSuccess

Expand Down Expand Up @@ -107,32 +108,33 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
let ?context = frameworkConfig
let ?modelContext = modelContext

-- This loop schedules all jobs that are in the queue.
-- It will be initally be called when first starting up this job worker
-- and after that it will be called when something has been inserted into the queue (or changed to retry)
let startLoop = do
asyncJob <- Async.async do
Exception.mask $ \restore -> do
maybeJob <- Queue.fetchNextJob @job workerId
case maybeJob of
Just job -> do
putStrLn ("Starting job: " <> tshow job)
let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.try (Timeout.timeout timeout $ restore (perform job))
case resultOrException of
Left exception -> Queue.jobDidFail job exception
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

startLoop
Nothing -> pure ()
modifyIORef allJobs (asyncJob:)

-- Start all jobs in the queue
startLoop

-- Start a job when a new job is added to the table or when it's set to retry
watcher <- Queue.watchForJob (tableName @job) (queuePollInterval @job) startLoop

pure watcher
Pool.withTaskGroup (maxConcurrency @job) \taskGroup -> do
-- This loop schedules all jobs that are in the queue.
-- It will be initally be called when first starting up this job worker
-- and after that it will be called when something has been inserted into the queue (or changed to retry)
let startLoop = do
asyncJob <- Pool.async taskGroup do
Exception.mask $ \restore -> do
maybeJob <- Queue.fetchNextJob @job workerId
case maybeJob of
Just job -> do
putStrLn ("Starting job: " <> tshow job)
let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.try (Timeout.timeout timeout $ restore (perform job))
case resultOrException of
Left exception -> Queue.jobDidFail job exception
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

startLoop
Nothing -> pure ()
modifyIORef allJobs (asyncJob:)

-- Start all jobs in the queue
startLoop

-- Start a job when a new job is added to the table or when it's set to retry
watcher <- Queue.watchForJob (tableName @job) (queuePollInterval @job) startLoop

pure watcher
13 changes: 11 additions & 2 deletions IHP/Job/Types.hs
Expand Up @@ -11,6 +11,7 @@ where
import IHP.Prelude
import IHP.FrameworkConfig
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Pool as Pool

class Job job where
perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()
Expand All @@ -28,14 +29,22 @@ class Job job where
queuePollInterval :: Int
queuePollInterval = 60 * 1000000

-- | How many jobs of this type can be executed at the same time
--
-- This limit only applies to the running haskell process. If you run @N@ multiple
-- independent processes of the job runner, the limit will be @N * maxConcurrency@
maxConcurrency :: Int
maxConcurrency = 16

class Worker application where
workers :: application -> [JobWorker]

data JobWorkerArgs = JobWorkerArgs
{ allJobs :: IORef [Async.Async ()]
{ allJobs :: IORef [Pool.Async ()]
, workerId :: UUID
, modelContext :: ModelContext
, frameworkConfig :: FrameworkConfig }
, frameworkConfig :: FrameworkConfig
}

newtype JobWorker = JobWorker (JobWorkerArgs -> IO (Async.Async ()))

Expand Down
1 change: 1 addition & 0 deletions ihp.cabal
Expand Up @@ -98,6 +98,7 @@ common shared-properties
, wai-cors
, lens
, random
, async-pool
default-extensions:
OverloadedStrings
, NoImplicitPrelude
Expand Down
2 changes: 2 additions & 0 deletions ihp.nix
Expand Up @@ -59,6 +59,7 @@
, wai-cors
, lens
, random
, async-pool
}:
mkDerivation {
pname = "ihp";
Expand Down Expand Up @@ -124,6 +125,7 @@ mkDerivation {
wai-cors
lens
random
async-pool
];
license = lib.licenses.mit;
postInstall = ''
Expand Down

0 comments on commit cac85fd

Please sign in to comment.