diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index 37717646f69cf..a7bb293f400ae 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -41,7 +41,7 @@ runApp env (HGEOptionsG rci hgeCmd) = HCServe serveOptions -> do (initCtx, initTime) <- initialiseCtx env hgeCmd rci let shutdownApp = return () - -- Catches the SIGTERM signal and initiates a graceful shutdown. + -- Catches the SIGTERM signal and initiates a graceful shutdown. -- Graceful shutdown for regular HTTP requests is already implemented in -- Warp, and is triggered by invoking the 'closeSocket' callback. -- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C @@ -50,7 +50,7 @@ runApp env (HGEOptionsG rci hgeCmd) = Signals.sigTERM (Signals.CatchOnce (shutdownGracefully initCtx)) Nothing - runHGEServer env serveOptions initCtx Nothing initTime shutdownApp + runHGEServer env serveOptions initCtx Nothing initTime shutdownApp Nothing HCExport -> do (initCtx, _) <- initialiseCtx env hgeCmd rci @@ -68,7 +68,7 @@ runApp env (HGEOptionsG rci hgeCmd) = let sqlGenCtx = SQLGenCtx False res <- runAsAdmin _icPgPool sqlGenCtx _icHttpManager $ do schemaCache <- buildRebuildableSchemaCache env - execQuery env queryBs + execQuery env queryBs & Tracing.runTraceTWithReporter Tracing.noReporter "execute" & runHasSystemDefinedT (SystemDefined False) & runCacheRWT schemaCache diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 175f1244f4653..2897707bc78c6 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -73,6 +73,7 @@ import Hasura.Session import qualified Hasura.Tracing as Tracing import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS +import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL data ExitCode = InvalidEnvironmentVariableOptionsError @@ -305,8 +306,9 @@ runHGEServer -- ^ start time -> IO () -- ^ shutdown function + -> Maybe EL.LiveQueryPostPollHook -> m () -runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = do +runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp postPollHook = do -- Comment this to enable expensive assertions from "GHC.AssertNF". These -- will log lines to STDOUT containing "not in normal form". In the future we -- could try to integrate this into our tests. For now this is a development @@ -323,7 +325,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d authMode <- either (printErrExit AuthConfigurationError . T.unpack) return authModeRes - _idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $ + _idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $ ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60) HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $ @@ -346,6 +348,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d soLiveQueryOpts soPlanCacheOptions soResponseInternalErrorsConfig + postPollHook _icSchemaCache -- log inconsistent schema objects diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs index b68a49d28c025..5019168b97b42 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -13,6 +13,8 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll ( , PollDetails(..) , BatchExecutionDetails(..) , CohortExecutionDetails(..) + , LiveQueryPostPollHook + , defaultLiveQueryPostPollHook -- * Cohorts , Cohort(..) @@ -28,6 +30,7 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll ( , newSubscriberId , SubscriberMetadata , mkSubscriberMetadata + , unSubscriberMetadata , SubscriberMap , OnChange , LGQResponse @@ -382,17 +385,23 @@ pollDetailMinimal (PollDetails{..}) = instance L.ToEngineLog PollDetails L.Hasura where toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl) +type LiveQueryPostPollHook = PollDetails -> IO () + +-- the default LiveQueryPostPollHook +defaultLiveQueryPostPollHook :: L.Logger L.Hasura -> LiveQueryPostPollHook +defaultLiveQueryPostPollHook logger pd = L.unLogger logger pd + -- | Where the magic happens: the top-level action run periodically by each -- active 'Poller'. This needs to be async exception safe. pollQuery - :: L.Logger L.Hasura - -> PollerId + :: PollerId -> LiveQueriesOptions -> PGExecCtx -> MultiplexedQuery -> CohortMap + -> LiveQueryPostPollHook -> IO () -pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do +pollQuery pollerId lqOpts pgExecCtx pgQuery cohortMap postPollHook = do (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do -- snapshot the current cohorts and split them into batches @@ -428,14 +437,15 @@ pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do pure (snapshotTime, batchesDetails) - L.unLogger logger $ PollDetails - { _pdPollerId = pollerId - , _pdGeneratedSql = unMultiplexedQuery pgQuery - , _pdSnapshotTime = snapshotTime - , _pdBatches = batchesDetails - , _pdLiveQueryOptions = lqOpts - , _pdTotalTime = totalTime - } + let pollDetails = PollDetails + { _pdPollerId = pollerId + , _pdGeneratedSql = unMultiplexedQuery pgQuery + , _pdSnapshotTime = snapshotTime + , _pdBatches = batchesDetails + , _pdLiveQueryOptions = lqOpts + , _pdTotalTime = totalTime + } + postPollHook pollDetails where LiveQueriesOptions batchSize _ = lqOpts diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs index c01f297dda4b4..d9b9a7d661ff3 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs @@ -7,6 +7,7 @@ module Hasura.GraphQL.Execute.LiveQuery.State , dumpLiveQueriesState , LiveQueryId + , LiveQueryPostPollHook , addLiveQuery , removeLiveQuery ) where @@ -41,13 +42,16 @@ data LiveQueriesState { _lqsOptions :: !LiveQueriesOptions , _lqsPGExecTx :: !PGExecCtx , _lqsLiveQueryMap :: !PollerMap + , _lqsPostPollHook :: !LiveQueryPostPollHook + -- ^ A hook function which is run after each fetch cycle } -initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> IO LiveQueriesState -initLiveQueriesState options pgCtx = LiveQueriesState options pgCtx <$> STMMap.newIO +initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> LiveQueryPostPollHook -> IO LiveQueriesState +initLiveQueriesState options pgCtx pollHook = + LiveQueriesState options pgCtx <$> STMMap.newIO <*> pure pollHook dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value -dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap) = do +dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap _) = do lqMapJ <- dumpPollerMap extended lqMap return $ J.object [ "options" J..= opts @@ -100,7 +104,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do onJust handlerM $ \handler -> do pollerId <- PollerId <$> UUID.nextRandom threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do - pollQuery logger pollerId lqOpts pgExecCtx query $ _pCohorts handler + pollQuery pollerId lqOpts pgExecCtx query (_pCohorts handler) postPollHook sleep $ unRefetchInterval refetchInterval let !pState = PollerIOState threadRef pollerId $assertNFHere pState -- so we don't write thunks to mutable vars @@ -108,7 +112,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do pure $ LiveQueryId handlerId cohortKey subscriberId where - LiveQueriesState lqOpts pgExecCtx lqMap = lqState + LiveQueriesState lqOpts pgExecCtx lqMap postPollHook = lqState LiveQueriesOptions _ refetchInterval = lqOpts LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 44f83d441ace0..eeaf485cdd8ca 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -62,6 +62,7 @@ import Hasura.SQL.Types import qualified Hasura.GraphQL.Execute as E import qualified Hasura.GraphQL.Execute.LiveQuery as EL +import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL import qualified Hasura.GraphQL.Explain as GE import qualified Hasura.GraphQL.Transport.HTTP as GH import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH @@ -578,18 +579,20 @@ mkWaiApp -> EL.LiveQueriesOptions -> E.PlanCacheOptions -> ResponseInternalErrorsConfig + -> Maybe EL.LiveQueryPostPollHook -> (RebuildableSchemaCache Run, Maybe UTCTime) -> m HasuraApp mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir - enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig (schemaCache, cacheBuiltTime) = do + enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig liveQueryHook (schemaCache, cacheBuiltTime) = do (planCache, schemaCacheRef) <- initialiseCache let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef) let corsPolicy = mkDefaultCorsPolicy corsCfg pgExecCtx = fromMaybe (mkPGExecCtx isoLevel pool) pgExecCtxCustom + postPollHook = fromMaybe (EL.defaultLiveQueryPostPollHook logger) liveQueryHook - lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx + lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx postPollHook wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager corsPolicy sqlGenCtx enableAL planCache