Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ runMain options = do
maybe (return ()) (\threadState -> Logic.enqueueEvent (projectThreadQueue threadState) event) $
Map.lookup projectInfo projectThreadState

let tryEnqueueProjectEvent projectInfo event = do
-- Call the corresponding enqueue function if the project exists,
-- otherwise drop the event on the floor.
maybe (return False) (\threadState -> Logic.tryEnqueueEvent (projectThreadQueue threadState) event) $
Map.lookup projectInfo projectThreadState

-- Start a worker thread to put the GitHub webhook events in the right queue.
ghThread <- Async.async $ runStdoutLoggingT $ runGithubEventLoop ghQueue enqueueEvent

Expand Down Expand Up @@ -249,7 +255,7 @@ runMain options = do
-- TODO: Do this in a cleaner way.
infos = getProjectInfo <$> Config.projects config
putStrLn $ "Listening for webhooks on port " ++ show port ++ "."
runServer <- fst <$> buildServer port tlsConfig infos secret ghTryEnqueue getProjectState getOwnerState
runServer <- fst <$> buildServer port tlsConfig infos secret ghTryEnqueue tryEnqueueProjectEvent getProjectState getOwnerState
serverThread <- Async.async runServer
metricsThread <- runMetricsThread config

Expand Down
147 changes: 96 additions & 51 deletions src/Logic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Logic (
RetrieveEnvironment (..),
dequeueEvent,
enqueueEvent,
tryEnqueueEvent,
enqueueStopSignal,
ensureCloned,
handleEvent,
Expand All @@ -36,7 +37,7 @@ module Logic (
)
where

import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue, writeTBQueue)
import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, newTBQueue, readTBQueue, writeTBQueue)
import Control.Concurrent.STM.TMVar (TMVar, newTMVarIO, readTMVar, swapTMVar)
import Control.Exception (assert)
import Control.Monad (foldM, unless, void, when, (>=>))
Expand Down Expand Up @@ -92,9 +93,11 @@ import Project (
MergeCommand (..),
MergeWindow (..),
Priority (..),
ProjectState,
ProjectState (..),
PullRequest,
PullRequestStatus (..),
pause,
resume,
summarize,
supersedes,
)
Expand Down Expand Up @@ -346,6 +349,8 @@ data Event
BuildStatusChanged Sha Context BuildStatus
| Synchronize
| ClockTick UTCTime
| Pause
| Resume
deriving (Eq, Show)

type EventQueue = TBQueue (Maybe Event)
Expand All @@ -359,6 +364,17 @@ newEventQueue capacity = atomically $ newTBQueue capacity
enqueueEvent :: EventQueue -> Event -> IO ()
enqueueEvent queue event = atomically $ writeTBQueue queue $ Just event

-- Enqueues the event if the queue is not full. Returns whether the event has
-- been enqueued. This function does not block.
tryEnqueueEvent :: EventQueue -> Event -> IO Bool
tryEnqueueEvent queue event = atomically $ do
isFull <- isFullTBQueue queue
if isFull
then return False
else -- Normally writeTBQueue would block if the queue is full, but at this point
-- we know that the queue is not full, so it will return immediately.
writeTBQueue queue (Just event) >> return True

-- Signals the event loop to stop after processing all events
-- currently in the queue.
enqueueStopSignal :: EventQueue -> IO ()
Expand Down Expand Up @@ -417,6 +433,8 @@ handleEventInternal triggerConfig mergeWindowExemption featureFreezeWindow timeo
PushPerformed branch sha -> handleTargetChanged branch sha
Synchronize -> synchronizeState
ClockTick currTime -> handleClockTickUpdate timeouts currTime
Pause -> handlePause
Resume -> handleResume timeouts

handlePullRequestOpenedByUser
:: forall es
Expand Down Expand Up @@ -478,54 +496,69 @@ handlePullRequestCommitChanged prId newSha state =

