Skip to content

Commit

Permalink
Fixed live reloading sometimes broken after running a job in the back…
Browse files Browse the repository at this point in the history
…ground worker
  • Loading branch information
mpscholten committed Oct 29, 2021
1 parent 59ac1c4 commit f269bc9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
3 changes: 2 additions & 1 deletion IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ watchForJob tableName pollInterval handleJob = do
poller <- pollForJob tableName pollInterval handleJob

-- When the watcher is stopped, we also want to stop the poller
Async.link2Only (const True) watcher poller
Async.link watcher
Async.link poller

pure watcher

Expand Down
32 changes: 21 additions & 11 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import qualified System.Timeout as Timeout
import qualified Control.Concurrent.Async.Pool as Pool

runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = do
runJobWorkers jobWorkers = runJobWorkersWithExitHandler jobWorkers waitExitHandler

runJobWorkersKillOnExit :: [JobWorker] -> Script
runJobWorkersKillOnExit jobWorkers = runJobWorkersWithExitHandler jobWorkers stopExitHandler

runJobWorkersWithExitHandler :: [JobWorker] -> (JobWorkerArgs -> IO () -> IO ()) -> Script
runJobWorkersWithExitHandler jobWorkers withExitHandler = do
workerId <- UUID.nextRandom
allJobs <- newIORef []
let oneSecond = 1000000
Expand All @@ -30,8 +36,16 @@ runJobWorkers jobWorkers = do

let jobWorkerArgs = JobWorkerArgs { allJobs, workerId, modelContext = ?modelContext, frameworkConfig = ?context}

withExitHandler jobWorkerArgs do
listenAndRuns <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)

forEach listenAndRuns Async.link

pure ()

waitExitHandler JobWorkerArgs { .. } main = do
threadId <- Concurrent.myThreadId
jobWorkersLoops <- Concurrent.newEmptyMVar
exitSignalsCount <- newIORef 0
let catchHandler = do
exitSignalsCount' <- readIORef exitSignalsCount
Expand All @@ -52,25 +66,20 @@ runJobWorkers jobWorkers = do
putStrLn "Canceling all running jobs. CTRL+C again to force exit"
forEach allJobs' Pool.cancel

loops <- Concurrent.readMVar jobWorkersLoops
forEach loops Async.cancel
Concurrent.throwTo threadId Exit.ExitSuccess
else Concurrent.throwTo threadId Exit.ExitSuccess


Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing

listenAndRuns <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)

Concurrent.putMVar jobWorkersLoops listenAndRuns

Async.waitAnyCancel listenAndRuns
main

pure ()


stopExitHandler JobWorkerArgs { .. } main = main

worker :: forall job.
( job ~ GetModelByTableName (GetTableName job)
, FilterPrimaryKey (GetTableName job)
Expand Down Expand Up @@ -132,14 +141,15 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
putStrLn ("Starting job: " <> tshow job)
let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.try (Timeout.timeout timeout $ restore (perform job))
resultOrException <- Exception.try (Timeout.timeout timeout (restore (perform job)))
case resultOrException of
Left exception -> Queue.jobDidFail job exception
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

startLoop
Nothing -> pure ()
Pool.link asyncJob
modifyIORef allJobs (asyncJob:)

-- Start all jobs in the queue
Expand Down
6 changes: 5 additions & 1 deletion IHP/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import qualified Control.Concurrent.Async as Async
import qualified Data.List as List
import qualified Data.ByteString.Char8 as ByteString
import qualified Network.Wai.Middleware.Cors as Cors
import qualified Control.Exception as Exception

import qualified System.Environment as Env
import qualified System.Directory as Directory
Expand Down Expand Up @@ -69,7 +70,10 @@ withBackgroundWorkers frameworkConfig app = do
let jobWorkers = Job.workers RootApplication
let isDevelopment = get #environment frameworkConfig == Env.Development
if isDevelopment && not (isEmpty jobWorkers)
then Async.withAsync (let ?context = frameworkConfig in Job.runJobWorkers jobWorkers) (\_ -> app)
then do
workerAsync <- async (let ?context = frameworkConfig in Job.runJobWorkersKillOnExit jobWorkers)
Async.link workerAsync
app
else app

-- | Returns a middleware that returns files stored in the app's @static/@ directory and IHP's own @static/@ directory
Expand Down

0 comments on commit f269bc9

Please sign in to comment.