Skip to content

Commit

Permalink
refactor 'pollQuery' to have a hook to process 'PollDetails'
Browse files Browse the repository at this point in the history
  • Loading branch information
ecthiender committed Jul 16, 2020
1 parent 0dddbe9 commit 54a040e
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 26 deletions.
6 changes: 3 additions & 3 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
HCServe serveOptions -> do
(initCtx, initTime) <- initialiseCtx env hgeCmd rci
let shutdownApp = return ()
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Warp, and is triggered by invoking the 'closeSocket' callback.
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
Expand All @@ -50,7 +50,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully initCtx))
Nothing
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp Nothing

HCExport -> do
(initCtx, _) <- initialiseCtx env hgeCmd rci
Expand All @@ -68,7 +68,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
let sqlGenCtx = SQLGenCtx False
res <- runAsAdmin _icPgPool sqlGenCtx _icHttpManager $ do
schemaCache <- buildRebuildableSchemaCache env
execQuery env queryBs
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runHasSystemDefinedT (SystemDefined False)
& runCacheRWT schemaCache
Expand Down
7 changes: 5 additions & 2 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import Hasura.Session

import qualified Hasura.Tracing as Tracing
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL

data ExitCode
= InvalidEnvironmentVariableOptionsError
Expand Down Expand Up @@ -305,8 +306,9 @@ runHGEServer
-- ^ start time
-> IO ()
-- ^ shutdown function
-> EL.ProcessLiveQueryMetrics
-> m ()
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = do
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp processLQMetrics = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
-- will log lines to STDOUT containing "not in normal form". In the future we
-- could try to integrate this into our tests. For now this is a development
Expand All @@ -323,7 +325,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d

authMode <- either (printErrExit AuthConfigurationError . T.unpack) return authModeRes

_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)

HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $
Expand All @@ -346,6 +348,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d
soLiveQueryOpts
soPlanCacheOptions
soResponseInternalErrorsConfig
processLQMetrics
_icSchemaCache

-- log inconsistent schema objects
Expand Down
25 changes: 16 additions & 9 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
, PollDetails(..)
, BatchExecutionDetails(..)
, CohortExecutionDetails(..)
, ProcessLiveQueryMetrics

-- * Cohorts
, Cohort(..)
Expand Down Expand Up @@ -382,6 +383,8 @@ pollDetailMinimal (PollDetails{..}) =
instance L.ToEngineLog PollDetails L.Hasura where
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)

type ProcessLiveQueryMetrics = Maybe (PollDetails -> IO ())

-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollQuery
Expand All @@ -391,8 +394,9 @@ pollQuery
-> PGExecCtx
-> MultiplexedQuery
-> CohortMap
-> ProcessLiveQueryMetrics
-> IO ()
pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do
pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap processLQMetrics = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do

-- snapshot the current cohorts and split them into batches
Expand Down Expand Up @@ -428,14 +432,17 @@ pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do

pure (snapshotTime, batchesDetails)

L.unLogger logger $ PollDetails
{ _pdPollerId = pollerId
, _pdGeneratedSql = unMultiplexedQuery pgQuery
, _pdSnapshotTime = snapshotTime
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
}
let pollDetails = PollDetails
{ _pdPollerId = pollerId
, _pdGeneratedSql = unMultiplexedQuery pgQuery
, _pdSnapshotTime = snapshotTime
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
}
case processLQMetrics of
Nothing -> L.unLogger logger pollDetails
Just processMetrics -> processMetrics pollDetails
where
LiveQueriesOptions batchSize _ = lqOpts

Expand Down
5 changes: 3 additions & 2 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ addLiveQuery
-> LiveQueryPlan
-> OnChange
-- ^ the action to be executed when result changes
-> ProcessLiveQueryMetrics
-> IO LiveQueryId
addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
addLiveQuery logger subscriberMetadata lqState plan onResultAction processLQMetrics = do
-- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!

-- disposable UUIDs:
Expand Down Expand Up @@ -100,7 +101,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
onJust handlerM $ \handler -> do
pollerId <- PollerId <$> UUID.nextRandom
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
pollQuery logger pollerId lqOpts pgExecCtx query $ _pCohorts handler
pollQuery logger pollerId lqOpts pgExecCtx query (_pCohorts handler) processLQMetrics
sleep $ unRefetchInterval refetchInterval
let !pState = PollerIOState threadRef pollerId
$assertNFHere pState -- so we don't write thunks to mutable vars
Expand Down
30 changes: 22 additions & 8 deletions server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,21 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do
<> "HASURA_GRAPHQL_WS_READ_COOKIE to force read cookie when CORS is disabled."