-- | Try to push the final result of a pull request to the target branch.
tryPromotePullRequest :: (Action :> es, RetrieveEnvironment :> es, TimeOperation :> es) => PullRequest -> PullRequestId -> ProjectState -> Eff es ProjectState
tryPromotePullRequest pullRequest prId state = do
pushResult <- case Pr.integrationStatus pullRequest of
-- If we only need to promote, we can just try pushing.
Pr.Promote _ sha -> tryPromote sha
-- If we also want to tag the PR we additionally need to handle the result of promoting and tagging.
-- Specifically, we need to leave a comment about the result of the tag.
Pr.PromoteAndTag _ sha@(Sha shaText) tagName tagMessage -> do
(tagResult, pushResult) <- tryPromoteWithTag sha tagName tagMessage
let
approval = fromJust $ Pr.approval pullRequest
Username approvedBy = approver approval
approvalKind = Pr.approvedFor approval
config <- getProjectConfig
when (pushResult == PushOk) $
leaveComment prId . (<>) ("@" <> approvedBy <> " ") $
case tagResult of
Left err -> "Sorry, I could not tag your PR. " <> err
Right (TagName t) -> do
let link = format "[{}](https://github.com/{}/{}/releases/tag/{})" (t, owner config, repository config, t)
"I tagged your PR with "
<> link
<> ". "
<> if Pr.needsDeploy approvalKind
then "It is scheduled for autodeploy!"
else Text.concat ["Please wait for the build of ", shaText, " to pass and don't forget to deploy it!"]
pure pushResult
_ -> error ""
case pushResult of
-- If the push worked, then this was the final stage of the pull request.
-- GitHub will mark the pull request as closed, and when we receive that
-- event, we delete the pull request from the state. Until then, reset
-- the integration candidate, so we proceed with the next pull request.
PushOk -> do
cleanupTestBranch prId
registerMergedPR
currTime <- getDateTime
pure $
Pr.updatePullRequests (unspeculateConflictsAfter pullRequest) $
Pr.updatePullRequests (unspeculateFailuresAfter pullRequest) $
Pr.addPromotedPullRequest pullRequest currTime $
Pr.setIntegrationStatus prId Promoted state
-- If something was pushed to the target branch while the candidate was
-- being tested, try to integrate again and hope that next time the push
-- succeeds. We also cancel integrations in the merge train.
-- These should be automatically restarted when we 'proceed'.
PushRejected _why ->
tryIntegratePullRequest prId $
unintegrateAfter prId state
tryPromotePullRequest pullRequest prId state =
let
approval = fromJust $ Pr.approval pullRequest
priority = Pr.approvalPriority approval
in
-- We don't promote during pauses, with the exception for high priority PRs. After resumption,
-- we retry all promotions
if paused state && priority == Normal
then do
config <- getProjectConfig
let message = "Your PR is ready to be merged into " <> Config.branch config <> ", but merging has been paused"
case Pr.lookupPullRequest prId state of
Just pr | not (Pr.pausedMessageSent pr) -> do
leaveComment prId message
return (Pr.updatePullRequest prId (\pr' -> pr'{Pr.pausedMessageSent = True}) state)
_ -> pure state
else do
pushResult <- case Pr.integrationStatus pullRequest of
-- If we only need to promote, we can just try pushing.
Pr.Promote _ sha -> tryPromote sha
-- If we also want to tag the PR we additionally need to handle the result of promoting and tagging.
-- Specifically, we need to leave a comment about the result of the tag.
Pr.PromoteAndTag _ sha@(Sha shaText) tagName tagMessage -> do
(tagResult, pushResult) <- tryPromoteWithTag sha tagName tagMessage
let
Username approvedBy = approver approval
approvalKind = Pr.approvedFor approval
config <- getProjectConfig
when (pushResult == PushOk) $
leaveComment prId . (<>) ("@" <> approvedBy <> " ") $
case tagResult of
Left err -> "Sorry, I could not tag your PR. " <> err
Right (TagName t) -> do
let link = format "[{}](https://github.com/{}/{}/releases/tag/{})" (t, owner config, repository config, t)
"I tagged your PR with "
<> link
<> ". "
<> if Pr.needsDeploy approvalKind
then "It is scheduled for autodeploy!"
else Text.concat ["Please wait for the build of ", shaText, " to pass and don't forget to deploy it!"]
pure pushResult
_ -> error ""
case pushResult of
-- If the push worked, then this was the final stage of the pull request.
-- GitHub will mark the pull request as closed, and when we receive that
-- event, we delete the pull request from the state. Until then, reset
-- the integration candidate, so we proceed with the next pull request.
PushOk -> do
cleanupTestBranch prId
registerMergedPR
currTime <- getDateTime
pure $
Pr.updatePullRequests (unspeculateConflictsAfter pullRequest) $
Pr.updatePullRequests (unspeculateFailuresAfter pullRequest) $
Pr.addPromotedPullRequest pullRequest currTime $
Pr.setIntegrationStatus prId Promoted state
-- If something was pushed to the target branch while the candidate was
-- being tested, try to integrate again and hope that next time the push
-- succeeds. We also cancel integrations in the merge train.
-- These should be automatically restarted when we 'proceed'.
PushRejected _why ->
tryIntegratePullRequest prId $
unintegrateAfter prId state

