Skip to content

Commit

Permalink
server: refactor 'pollQuery' to have a hook to process 'PollDetails' (#…
Browse files Browse the repository at this point in the history
…5391)

Co-authored-by: Vamshi Surabhi <0x777@users.noreply.github.com>
  • Loading branch information
ecthiender and 0x777 committed Jul 16, 2020
1 parent d6dab9d commit 1eb36bb
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 23 deletions.
6 changes: 3 additions & 3 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
HCServe serveOptions -> do
(initCtx, initTime) <- initialiseCtx env hgeCmd rci
let shutdownApp = return ()
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Warp, and is triggered by invoking the 'closeSocket' callback.
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
Expand All @@ -50,7 +50,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully initCtx))
Nothing
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp Nothing

HCExport -> do
(initCtx, _) <- initialiseCtx env hgeCmd rci
Expand All @@ -68,7 +68,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
let sqlGenCtx = SQLGenCtx False
res <- runAsAdmin _icPgPool sqlGenCtx _icHttpManager $ do
schemaCache <- buildRebuildableSchemaCache env
execQuery env queryBs
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runHasSystemDefinedT (SystemDefined False)
& runCacheRWT schemaCache
Expand Down
7 changes: 5 additions & 2 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import Hasura.Session

import qualified Hasura.Tracing as Tracing
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL

data ExitCode
= InvalidEnvironmentVariableOptionsError
Expand Down Expand Up @@ -305,8 +306,9 @@ runHGEServer
-- ^ start time
-> IO ()
-- ^ shutdown function
-> Maybe EL.LiveQueryPostPollHook
-> m ()
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = do
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp postPollHook = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
-- will log lines to STDOUT containing "not in normal form". In the future we
-- could try to integrate this into our tests. For now this is a development
Expand All @@ -323,7 +325,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d

authMode <- either (printErrExit AuthConfigurationError . T.unpack) return authModeRes

_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)

HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $
Expand All @@ -346,6 +348,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d
soLiveQueryOpts
soPlanCacheOptions
soResponseInternalErrorsConfig
postPollHook
_icSchemaCache

-- log inconsistent schema objects
Expand Down
32 changes: 21 additions & 11 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
, PollDetails(..)
, BatchExecutionDetails(..)
, CohortExecutionDetails(..)
, LiveQueryPostPollHook
, defaultLiveQueryPostPollHook

-- * Cohorts
, Cohort(..)
Expand All @@ -28,6 +30,7 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
, newSubscriberId
, SubscriberMetadata
, mkSubscriberMetadata
, unSubscriberMetadata
, SubscriberMap
, OnChange
, LGQResponse
Expand Down Expand Up @@ -382,17 +385,23 @@ pollDetailMinimal (PollDetails{..}) =
instance L.ToEngineLog PollDetails L.Hasura where
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)

type LiveQueryPostPollHook = PollDetails -> IO ()

-- the default LiveQueryPostPollHook
defaultLiveQueryPostPollHook :: L.Logger L.Hasura -> LiveQueryPostPollHook
defaultLiveQueryPostPollHook logger pd = L.unLogger logger pd

-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollQuery
:: L.Logger L.Hasura
-> PollerId
:: PollerId
-> LiveQueriesOptions
-> PGExecCtx
-> MultiplexedQuery
-> CohortMap
-> LiveQueryPostPollHook
-> IO ()
pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do
pollQuery pollerId lqOpts pgExecCtx pgQuery cohortMap postPollHook = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do

-- snapshot the current cohorts and split them into batches
Expand Down Expand Up @@ -428,14 +437,15 @@ pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do

pure (snapshotTime, batchesDetails)

L.unLogger logger $ PollDetails
{ _pdPollerId = pollerId
, _pdGeneratedSql = unMultiplexedQuery pgQuery
, _pdSnapshotTime = snapshotTime
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
}
let pollDetails = PollDetails
{ _pdPollerId = pollerId
, _pdGeneratedSql = unMultiplexedQuery pgQuery
, _pdSnapshotTime = snapshotTime
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
}
postPollHook pollDetails
where
LiveQueriesOptions batchSize _ = lqOpts

Expand Down
14 changes: 9 additions & 5 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Hasura.GraphQL.Execute.LiveQuery.State
, dumpLiveQueriesState

, LiveQueryId
, LiveQueryPostPollHook
, addLiveQuery
, removeLiveQuery
) where
Expand Down Expand Up @@ -41,13 +42,16 @@ data LiveQueriesState
{ _lqsOptions :: !LiveQueriesOptions
, _lqsPGExecTx :: !PGExecCtx
, _lqsLiveQueryMap :: !PollerMap
, _lqsPostPollHook :: !LiveQueryPostPollHook
-- ^ A hook function which is run after each fetch cycle
}

initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> IO LiveQueriesState
initLiveQueriesState options pgCtx = LiveQueriesState options pgCtx <$> STMMap.newIO
initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> LiveQueryPostPollHook -> IO LiveQueriesState
initLiveQueriesState options pgCtx pollHook =
LiveQueriesState options pgCtx <$> STMMap.newIO <*> pure pollHook

dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap) = do
dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap _) = do
lqMapJ <- dumpPollerMap extended lqMap
return $ J.object
[ "options" J..= opts
Expand Down Expand Up @@ -100,15 +104,15 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
onJust handlerM $ \handler -> do
pollerId <- PollerId <$> UUID.nextRandom
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
pollQuery logger pollerId lqOpts pgExecCtx query $ _pCohorts handler
pollQuery pollerId lqOpts pgExecCtx query (_pCohorts handler) postPollHook
sleep $ unRefetchInterval refetchInterval
let !pState = PollerIOState threadRef pollerId
$assertNFHere pState -- so we don't write thunks to mutable vars
STM.atomically $ STM.putTMVar (_pIOState handler) pState

pure $ LiveQueryId handlerId cohortKey subscriberId
where
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
LiveQueriesState lqOpts pgExecCtx lqMap postPollHook = lqState
LiveQueriesOptions _ refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan

Expand Down
7 changes: 5 additions & 2 deletions server/src-lib/Hasura/Server/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import Hasura.SQL.Types

import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.LiveQuery as EL
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
import qualified Hasura.GraphQL.Explain as GE
import qualified Hasura.GraphQL.Transport.HTTP as GH
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
Expand Down Expand Up @@ -578,18 +579,20 @@ mkWaiApp
-> EL.LiveQueriesOptions
-> E.PlanCacheOptions
-> ResponseInternalErrorsConfig
-> Maybe EL.LiveQueryPostPollHook
-> (RebuildableSchemaCache Run, Maybe UTCTime)
-> m HasuraApp
mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig (schemaCache, cacheBuiltTime) = do
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig liveQueryHook (schemaCache, cacheBuiltTime) = do

(planCache, schemaCacheRef) <- initialiseCache
let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)

let corsPolicy = mkDefaultCorsPolicy corsCfg
pgExecCtx = fromMaybe (mkPGExecCtx isoLevel pool) pgExecCtxCustom
postPollHook = fromMaybe (EL.defaultLiveQueryPostPollHook logger) liveQueryHook

lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx
lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx postPollHook
wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager
corsPolicy sqlGenCtx enableAL planCache

Expand Down

0 comments on commit 1eb36bb

Please sign in to comment.