Skip to content

Commit

Permalink
server: metadata storage abstraction for scheduled triggers
Browse files Browse the repository at this point in the history
An incremental PR towards #5797

* metadata storage abstraction for scheduled triggers

Co-authored-by: rakeshkky <12475069+rakeshkky@users.noreply.github.com>
Co-authored-by: Rakesh Emmadi <12475069+rakeshkky@users.noreply.github.com>
Co-authored-by: Auke Booij <auke@hasura.io>
GITHUB_PR_NUMBER: 6131
GITHUB_PR_URL: #6131

* update pro server code

Co-authored-by: rakeshkky <12475069+rakeshkky@users.noreply.github.com>
Co-authored-by: Auke Booij <auke@hasura.io>
GitOrigin-RevId: 17244a4
  • Loading branch information
3 people committed Nov 25, 2020
1 parent 6fe5daa commit 29925eb
Show file tree
Hide file tree
Showing 25 changed files with 872 additions and 623 deletions.
3 changes: 3 additions & 0 deletions server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ library
, Data.Time.Clock.Units
, Data.URL.Template
, Hasura.App
, Hasura.Metadata.Class

, Hasura.Backends.Postgres.Connection
, Hasura.Backends.Postgres.Execute.Mutation
Expand Down Expand Up @@ -334,6 +335,7 @@ library
, Hasura.Server.Logging
, Hasura.Server.Migrate
, Hasura.Server.Compression
, Hasura.Server.Types
, Hasura.Server.API.PGDump
, Hasura.Prelude

Expand Down Expand Up @@ -476,6 +478,7 @@ library
, Hasura.Eventing.HTTP
, Hasura.Eventing.EventTrigger
, Hasura.Eventing.ScheduledTrigger
, Hasura.Eventing.ScheduledTrigger.Types
, Hasura.Eventing.Common
, Data.GADT.Compare.Extended
, Data.Tuple.Extended
Expand Down
7 changes: 4 additions & 3 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ main = do
tryExit $ do
args <- parseArgs
env <- Env.getEnvironment
unAppM (runApp env args)
runApp env args
where
tryExit io = try io >>= \case
Left (ExitException _code msg) -> BC.putStrLn msg >> Sys.exitFailure
Right r -> return r

runApp :: Env.Environment -> HGEOptions Hasura -> AppM ()
runApp :: Env.Environment -> HGEOptions Hasura -> IO ()
runApp env (HGEOptionsG rci hgeCmd) = do
initTime <- liftIO getCurrentTime
globalCtx@GlobalCtx{..} <- initGlobalCtx rci
Expand Down Expand Up @@ -69,7 +69,8 @@ runApp env (HGEOptionsG rci hgeCmd) = do
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully $ _scShutdownLatch serveCtx))
Nothing
runHGEServer env serveOptions serveCtx Nothing initTime shutdownApp Nothing ekgStore
flip runPGMetadataStorageApp (_scPgPool serveCtx) $
runHGEServer env serveOptions serveCtx Nothing initTime shutdownApp Nothing ekgStore

HCExport -> do
res <- runTxWithMinimalPool _gcConnInfo fetchMetadataFromHdbTables
Expand Down
107 changes: 66 additions & 41 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Control.Exception (throwIO)
import Control.Monad.Base
import Control.Monad.Catch (Exception, MonadCatch, MonadMask,
MonadThrow, onException)
import Control.Monad.Morph (hoist)
import Control.Monad.Stateless
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Control (MonadBaseControl (..))
Expand Down Expand Up @@ -54,15 +55,10 @@ import Hasura.GraphQL.Logging (MonadQueryLog (..),
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..))
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.Types (CacheRWM, Code (..), HasHttpManager,
HasSQLGenCtx, HasSystemDefined,
QErr (..), SQLGenCtx (..),
SchemaCache (..), UserInfoM,
buildSchemaCacheStrict, decodeValue,
throw400, withPathK)