onStart
:: forall m. (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m, MonadQueryLog m, Tracing.MonadTrace m, MonadExecuteQuery m)
=> Env.Environment -> WSServerEnv -> WSConn -> StartMsg -> m ()
onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
:: forall m.
( HasVersion
, MonadIO m
, E.MonadGQLExecutionCheck m
, MonadQueryLog m
, Tracing.MonadTrace m
, MonadExecuteQuery m
)
=> Env.Environment
-> LQ.ProcessLiveQueryMetrics
-> WSServerEnv
-> WSConn
-> StartMsg
-> m ()
onStart env processLQMetrics serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
timerTot <- startTimer
opM <- liftIO $ STM.atomically $ STMMap.lookup opId opMap

Expand Down Expand Up @@ -372,7 +384,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
]
-- NOTE!: we mask async exceptions higher in the call stack, but it's
-- crucial we don't lose lqId after addLiveQuery returns successfully.
!lqId <- liftIO $ LQ.addLiveQuery logger subscriberMetadata lqMap lqOp liveQOnChange
!lqId <- liftIO $ LQ.addLiveQuery logger subscriberMetadata lqMap lqOp liveQOnChange processLQMetrics
let !opName = _grOperationName q
liftIO $ $assertNFHere $! (lqId, opName) -- so we don't write thunks to mutable vars

Expand Down Expand Up @@ -514,10 +526,11 @@ onMessage
, MonadExecuteQuery m
)
=> Env.Environment
-> LQ.ProcessLiveQueryMetrics
-> AuthMode
-> WSServerEnv
-> WSConn -> BL.ByteString -> m ()
onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" do
onMessage env processLQMetrics authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" do
case J.eitherDecode msgRaw of
Left e -> do
let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e
Expand All @@ -528,7 +541,7 @@ onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" d
CMConnInit params -> onConnInit (_wseLogger serverEnv)
(_wseHManager serverEnv)
wsConn authMode params
CMStart startMsg -> onStart env serverEnv wsConn startMsg
CMStart startMsg -> onStart env processLQMetrics serverEnv wsConn startMsg
CMStop stopMsg -> liftIO $ onStop serverEnv wsConn stopMsg
-- The idea is cleanup will be handled by 'onClose', but...
-- NOTE: we need to close the websocket connection when we receive the
Expand Down Expand Up @@ -699,18 +712,19 @@ createWSServerApp
, MonadExecuteQuery m
)
=> Env.Environment
-> LQ.ProcessLiveQueryMetrics
-> AuthMode
-> WSServerEnv
-> WS.HasuraServerApp m
-- -- ^ aka generalized 'WS.ServerApp'
createWSServerApp env authMode serverEnv = \ !ipAddress !pendingConn ->
createWSServerApp env processLQMetrics authMode serverEnv = \ !ipAddress !pendingConn ->
WS.createServerApp (_wseServer serverEnv) handlers ipAddress pendingConn
where
handlers =
WS.WSHandlers
-- Mask async exceptions during event processing to help maintain integrity of mutable vars:
(\rid rh ip -> mask_ $ onConn (_wseLogger serverEnv) (_wseCorsPolicy serverEnv) rid rh ip)
(\conn bs -> mask_ $ onMessage env authMode serverEnv conn bs)
(\conn bs -> mask_ $ onMessage env processLQMetrics authMode serverEnv conn bs)
(\conn -> mask_ $ onClose (_wseLogger serverEnv) (_wseLiveQMap serverEnv) conn)

stopWSServerApp :: WSServerEnv -> IO ()
Expand Down
8 changes: 6 additions & 2 deletions server/src-lib/Hasura/Server/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import Hasura.SQL.Types

import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.LiveQuery as EL
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
import qualified Hasura.GraphQL.Explain as GE
import qualified Hasura.GraphQL.Transport.HTTP as GH
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
Expand Down Expand Up @@ -578,10 +579,12 @@ mkWaiApp
-> EL.LiveQueriesOptions
-> E.PlanCacheOptions
-> ResponseInternalErrorsConfig
-> EL.ProcessLiveQueryMetrics
-- ^ function to pass to live query poller
-> (RebuildableSchemaCache Run, Maybe UTCTime)
-> m HasuraApp
mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig (schemaCache, cacheBuiltTime) = do
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig processLQMetrics (schemaCache, cacheBuiltTime) = do

(planCache, schemaCacheRef) <- initialiseCache
let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)
Expand Down Expand Up @@ -621,7 +624,8 @@ mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpMana
Spock.spockAsApp $ Spock.spockT lowerIO $
httpApp corsCfg serverCtx enableConsole consoleAssetsDir enableTelemetry

let wsServerApp = WS.createWSServerApp env mode wsServerEnv -- TODO: Lyndon: Can we pass environment through wsServerEnv?
-- TODO: Lyndon: Can we pass environment through wsServerEnv?
let wsServerApp = WS.createWSServerApp env processLQMetrics mode wsServerEnv
stopWSServer = WS.stopWSServerApp wsServerEnv

waiApp <- liftWithStateless $ \lowerIO ->
Expand Down

0 comments on commit 54a040e

Please sign in to comment.