-- | Describe what caused the PR to close.
prClosingMessage :: PRCloseCause -> Text
Expand Down Expand Up @@ -612,7 +645,10 @@ handleTargetChanged (BaseBranch baseBranch) sha state
handleTargetChanged _ _ state = pure state

handleClockTickUpdate :: (Action :> es, RetrieveEnvironment :> es, TimeOperation :> es) => Timeouts -> UTCTime -> ProjectState -> Eff es ProjectState
handleClockTickUpdate timeouts currTime state = do
handleClockTickUpdate = handleStalePromotions

handleStalePromotions :: (Action :> es, RetrieveEnvironment :> es, TimeOperation :> es) => Timeouts -> UTCTime -> ProjectState -> Eff es ProjectState
handleStalePromotions timeouts currTime state = do
let prsToPromote = Pr.filterPullRequestsBy Pr.awaitingPromotion state
state' <- foldM update state prsToPromote
pure $ Pr.filterRecentlyPromoted ((<) currTime . flip Time.addTime (Config.rememberTimeout timeouts)) state'
Expand All @@ -626,6 +662,15 @@ handleClockTickUpdate timeouts currTime state = do
then tryPromotePullRequest pr prId state'
else pure state'

handlePause :: Action :> es => ProjectState -> Eff es ProjectState
handlePause = pure . pause

-- set paused to false and handle stale promotions
handleResume :: (Action :> es, RetrieveEnvironment :> es, TimeOperation :> es) => Timeouts -> ProjectState -> Eff es ProjectState
handleResume timeouts state = do
currentTime <- getDateTime
handleStalePromotions timeouts currentTime (resume state)

-- Mark the pull request as approved, and leave a comment to acknowledge that.
approvePullRequest :: PullRequestId -> Approval -> ProjectState -> Eff es ProjectState
approvePullRequest pr approval =
Expand Down
12 changes: 12 additions & 0 deletions src/Project.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ module Project (
getQueuePosition,
insertPullRequest,
integrationSha,
pause,
resume,
promotionSha,
promotionTime,
awaitingPromotion,
Expand Down Expand Up @@ -256,6 +258,7 @@ data PullRequest = PullRequest
, integrationStatus :: IntegrationStatus
, integrationAttempts :: [Sha]
, needsFeedback :: Bool
, pausedMessageSent :: Bool
}
deriving (Eq, Show, Generic)

Expand All @@ -264,6 +267,7 @@ data ProjectState = ProjectState
, pullRequestApprovalIndex :: Int
, mandatoryChecks :: MandatoryChecks
, recentlyPromoted :: [PromotedPullRequest]
, paused :: Bool
}
deriving (Eq, Show, Generic)

Expand Down Expand Up @@ -341,6 +345,7 @@ emptyProjectState =
, pullRequestApprovalIndex = 0
, mandatoryChecks = mempty
, recentlyPromoted = []
, paused = False
}

-- Inserts a new pull request into the project, with approval set to Nothing,
Expand All @@ -366,6 +371,7 @@ insertPullRequest (PullRequestId n) prBranch bsBranch prSha prTitle prAuthor sta
, integrationStatus = NotIntegrated
, integrationAttempts = []
, needsFeedback = False
, pausedMessageSent = False
}
in state{pullRequests = IntMap.insert n pullRequest $ pullRequests state}

Expand Down Expand Up @@ -744,3 +750,9 @@ isBuildFailed _ = Nothing
isBuildStarted :: BuildStatus -> Maybe BuildStatus
isBuildStarted s@(BuildStarted _) = Just s
isBuildStarted _ = Nothing

pause :: ProjectState -> ProjectState
pause state = state{paused = True}

resume :: ProjectState -> ProjectState
resume state = updatePullRequests (\pr -> pr{pausedMessageSent = False}) state{paused = False}
50 changes: 45 additions & 5 deletions src/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.WarpTLS qualified as Warp

import Configuration (TlsConfiguration)
import Logic (Event (..))
import Project (Owner, ProjectInfo (ProjectInfo), ProjectState)

import Configuration qualified as Config
import GHC.IO.Encoding ()
import Github qualified
import WebInterface qualified