import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.API.Query (requiresAdmin, runQueryM)
import Hasura.Server.App
Expand All @@ -73,6 +69,7 @@ import Hasura.Server.Logging
import Hasura.Server.Migrate (migrateCatalog)
import Hasura.Server.SchemaUpdate
import Hasura.Server.Telemetry
import Hasura.Server.Types
import Hasura.Server.Version
import Hasura.Session

Expand Down Expand Up @@ -193,12 +190,14 @@ data Loggers
, _lsPgLogger :: !Q.PGLogger
}

newtype AppM a = AppM { unAppM :: IO a }
-- | An application with Postgres database as a metadata storage
newtype PGMetadataStorageApp a
= PGMetadataStorageApp {runPGMetadataStorageApp :: Q.PGPool -> IO a}
deriving ( Functor, Applicative, Monad
, MonadIO, MonadBase IO
, MonadBaseControl IO
, MonadIO, MonadBase IO, MonadBaseControl IO
, MonadCatch, MonadThrow, MonadMask
)
, MonadUnique, MonadReader Q.PGPool
) via (ReaderT Q.PGPool IO)

-- | Initializes or migrates the catalog and returns the context required to start the server.
initialiseServeCtx
Expand Down Expand Up @@ -310,6 +309,7 @@ runHGEServer
, MonadExecuteQuery m
, Tracing.HasReporter m
, MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> ServeOptions impl
Expand Down Expand Up @@ -402,15 +402,15 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
asyncActionsProcessor env logger (_scrCache cacheRef) _scPgPool _scHttpManager

-- start a background thread to create new cron events
cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
runCronEventsGenerator logger _scPgPool (getSCFromRef cacheRef)
cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $
runCronEventsGenerator logger (getSCFromRef cacheRef)

-- prepare scheduled triggers
prepareScheduledEvents _scPgPool logger
prepareScheduledEvents logger

-- start a background thread to deliver the scheduled events
scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $
processScheduledTriggers env logger logEnvHeaders _scHttpManager _scPgPool (getSCFromRef cacheRef) lockedEventsCtx
processScheduledTriggers env logger logEnvHeaders _scHttpManager (getSCFromRef cacheRef) lockedEventsCtx

-- start a background thread to check for updates
updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
Expand Down Expand Up @@ -443,10 +443,14 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
unLogger logger $
mkGenericLog LevelInfo "server" $ StartupTimeInfo "starting API server" apiInitTime

shutdownHandler' <- liftWithStateless $ \lowerIO ->
pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $
\a b -> hoist lowerIO $ unlockScheduledEvents a b
let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler (shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool)
. Warp.setInstallShutdownHandler shutdownHandler'
$ Warp.defaultSettings
liftIO $ Warp.runSettings warpSettings app