Expand All @@ -43,17 +45,21 @@ router
:: [ProjectInfo]
-> Text
-> (Github.WebhookEvent -> ActionM ())
-> ActionM ()
-> ActionM ()
-> (ProjectInfo -> Maybe (IO ProjectState))
-> (Owner -> IO [(ProjectInfo, ProjectState)])
-> ScottyM ()
router infos ghSecret serveEnqueueEvent getProjectState getOwnerState = do
router infos ghSecret serveEnqueueEvent servePauseEvent serveResumeEvent getProjectState getOwnerState = do
get "/" $ serveIndex infos
get styleRoute $ serveStyles
post "/hook/github" $ withSignatureCheck ghSecret $ serveGithubWebhook serveEnqueueEvent
get "/hook/github" $ serveWebhookDocs
get "/:owner" $ serveWebInterfaceOwner getOwnerState
get "/:owner/:repo" $ serveWebInterfaceProject getProjectState
get "/api/:owner/:repo" $ serveAPIproject getProjectState
post "/pause/:owner/:repo" $ servePauseEvent
post "/resume/:owner/:repo" $ serveResumeEvent
notFound $ serveNotFound

styleRoute :: RoutePattern
Expand Down Expand Up @@ -220,6 +226,33 @@ serveAPIproject getProjectState = do
setHeader "Content-Type" "application/json; charset=utf-8"
raw $ Aeson.encode state

servePause :: (ProjectInfo -> Event -> IO Bool) -> ActionM ()
servePause enqueueProjectEvent = do
owner <- captureParam "owner"
repo <- captureParam "repo"
let info = ProjectInfo owner repo

enqueueSuccess <- liftIO $ enqueueProjectEvent info Pause

if enqueueSuccess
then text "OK"
else do
status serviceUnavailable503
text "error: action queue is full"

serveResume :: (ProjectInfo -> Event -> IO Bool) -> ActionM ()
serveResume enqueueProjectEvent = do
owner <- captureParam "owner"
repo <- captureParam "repo"
let info = ProjectInfo owner repo
enqueueSuccess <- liftIO $ enqueueProjectEvent info Resume

if enqueueSuccess
then text "OK"
else do
status serviceUnavailable503
text "error: action queue is full"

serveNotFound :: ActionM ()
serveNotFound = do
status notFound404
Expand Down Expand Up @@ -259,10 +292,11 @@ buildServer
-> [ProjectInfo]
-> Text
-> (Github.WebhookEvent -> IO Bool)
-> (ProjectInfo -> Event -> IO Bool)
-> (ProjectInfo -> Maybe (IO ProjectState))
-> (Owner -> IO [(ProjectInfo, ProjectState)])
-> IO (IO (), IO ())
buildServer port tlsConfig infos ghSecret tryEnqueueEvent getProjectState getOwnerState = do
buildServer port tlsConfig infos ghSecret tryEnqueueGithubEvent tryEnqueueProjectEvent getProjectState getOwnerState = do
-- Create a semaphore that will be signalled when the server is ready.
readySem <- atomically $ newTSem 0
let
Expand All @@ -273,13 +307,19 @@ buildServer port tlsConfig infos ghSecret tryEnqueueEvent getProjectState getOwn
-- Make Warp signal the semaphore when it is ready to serve requests.
settings = warpSettings port signalReady

serveEnqueueEvent :: Github.WebhookEvent -> ActionM ()
serveEnqueueEvent = serveTryEnqueueEvent tryEnqueueEvent
serveEnqueueGithubEvent :: Github.WebhookEvent -> ActionM ()
serveEnqueueGithubEvent = serveTryEnqueueEvent tryEnqueueGithubEvent

servePauseEvent :: ActionM ()
servePauseEvent = servePause tryEnqueueProjectEvent

serveResumeEvent :: ActionM ()
serveResumeEvent = serveResume tryEnqueueProjectEvent

-- Build the Scotty app, but do not start serving yet, as that would never
-- return, so we wouldn't have the opportunity to return the 'blockUntilReady'
-- function to the caller.
app <- scottyApp $ router infos ghSecret serveEnqueueEvent getProjectState getOwnerState
app <- scottyApp $ router infos ghSecret serveEnqueueGithubEvent servePauseEvent serveResumeEvent getProjectState getOwnerState
let runServer = runServerMaybeTls tlsConfig settings app

-- Return two IO actions: one that will run the server (and never return),
Expand Down
Loading