Expand All @@ -464,10 +468,10 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
-- There is another hasura instance which is processing events and
-- it will lock events to process them.
-- So, unlocking all the locked events might re-deliver an event(due to #2).
prepareScheduledEvents pool (Logger logger) = do
prepareScheduledEvents (Logger logger) = do
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data"
res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents
onLeft res (printErrJExit EventSubSystemError)
res <- runMetadataStorageT unlockAllLockedScheduledEvents
onLeft res $ printErrJExit EventSubSystemError

-- | shutdownEvents will be triggered when a graceful shutdown has been inititiated, it will
-- get the locked events from the event engine context and the scheduled event engine context
Expand All @@ -477,28 +481,30 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
-- and will be processed when the events are proccessed next time.
shutdownEvents
:: Q.PGPool
-> (ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT IO Int)
-> Logger Hasura
-> LockedEventsCtx
-> IO ()
shutdownEvents pool hasuraLogger@(Logger logger) LockedEventsCtx {..} = do
shutdownEvents pool unlockScheduledEvents' hasuraLogger@(Logger logger) LockedEventsCtx {..} = do
liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "unlocking events that are locked by the HGE"
unlockEventsForShutdown pool hasuraLogger "event_triggers" "" unlockEvents leEvents
let unlockEvents' =
liftEitherM . liftIO . runTx pool (Q.ReadCommitted, Nothing) . unlockEvents
unlockEventsForShutdown hasuraLogger "event_triggers" "" unlockEvents' leEvents
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "unlocking scheduled events that are locked by the HGE"
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "cron events" unlockCronEvents leCronEvents
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leOneOffEvents
unlockEventsForShutdown hasuraLogger "scheduled_triggers" "cron events" (unlockScheduledEvents' Cron) leCronEvents
unlockEventsForShutdown hasuraLogger "scheduled_triggers" "scheduled events" (unlockScheduledEvents' OneOff) leOneOffEvents

unlockEventsForShutdown
:: Q.PGPool
-> Logger Hasura
:: Logger Hasura
-> Text -- ^ trigger type
-> Text -- ^ event type
-> ([eventId] -> Q.TxE QErr Int)
-> ([eventId] -> MetadataStorageT IO Int)
-> TVar (Set.Set eventId)
-> IO ()
unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do
unlockEventsForShutdown (Logger logger) triggerType eventType doUnlock lockedIdsVar = do
lockedIds <- readTVarIO lockedIdsVar
unless (Set.null lockedIds) $ do
result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds)
result <- runMetadataStorageT $ doUnlock $ toList lockedIds
case result of
Left err -> logger $ mkGenericStrLog LevelWarn triggerType $
"Error while unlocking " ++ T.unpack eventType ++ " events: " ++ show err
Expand All @@ -520,14 +526,15 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
-- ^ the stop websocket server function
-> LockedEventsCtx
-> Q.PGPool
-> (ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT IO Int)
-> IO ()
-- ^ the closeSocket callback
-> IO ()
shutdownHandler (Loggers loggerCtx (Logger logger) _) immortalThreads stopWsServer leCtx pool closeSocket =
shutdownHandler (Loggers loggerCtx (Logger logger) _) immortalThreads stopWsServer leCtx pool unlockScheduledEvents' closeSocket =
LA.link =<< LA.async do
waitForShutdown _scShutdownLatch
logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server"
shutdownEvents pool (Logger logger) leCtx
shutdownEvents pool unlockScheduledEvents' (Logger logger) leCtx
closeSocket
stopWsServer
-- kill all the background immortal threads
Expand Down Expand Up @@ -626,12 +633,12 @@ execQuery env queryBs = do
buildSchemaCacheStrict
encJToLBS <$> runQueryM env query

instance Tracing.HasReporter AppM
instance Tracing.HasReporter PGMetadataStorageApp

instance MonadQueryInstrumentation AppM where
instance MonadQueryInstrumentation PGMetadataStorageApp where
askInstrumentQuery _ = pure (id, noProfile)

instance HttpLog AppM where
instance HttpLog PGMetadataStorageApp where
logHttpError logger userInfoM reqId waiReq req qErr headers =
unLogger logger $ mkHttpLog $
mkHttpErrorLogContext userInfoM reqId waiReq req qErr Nothing Nothing headers
Expand All @@ -640,42 +647,60 @@ instance HttpLog AppM where
unLogger logger $ mkHttpLog $
mkHttpAccessLogContext userInfoM reqId waiReq compressedResponse qTime cType headers

instance MonadExecuteQuery AppM where
instance MonadExecuteQuery PGMetadataStorageApp where
cacheLookup _ _ = pure ([], Nothing)
cacheStore _ _ = pure ()

instance UserAuthentication (Tracing.TraceT AppM) where
instance UserAuthentication (Tracing.TraceT PGMetadataStorageApp) where
resolveUserInfo logger manager headers authMode =
runExceptT $ getUserInfoWithExpTime logger manager headers authMode

instance MetadataApiAuthorization AppM where
instance MetadataApiAuthorization PGMetadataStorageApp where
authorizeMetadataApi query userInfo = do
let currRole = _uiRole userInfo
when (requiresAdmin query && currRole /= adminRoleName) $
withPathK "args" $ throw400 AccessDenied errMsg
where
errMsg = "restricted access : admin only"

instance ConsoleRenderer AppM where
instance ConsoleRenderer PGMetadataStorageApp where
renderConsole path authMode enableTelemetry consoleAssetsDir =
return $ mkConsoleHTML path authMode enableTelemetry consoleAssetsDir

instance MonadGQLExecutionCheck AppM where
instance MonadGQLExecutionCheck PGMetadataStorageApp where
checkGQLExecution userInfo _ enableAL sc query = runExceptT $ do
req <- toParsed query
checkQueryInAllowlist enableAL userInfo req sc
return req

instance MonadConfigApiHandler AppM where
instance MonadConfigApiHandler PGMetadataStorageApp where
runConfigApiHandler = configApiGetHandler

instance MonadQueryLog AppM where
instance MonadQueryLog PGMetadataStorageApp where
logQueryLog logger query genSqlM reqId =
unLogger logger $ QueryLog query genSqlM reqId

instance WS.MonadWSLog AppM where
instance WS.MonadWSLog PGMetadataStorageApp where
logWSLog = unLogger

runInSeparateTx :: Q.TxE QErr a -> MetadataStorageT PGMetadataStorageApp a
runInSeparateTx tx = do
pool <- lift ask
liftEitherM $ liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Nothing) tx

-- | Each of the function in the type class is executed in a totally separate transaction.
--
-- To learn more about why the instance is derived as following, see Note [Generic MetadataStorageT transformer]
instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where

getDeprivedCronTriggerStats = runInSeparateTx getDeprivedCronTriggerStatsTx
getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx
insertScheduledEvent = runInSeparateTx . insertScheduledEventTx
insertScheduledEventInvocation a b = runInSeparateTx $ insertInvocationTx a b
setScheduledEventOp a b c = runInSeparateTx $ setScheduledEventOpTx a b c
unlockScheduledEvents a b = runInSeparateTx $ unlockScheduledEventsTx a b
unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx

--- helper functions ---

mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either String Text
Expand Down
3 changes: 3 additions & 0 deletions server/src-lib/Hasura/Backends/Postgres/SQL/DML.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ instance ToSQL Qual where
mkQIdentifier :: (IsIdentifier a, IsIdentifier b) => a -> b -> QIdentifier
mkQIdentifier q t = QIdentifier (QualifiedIdentifier (toIdentifier q) Nothing) (toIdentifier t)

mkQIdentifierTable :: (IsIdentifier a) => QualifiedTable -> a -> QIdentifier
mkQIdentifierTable q = QIdentifier (mkQual q) . toIdentifier

data QIdentifier
= QIdentifier !Qual !Identifier
deriving (Show, Eq, Generic, Data)
Expand Down
5 changes: 3 additions & 2 deletions server/src-lib/Hasura/Eventing/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ initLockedEventsCtx = do
-- | After the events are fetched from the DB, we store the locked events
-- in a hash set(order doesn't matter and look ups are faster) in the
-- event engine context
saveLockedEvents :: (MonadIO m) => [Text] -> TVar (Set.Set Text) -> m ()
saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m ()
saveLockedEvents eventIds lockedEvents =
liftIO $ atomically $ do
lockedEventsVals <- readTVar lockedEvents
writeTVar lockedEvents $!
Set.union lockedEventsVals $ Set.fromList eventIds

-- | Remove an event from the 'LockedEventsCtx' after it has been processed
removeEventFromLockedEvents :: MonadIO m => Text -> TVar (Set.Set Text) -> m ()
removeEventFromLockedEvents
:: MonadIO m => EventId -> TVar (Set.Set EventId) -> m ()
removeEventFromLockedEvents eventId lockedEvents =
liftIO $ atomically $ do
lockedEventsVals <- readTVar lockedEvents
Expand Down
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/Eventing/EventTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ toInt64 = fromIntegral
newtype EventIdArray = EventIdArray { unEventIdArray :: [EventId]} deriving (Show, Eq)

instance Q.ToPrepArg EventIdArray where
toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l
toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unEventId l
where
-- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html
encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict)
Expand Down
Loading

0 comments on commit 29925eb

Please sign in to comment.