From 29925eb08de0cbe373f90f6ca6a21d19780a486a Mon Sep 17 00:00:00 2001 From: hasura-bot Date: Wed, 25 Nov 2020 11:56:44 +0100 Subject: [PATCH] server: metadata storage abstraction for scheduled triggers An incremental PR towards https://github.com/hasura/graphql-engine/pull/5797 * metadata storage abstraction for scheduled triggers Co-authored-by: rakeshkky <12475069+rakeshkky@users.noreply.github.com> Co-authored-by: Rakesh Emmadi <12475069+rakeshkky@users.noreply.github.com> Co-authored-by: Auke Booij GITHUB_PR_NUMBER: 6131 GITHUB_PR_URL: https://github.com/hasura/graphql-engine/pull/6131 * update pro server code Co-authored-by: rakeshkky <12475069+rakeshkky@users.noreply.github.com> Co-authored-by: Auke Booij GitOrigin-RevId: 17244a47b3e8633acf2492e0b0734b72025f0a09 --- server/graphql-engine.cabal | 3 + server/src-exec/Main.hs | 7 +- server/src-lib/Hasura/App.hs | 107 ++- .../Hasura/Backends/Postgres/SQL/DML.hs | 3 + server/src-lib/Hasura/Eventing/Common.hs | 5 +- .../src-lib/Hasura/Eventing/EventTrigger.hs | 2 +- .../Hasura/Eventing/ScheduledTrigger.hs | 909 ++++++++---------- .../Hasura/Eventing/ScheduledTrigger/Types.hs | 69 ++ server/src-lib/Hasura/GraphQL/Execute.hs | 4 +- server/src-lib/Hasura/GraphQL/Logging.hs | 2 +- .../src-lib/Hasura/GraphQL/Transport/HTTP.hs | 2 +- .../Hasura/GraphQL/Transport/WebSocket.hs | 3 +- server/src-lib/Hasura/Metadata/Class.hs | 159 +++ .../Hasura/RQL/DDL/ScheduledTrigger.hs | 4 +- server/src-lib/Hasura/RQL/Types/Action.hs | 2 +- .../src-lib/Hasura/RQL/Types/EventTrigger.hs | 5 +- .../Hasura/RQL/Types/ScheduledTrigger.hs | 161 +++- server/src-lib/Hasura/Server/API/Query.hs | 2 +- server/src-lib/Hasura/Server/App.hs | 1 + server/src-lib/Hasura/Server/Init.hs | 11 +- server/src-lib/Hasura/Server/Logging.hs | 2 +- server/src-lib/Hasura/Server/SchemaUpdate.hs | 2 +- server/src-lib/Hasura/Server/Telemetry.hs | 2 +- server/src-lib/Hasura/Server/Types.hs | 23 + server/src-lib/Hasura/Server/Utils.hs | 5 +- 25 files changed, 872 insertions(+), 623 deletions(-) create mode 100644 server/src-lib/Hasura/Eventing/ScheduledTrigger/Types.hs create mode 100644 server/src-lib/Hasura/Metadata/Class.hs create mode 100644 server/src-lib/Hasura/Server/Types.hs diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 2469a2d77241f..0d9ef182eab90 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -302,6 +302,7 @@ library , Data.Time.Clock.Units , Data.URL.Template , Hasura.App + , Hasura.Metadata.Class , Hasura.Backends.Postgres.Connection , Hasura.Backends.Postgres.Execute.Mutation @@ -334,6 +335,7 @@ library , Hasura.Server.Logging , Hasura.Server.Migrate , Hasura.Server.Compression + , Hasura.Server.Types , Hasura.Server.API.PGDump , Hasura.Prelude @@ -476,6 +478,7 @@ library , Hasura.Eventing.HTTP , Hasura.Eventing.EventTrigger , Hasura.Eventing.ScheduledTrigger + , Hasura.Eventing.ScheduledTrigger.Types , Hasura.Eventing.Common , Data.GADT.Compare.Extended , Data.Tuple.Extended diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index cb099523c1a48..229b8180cc505 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -34,13 +34,13 @@ main = do tryExit $ do args <- parseArgs env <- Env.getEnvironment - unAppM (runApp env args) + runApp env args where tryExit io = try io >>= \case Left (ExitException _code msg) -> BC.putStrLn msg >> Sys.exitFailure Right r -> return r -runApp :: Env.Environment -> HGEOptions Hasura -> AppM () +runApp :: Env.Environment -> HGEOptions Hasura -> IO () runApp env (HGEOptionsG rci hgeCmd) = do initTime <- liftIO getCurrentTime globalCtx@GlobalCtx{..} <- initGlobalCtx rci @@ -69,7 +69,8 @@ runApp env (HGEOptionsG rci hgeCmd) = do Signals.sigTERM (Signals.CatchOnce (shutdownGracefully $ _scShutdownLatch serveCtx)) Nothing - runHGEServer env serveOptions serveCtx Nothing initTime shutdownApp Nothing ekgStore + flip runPGMetadataStorageApp (_scPgPool serveCtx) $ + runHGEServer env serveOptions serveCtx Nothing initTime shutdownApp Nothing ekgStore HCExport -> do res <- runTxWithMinimalPool _gcConnInfo fetchMetadataFromHdbTables diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 2d8e87225f91b..bdaef51247d36 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -8,6 +8,7 @@ import Control.Exception (throwIO) import Control.Monad.Base import Control.Monad.Catch (Exception, MonadCatch, MonadMask, MonadThrow, onException) +import Control.Monad.Morph (hoist) import Control.Monad.Stateless import Control.Monad.STM (atomically) import Control.Monad.Trans.Control (MonadBaseControl (..)) @@ -54,15 +55,10 @@ import Hasura.GraphQL.Logging (MonadQueryLog (..), import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..)) import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed) import Hasura.Logging +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.DDL.Schema.Cache -import Hasura.RQL.Types (CacheRWM, Code (..), HasHttpManager, - HasSQLGenCtx, HasSystemDefined, - QErr (..), SQLGenCtx (..), - SchemaCache (..), UserInfoM, - buildSchemaCacheStrict, decodeValue, - throw400, withPathK) - +import Hasura.RQL.Types import Hasura.RQL.Types.Run import Hasura.Server.API.Query (requiresAdmin, runQueryM) import Hasura.Server.App @@ -73,6 +69,7 @@ import Hasura.Server.Logging import Hasura.Server.Migrate (migrateCatalog) import Hasura.Server.SchemaUpdate import Hasura.Server.Telemetry +import Hasura.Server.Types import Hasura.Server.Version import Hasura.Session @@ -193,12 +190,14 @@ data Loggers , _lsPgLogger :: !Q.PGLogger } -newtype AppM a = AppM { unAppM :: IO a } +-- | An application with Postgres database as a metadata storage +newtype PGMetadataStorageApp a + = PGMetadataStorageApp {runPGMetadataStorageApp :: Q.PGPool -> IO a} deriving ( Functor, Applicative, Monad - , MonadIO, MonadBase IO - , MonadBaseControl IO + , MonadIO, MonadBase IO, MonadBaseControl IO , MonadCatch, MonadThrow, MonadMask - ) + , MonadUnique, MonadReader Q.PGPool + ) via (ReaderT Q.PGPool IO) -- | Initializes or migrates the catalog and returns the context required to start the server. initialiseServeCtx @@ -310,6 +309,7 @@ runHGEServer , MonadExecuteQuery m , Tracing.HasReporter m , MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> ServeOptions impl @@ -402,15 +402,15 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po asyncActionsProcessor env logger (_scrCache cacheRef) _scPgPool _scHttpManager -- start a background thread to create new cron events - cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ - runCronEventsGenerator logger _scPgPool (getSCFromRef cacheRef) + cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $ + runCronEventsGenerator logger (getSCFromRef cacheRef) -- prepare scheduled triggers - prepareScheduledEvents _scPgPool logger + prepareScheduledEvents logger -- start a background thread to deliver the scheduled events scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $ - processScheduledTriggers env logger logEnvHeaders _scHttpManager _scPgPool (getSCFromRef cacheRef) lockedEventsCtx + processScheduledTriggers env logger logEnvHeaders _scHttpManager (getSCFromRef cacheRef) lockedEventsCtx -- start a background thread to check for updates updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $ @@ -443,10 +443,14 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime unLogger logger $ mkGenericLog LevelInfo "server" $ StartupTimeInfo "starting API server" apiInitTime + + shutdownHandler' <- liftWithStateless $ \lowerIO -> + pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $ + \a b -> hoist lowerIO $ unlockScheduledEvents a b let warpSettings = Warp.setPort soPort . Warp.setHost soHost . Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown - . Warp.setInstallShutdownHandler (shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool) + . Warp.setInstallShutdownHandler shutdownHandler' $ Warp.defaultSettings liftIO $ Warp.runSettings warpSettings app @@ -464,10 +468,10 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po -- There is another hasura instance which is processing events and -- it will lock events to process them. -- So, unlocking all the locked events might re-deliver an event(due to #2). - prepareScheduledEvents pool (Logger logger) = do + prepareScheduledEvents (Logger logger) = do liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data" - res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents - onLeft res (printErrJExit EventSubSystemError) + res <- runMetadataStorageT unlockAllLockedScheduledEvents + onLeft res $ printErrJExit EventSubSystemError -- | shutdownEvents will be triggered when a graceful shutdown has been inititiated, it will -- get the locked events from the event engine context and the scheduled event engine context @@ -477,28 +481,30 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po -- and will be processed when the events are proccessed next time. shutdownEvents :: Q.PGPool + -> (ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT IO Int) -> Logger Hasura -> LockedEventsCtx -> IO () - shutdownEvents pool hasuraLogger@(Logger logger) LockedEventsCtx {..} = do + shutdownEvents pool unlockScheduledEvents' hasuraLogger@(Logger logger) LockedEventsCtx {..} = do liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "unlocking events that are locked by the HGE" - unlockEventsForShutdown pool hasuraLogger "event_triggers" "" unlockEvents leEvents + let unlockEvents' = + liftEitherM . liftIO . runTx pool (Q.ReadCommitted, Nothing) . unlockEvents + unlockEventsForShutdown hasuraLogger "event_triggers" "" unlockEvents' leEvents liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "unlocking scheduled events that are locked by the HGE" - unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "cron events" unlockCronEvents leCronEvents - unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leOneOffEvents + unlockEventsForShutdown hasuraLogger "scheduled_triggers" "cron events" (unlockScheduledEvents' Cron) leCronEvents + unlockEventsForShutdown hasuraLogger "scheduled_triggers" "scheduled events" (unlockScheduledEvents' OneOff) leOneOffEvents unlockEventsForShutdown - :: Q.PGPool - -> Logger Hasura + :: Logger Hasura -> Text -- ^ trigger type -> Text -- ^ event type - -> ([eventId] -> Q.TxE QErr Int) + -> ([eventId] -> MetadataStorageT IO Int) -> TVar (Set.Set eventId) -> IO () - unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do + unlockEventsForShutdown (Logger logger) triggerType eventType doUnlock lockedIdsVar = do lockedIds <- readTVarIO lockedIdsVar unless (Set.null lockedIds) $ do - result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds) + result <- runMetadataStorageT $ doUnlock $ toList lockedIds case result of Left err -> logger $ mkGenericStrLog LevelWarn triggerType $ "Error while unlocking " ++ T.unpack eventType ++ " events: " ++ show err @@ -520,14 +526,15 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po -- ^ the stop websocket server function -> LockedEventsCtx -> Q.PGPool + -> (ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT IO Int) -> IO () -- ^ the closeSocket callback -> IO () - shutdownHandler (Loggers loggerCtx (Logger logger) _) immortalThreads stopWsServer leCtx pool closeSocket = + shutdownHandler (Loggers loggerCtx (Logger logger) _) immortalThreads stopWsServer leCtx pool unlockScheduledEvents' closeSocket = LA.link =<< LA.async do waitForShutdown _scShutdownLatch logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server" - shutdownEvents pool (Logger logger) leCtx + shutdownEvents pool unlockScheduledEvents' (Logger logger) leCtx closeSocket stopWsServer -- kill all the background immortal threads @@ -626,12 +633,12 @@ execQuery env queryBs = do buildSchemaCacheStrict encJToLBS <$> runQueryM env query -instance Tracing.HasReporter AppM +instance Tracing.HasReporter PGMetadataStorageApp -instance MonadQueryInstrumentation AppM where +instance MonadQueryInstrumentation PGMetadataStorageApp where askInstrumentQuery _ = pure (id, noProfile) -instance HttpLog AppM where +instance HttpLog PGMetadataStorageApp where logHttpError logger userInfoM reqId waiReq req qErr headers = unLogger logger $ mkHttpLog $ mkHttpErrorLogContext userInfoM reqId waiReq req qErr Nothing Nothing headers @@ -640,15 +647,15 @@ instance HttpLog AppM where unLogger logger $ mkHttpLog $ mkHttpAccessLogContext userInfoM reqId waiReq compressedResponse qTime cType headers -instance MonadExecuteQuery AppM where +instance MonadExecuteQuery PGMetadataStorageApp where cacheLookup _ _ = pure ([], Nothing) cacheStore _ _ = pure () -instance UserAuthentication (Tracing.TraceT AppM) where +instance UserAuthentication (Tracing.TraceT PGMetadataStorageApp) where resolveUserInfo logger manager headers authMode = runExceptT $ getUserInfoWithExpTime logger manager headers authMode -instance MetadataApiAuthorization AppM where +instance MetadataApiAuthorization PGMetadataStorageApp where authorizeMetadataApi query userInfo = do let currRole = _uiRole userInfo when (requiresAdmin query && currRole /= adminRoleName) $ @@ -656,26 +663,44 @@ instance MetadataApiAuthorization AppM where where errMsg = "restricted access : admin only" -instance ConsoleRenderer AppM where +instance ConsoleRenderer PGMetadataStorageApp where renderConsole path authMode enableTelemetry consoleAssetsDir = return $ mkConsoleHTML path authMode enableTelemetry consoleAssetsDir -instance MonadGQLExecutionCheck AppM where +instance MonadGQLExecutionCheck PGMetadataStorageApp where checkGQLExecution userInfo _ enableAL sc query = runExceptT $ do req <- toParsed query checkQueryInAllowlist enableAL userInfo req sc return req -instance MonadConfigApiHandler AppM where +instance MonadConfigApiHandler PGMetadataStorageApp where runConfigApiHandler = configApiGetHandler -instance MonadQueryLog AppM where +instance MonadQueryLog PGMetadataStorageApp where logQueryLog logger query genSqlM reqId = unLogger logger $ QueryLog query genSqlM reqId -instance WS.MonadWSLog AppM where +instance WS.MonadWSLog PGMetadataStorageApp where logWSLog = unLogger +runInSeparateTx :: Q.TxE QErr a -> MetadataStorageT PGMetadataStorageApp a +runInSeparateTx tx = do + pool <- lift ask + liftEitherM $ liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Nothing) tx + +-- | Each of the function in the type class is executed in a totally separate transaction. +-- +-- To learn more about why the instance is derived as following, see Note [Generic MetadataStorageT transformer] +instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where + + getDeprivedCronTriggerStats = runInSeparateTx getDeprivedCronTriggerStatsTx + getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx + insertScheduledEvent = runInSeparateTx . insertScheduledEventTx + insertScheduledEventInvocation a b = runInSeparateTx $ insertInvocationTx a b + setScheduledEventOp a b c = runInSeparateTx $ setScheduledEventOpTx a b c + unlockScheduledEvents a b = runInSeparateTx $ unlockScheduledEventsTx a b + unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx + --- helper functions --- mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either String Text diff --git a/server/src-lib/Hasura/Backends/Postgres/SQL/DML.hs b/server/src-lib/Hasura/Backends/Postgres/SQL/DML.hs index e42b9db6fd6b3..676a1102346aa 100644 --- a/server/src-lib/Hasura/Backends/Postgres/SQL/DML.hs +++ b/server/src-lib/Hasura/Backends/Postgres/SQL/DML.hs @@ -219,6 +219,9 @@ instance ToSQL Qual where mkQIdentifier :: (IsIdentifier a, IsIdentifier b) => a -> b -> QIdentifier mkQIdentifier q t = QIdentifier (QualifiedIdentifier (toIdentifier q) Nothing) (toIdentifier t) +mkQIdentifierTable :: (IsIdentifier a) => QualifiedTable -> a -> QIdentifier +mkQIdentifierTable q = QIdentifier (mkQual q) . toIdentifier + data QIdentifier = QIdentifier !Qual !Identifier deriving (Show, Eq, Generic, Data) diff --git a/server/src-lib/Hasura/Eventing/Common.hs b/server/src-lib/Hasura/Eventing/Common.hs index c9bb6970ee082..e8ceb14bdd419 100644 --- a/server/src-lib/Hasura/Eventing/Common.hs +++ b/server/src-lib/Hasura/Eventing/Common.hs @@ -25,7 +25,7 @@ initLockedEventsCtx = do -- | After the events are fetched from the DB, we store the locked events -- in a hash set(order doesn't matter and look ups are faster) in the -- event engine context -saveLockedEvents :: (MonadIO m) => [Text] -> TVar (Set.Set Text) -> m () +saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m () saveLockedEvents eventIds lockedEvents = liftIO $ atomically $ do lockedEventsVals <- readTVar lockedEvents @@ -33,7 +33,8 @@ saveLockedEvents eventIds lockedEvents = Set.union lockedEventsVals $ Set.fromList eventIds -- | Remove an event from the 'LockedEventsCtx' after it has been processed -removeEventFromLockedEvents :: MonadIO m => Text -> TVar (Set.Set Text) -> m () +removeEventFromLockedEvents + :: MonadIO m => EventId -> TVar (Set.Set EventId) -> m () removeEventFromLockedEvents eventId lockedEvents = liftIO $ atomically $ do lockedEventsVals <- readTVar lockedEvents diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 8331b40a8a1ed..1f1e3160e458f 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -508,7 +508,7 @@ toInt64 = fromIntegral newtype EventIdArray = EventIdArray { unEventIdArray :: [EventId]} deriving (Show, Eq) instance Q.ToPrepArg EventIdArray where - toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l + toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unEventId l where -- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict) diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index ac998091f6399..6b1f502eba361 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -32,6 +32,10 @@ other. They can be split into different threads for a better performance. == Implementation +The scheduled triggers eventing is being implemented in the metadata storage. +All functions that make interaction to storage system are abstracted in +the @'MonadMetadataStorage' class. + During the startup, two threads are started: 1. Generator: Fetches the list of scheduled triggers from cache and generates @@ -61,283 +65,112 @@ During the startup, two threads are started: module Hasura.Eventing.ScheduledTrigger ( runCronEventsGenerator , processScheduledTriggers + , generateScheduleTimes , CronEventSeed(..) - , generateScheduleTimes - , insertCronEvents , initLockedEventsCtx , LockedEventsCtx(..) - , unlockCronEvents - , unlockOneOffScheduledEvents - , unlockAllLockedScheduledEvents + + -- * Database interactions + -- Following function names are similar to those present in + -- 'MonadMetadataStorage' type class. To avoid duplication, + -- 'Tx' is suffixed to identify as database transactions + , getDeprivedCronTriggerStatsTx + , getScheduledEventsForDeliveryTx + , insertInvocationTx + , setScheduledEventOpTx + , unlockScheduledEventsTx + , unlockAllLockedScheduledEventsTx + , insertScheduledEventTx ) where import Hasura.Prelude -import qualified Data.Aeson as J -import qualified Data.Aeson.Casing as J -import qualified Data.Aeson.TH as J -import qualified Data.ByteString.Lazy as BL -import qualified Data.Environment as Env -import qualified Data.HashMap.Strict as Map -import qualified Data.Set as Set -import qualified Data.TByteString as TBS -import qualified Data.Text as T -import qualified Database.PG.Query as Q -import qualified Database.PG.Query.PTI as PTI -import qualified Network.HTTP.Client as HTTP -import qualified PostgreSQL.Binary.Decoding as PD -import qualified PostgreSQL.Binary.Encoding as PE - -import Control.Arrow.Extended (dup) -import Control.Concurrent.Extended (sleep) +import qualified Data.Aeson as J +import qualified Data.ByteString.Lazy as BL +import qualified Data.Environment as Env +import qualified Data.HashMap.Strict as Map +import qualified Data.Set as Set +import qualified Data.TByteString as TBS +import qualified Data.Text as T +import qualified Database.PG.Query as Q +import qualified Network.HTTP.Client as HTTP +import qualified Text.Builder as TB + +import Control.Arrow.Extended (dup) +import Control.Concurrent.Extended (sleep) import Control.Concurrent.STM.TVar import Data.Has -import Data.Int (Int64) -import Data.List (unfoldr) +import Data.Int (Int64) +import Data.List (unfoldr) import Data.Time.Clock import System.Cron -import qualified Hasura.Logging as L -import qualified Hasura.Tracing as Tracing +import qualified Hasura.Logging as L +import qualified Hasura.Tracing as Tracing import Hasura.Backends.Postgres.SQL.DML import Hasura.Backends.Postgres.SQL.Types import Hasura.Eventing.Common import Hasura.Eventing.HTTP +import Hasura.Eventing.ScheduledTrigger.Types import Hasura.HTTP -import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf) +import Hasura.Metadata.Class +import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf) import Hasura.RQL.DDL.Headers import Hasura.RQL.Types +import Hasura.Server.Version (HasVersion) import Hasura.SQL.Types -import Hasura.Server.Version (HasVersion) - - -newtype ScheduledTriggerInternalErr - = ScheduledTriggerInternalErr QErr - deriving (Show, Eq) - -instance L.ToEngineLog ScheduledTriggerInternalErr L.Hasura where - toEngineLog (ScheduledTriggerInternalErr qerr) = - (L.LevelError, L.scheduledTriggerLogType, J.toJSON qerr) - -cronEventsTable :: QualifiedTable -cronEventsTable = - QualifiedObject - hdbCatalogSchema - (TableName $ T.pack "hdb_cron_events") - -data ScheduledEventStatus - = SESScheduled - | SESLocked - | SESDelivered - | SESError - | SESDead - deriving (Show, Eq) - -scheduledEventStatusToText :: ScheduledEventStatus -> Text -scheduledEventStatusToText SESScheduled = "scheduled" -scheduledEventStatusToText SESLocked = "locked" -scheduledEventStatusToText SESDelivered = "delivered" -scheduledEventStatusToText SESError = "error" -scheduledEventStatusToText SESDead = "dead" - -instance Q.ToPrepArg ScheduledEventStatus where - toPrepVal = Q.toPrepVal . scheduledEventStatusToText - -instance Q.FromCol ScheduledEventStatus where - fromCol bs = flip Q.fromColHelper bs $ PD.enum $ \case - "scheduled" -> Just SESScheduled - "locked" -> Just SESLocked - "delivered" -> Just SESDelivered - "error" -> Just SESError - "dead" -> Just SESDead - _ -> Nothing - -instance J.ToJSON ScheduledEventStatus where - toJSON = J.String . scheduledEventStatusToText - -type ScheduledEventId = Text - -data CronTriggerStats - = CronTriggerStats - { ctsName :: !TriggerName - , ctsUpcomingEventsCount :: !Int - , ctsMaxScheduledTime :: !UTCTime - } deriving (Show, Eq) - -data CronEventSeed - = CronEventSeed - { cesName :: !TriggerName - , cesScheduledTime :: !UTCTime - } deriving (Show, Eq) - -data CronEventPartial - = CronEventPartial - { cepId :: !CronEventId - , cepName :: !TriggerName - , cepScheduledTime :: !UTCTime - , cepTries :: !Int - , cepCreatedAt :: !UTCTime - -- ^ cepCreatedAt is the time at which the cron event generator - -- created the cron event - } deriving (Show, Eq) - -data ScheduledEventFull - = ScheduledEventFull - { sefId :: !ScheduledEventId - , sefName :: !(Maybe TriggerName) - -- ^ sefName is the name of the cron trigger. - -- A one-off scheduled event is not associated with a name, so in that - -- case, 'sefName' will be @Nothing@ - , sefScheduledTime :: !UTCTime - , sefTries :: !Int - , sefWebhook :: !Text - , sefPayload :: !J.Value - , sefRetryConf :: !STRetryConf - , sefHeaders :: ![EventHeaderInfo] - , sefComment :: !(Maybe Text) - , sefCreatedAt :: !UTCTime - } deriving (Show, Eq) -$(J.deriveToJSON (J.aesonDrop 3 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventFull) - -data OneOffScheduledEvent - = OneOffScheduledEvent - { ooseId :: !OneOffScheduledEventId - , ooseScheduledTime :: !UTCTime - , ooseTries :: !Int - , ooseWebhook :: !InputWebhook - , oosePayload :: !(Maybe J.Value) - , ooseRetryConf :: !STRetryConf - , ooseHeaderConf :: ![HeaderConf] - , ooseComment :: !(Maybe Text) - , ooseCreatedAt :: !UTCTime - } deriving (Show, Eq) -$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''OneOffScheduledEvent) - --- | The 'ScheduledEventType' data type is needed to differentiate --- between a 'CronScheduledEvent' and 'OneOffScheduledEvent' scheduled --- event because they both have different configurations --- and they live in different tables. -data ScheduledEventType = - Cron - -- ^ A Cron scheduled event has a template defined which will - -- contain the webhook, header configuration, retry - -- configuration and a payload. Every cron event created - -- uses the above mentioned configurations defined in the template. - -- The configuration defined with the cron trigger is cached - -- and hence it's not fetched along the cron scheduled events. - | OneOff - -- ^ A One-off scheduled event doesn't have any template defined - -- so all the configuration is fetched along the scheduled events. - deriving (Eq, Show) - -data ScheduledEventWebhookPayload - = ScheduledEventWebhookPayload - { sewpId :: !Text - , sewpName :: !(Maybe TriggerName) - , sewpScheduledTime :: !UTCTime - , sewpPayload :: !J.Value - , sewpComment :: !(Maybe Text) - , sewpCreatedAt :: !(Maybe UTCTime) - -- ^ sewpCreatedAt is the time at which the event was created, - -- In case of one-off scheduled events, it's the time at which - -- the user created the event and in case of cron triggers, the - -- graphql-engine generator, generates the cron events, the - -- `created_at` is just an implementation detail, so we - -- don't send it - } deriving (Show, Eq) - -$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventWebhookPayload) -- | runCronEventsGenerator makes sure that all the cron triggers -- have an adequate buffer of cron events. -runCronEventsGenerator :: - L.Logger L.Hasura - -> Q.PGPool +runCronEventsGenerator + :: ( MonadIO m + , MonadMetadataStorage (MetadataStorageT m) + ) + => L.Logger L.Hasura -> IO SchemaCache - -> IO void -runCronEventsGenerator logger pgpool getSC = do + -> m void +runCronEventsGenerator logger getSC = do forever $ do - sc <- getSC + sc <- liftIO getSC -- get cron triggers from cache let cronTriggersCache = scCronTriggers sc -- get cron trigger stats from db - runExceptT - (Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getDeprivedCronTriggerStats) >>= \case - Left err -> L.unLogger logger $ - ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err) - Right deprivedCronTriggerStats -> do - -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@ - cronTriggersForHydrationWithStats <- - catMaybes <$> - mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats - -- insert cron events for cron triggers that need hydration - runExceptT - (Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ - insertCronEventsFor cronTriggersForHydrationWithStats) >>= \case - Right _ -> pure () - Left err -> - L.unLogger logger $ ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err) - sleep (minutes 1) - where - getDeprivedCronTriggerStats = liftTx $ do - map uncurryStats <$> - Q.listQE defaultTxErrorHandler - [Q.sql| - SELECT name, upcoming_events_count, max_scheduled_time - FROM hdb_catalog.hdb_cron_events_stats - WHERE upcoming_events_count < 100 - |] () True + eitherRes <- runMetadataStorageT $ do + deprivedCronTriggerStats <- getDeprivedCronTriggerStats + -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@ + cronTriggersForHydrationWithStats <- + catMaybes <$> + mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats + insertCronEventsFor cronTriggersForHydrationWithStats - uncurryStats (n, count, maxTs) = CronTriggerStats n count maxTs + onLeft eitherRes $ L.unLogger logger . + ScheduledTriggerInternalErr . err500 Unexpected . T.pack . show + liftIO $ sleep (minutes 1) + where withCronTrigger cronTriggerCache cronTriggerStat = do case Map.lookup (ctsName cronTriggerStat) cronTriggerCache of Nothing -> do L.unLogger logger $ ScheduledTriggerInternalErr $ - err500 Unexpected $ - "could not find scheduled trigger in the schema cache" + err500 Unexpected "could not find scheduled trigger in the schema cache" pure Nothing Just cronTrigger -> pure $ Just (cronTrigger, cronTriggerStat) -insertCronEventsFor :: [(CronTriggerInfo, CronTriggerStats)] -> Q.TxE QErr () +insertCronEventsFor + :: (MonadMetadataStorage m) + => [(CronTriggerInfo, CronTriggerStats)] + -> m () insertCronEventsFor cronTriggersWithStats = do let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) -> generateCronEventsFrom (ctsMaxScheduledTime stats) cti case scheduledEvents of [] -> pure () - events -> do - let insertCronEventsSql = toSQLTxt - SQLInsert - { siTable = cronEventsTable - , siCols = map unsafePGCol ["trigger_name", "scheduled_time"] - , siValues = ValuesExp $ map (toTupleExp . toArr) events - , siConflict = Just $ DoNothing Nothing - , siRet = Nothing - } - Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False - where - toArr (CronEventSeed n t) = [triggerNameToTxt n, formatTime' t] - toTupleExp = TupleExp . map SELit - -insertCronEvents :: [CronEventSeed] -> Q.TxE QErr () -insertCronEvents events = do - let insertCronEventsSql = toSQLTxt - SQLInsert - { siTable = cronEventsTable - , siCols = map unsafePGCol ["trigger_name", "scheduled_time"] - , siValues = ValuesExp $ map (toTupleExp . toArr) events - , siConflict = Just $ DoNothing Nothing - , siRet = Nothing - } - Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False - where - toArr (CronEventSeed n t) = [triggerNameToTxt n, formatTime' t] - toTupleExp = TupleExp . map SELit + events -> insertScheduledEvent $ SESCron events generateCronEventsFrom :: UTCTime -> CronTriggerInfo-> [CronEventSeed] generateCronEventsFrom startTime CronTriggerInfo{..} = @@ -351,221 +184,199 @@ generateScheduleTimes from n cron = take n $ go from go = unfoldr (fmap dup . nextMatch cron) processCronEvents - :: (HasVersion, MonadIO m, Tracing.HasReporter m) + :: ( HasVersion + , MonadIO m + , Tracing.HasReporter m + , MonadMetadataStorage (MetadataStorageT m) + ) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager - -> Q.PGPool + -> [CronEvent] -> IO SchemaCache -> TVar (Set.Set CronEventId) -> m () -processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do +processCronEvents logger logEnv httpMgr cronEvents getSC lockedCronEvents = do cronTriggersInfo <- scCronTriggers <$> liftIO getSC - cronScheduledEvents <- - liftIO . runExceptT $ - Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getPartialCronEvents - case cronScheduledEvents of - Right partialEvents -> do - -- save the locked cron events that have been fetched from the - -- database, the events stored here will be unlocked in case a - -- graceful shutdown is initiated in midst of processing these events - saveLockedEvents (map cepId partialEvents) lockedCronEvents - -- The `createdAt` of a cron event is the `created_at` of the cron trigger - for_ partialEvents $ \(CronEventPartial id' name st tries createdAt)-> do - case Map.lookup name cronTriggersInfo of - Nothing -> logInternalError $ - err500 Unexpected "could not find cron trigger in cache" - Just CronTriggerInfo{..} -> do - let webhook = unResolvedWebhook ctiWebhookInfo - payload' = fromMaybe J.Null ctiPayload - scheduledEvent = - ScheduledEventFull id' - (Just name) - st - tries - webhook - payload' - ctiRetryConf - ctiHeaders - ctiComment - createdAt - finally <- runExceptT $ - runReaderT (processScheduledEvent logEnv pgpool scheduledEvent Cron) (logger, httpMgr) - removeEventFromLockedEvents id' lockedCronEvents - either logInternalError pure finally - Left err -> logInternalError err + -- save the locked cron events that have been fetched from the + -- database, the events stored here will be unlocked in case a + -- graceful shutdown is initiated in midst of processing these events + saveLockedEvents (map _ceId cronEvents) lockedCronEvents + -- The `createdAt` of a cron event is the `created_at` of the cron trigger + for_ cronEvents $ \(CronEvent id' name st _ tries _ _)-> do + case Map.lookup name cronTriggersInfo of + Nothing -> logInternalError $ + err500 Unexpected "could not find cron trigger in cache" + Just CronTriggerInfo{..} -> do + let webhookUrl = unResolvedWebhook ctiWebhookInfo + payload = ScheduledEventWebhookPayload id' (Just name) st + (fromMaybe J.Null ctiPayload) ctiComment + Nothing + retryCtx = RetryContext tries ctiRetryConf + finally <- runMetadataStorageT $ flip runReaderT (logger, httpMgr) $ + processScheduledEvent logEnv id' ctiHeaders retryCtx + payload webhookUrl Cron + removeEventFromLockedEvents id' lockedCronEvents + onLeft finally logInternalError where logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err processOneOffScheduledEvents - :: (HasVersion, MonadIO m, Tracing.HasReporter m) + :: ( HasVersion + , MonadIO m + , Tracing.HasReporter m + , MonadMetadataStorage (MetadataStorageT m) + ) => Env.Environment -> L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager - -> Q.PGPool + -> [OneOffScheduledEvent] -> TVar (Set.Set OneOffScheduledEventId) -> m () -processOneOffScheduledEvents env logger logEnv httpMgr pgpool lockedOneOffScheduledEvents = do - oneOffScheduledEvents <- - liftIO . runExceptT $ - Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getOneOffScheduledEvents - case oneOffScheduledEvents of - Right oneOffScheduledEvents' -> do - -- save the locked one-off events that have been fetched from the - -- database, the events stored here will be unlocked in case a - -- graceful shutdown is initiated in midst of processing these events - saveLockedEvents (map ooseId oneOffScheduledEvents') lockedOneOffScheduledEvents - for_ oneOffScheduledEvents' $ - \(OneOffScheduledEvent id' - scheduledTime - tries - webhookConf - payload - retryConf - headerConf - comment - createdAt) - -> do - webhookInfo <- liftIO . runExceptT $ resolveWebhook env webhookConf - headerInfo <- liftIO . runExceptT $ getHeaderInfosFromConf env headerConf - - case webhookInfo of - Right webhookInfo' -> do - case headerInfo of - Right headerInfo' -> do - let webhook = unResolvedWebhook webhookInfo' - payload' = fromMaybe J.Null payload - scheduledEvent = ScheduledEventFull id' - Nothing - scheduledTime - tries - webhook - payload' - retryConf - headerInfo' - comment - createdAt - finally <- runExceptT $ - runReaderT (processScheduledEvent logEnv pgpool scheduledEvent OneOff) $ - (logger, httpMgr) - removeEventFromLockedEvents id' lockedOneOffScheduledEvents - either logInternalError pure finally - - Left headerInfoErr -> logInternalError headerInfoErr - - Left webhookInfoErr -> logInternalError webhookInfoErr - - Left oneOffScheduledEventsErr -> logInternalError oneOffScheduledEventsErr +processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents lockedOneOffScheduledEvents = do + -- save the locked one-off events that have been fetched from the + -- database, the events stored here will be unlocked in case a + -- graceful shutdown is initiated in midst of processing these events + saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents + for_ oneOffEvents $ \OneOffScheduledEvent{..} -> do + (either logInternalError pure) =<< runMetadataStorageT do + webhookInfo <- resolveWebhook env _ooseWebhookConf + headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf + let webhookUrl = unResolvedWebhook webhookInfo + payload = ScheduledEventWebhookPayload _ooseId Nothing + _ooseScheduledTime (fromMaybe J.Null _oosePayload) + _ooseComment (Just _ooseCreatedAt) + retryCtx = RetryContext _ooseTries _ooseRetryConf + + flip runReaderT (logger, httpMgr) $ + processScheduledEvent logEnv _ooseId headerInfo retryCtx payload webhookUrl OneOff + removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents where logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err processScheduledTriggers - :: (HasVersion, MonadIO m, Tracing.HasReporter m) + :: ( HasVersion + , MonadIO m + , Tracing.HasReporter m + , MonadMetadataStorage (MetadataStorageT m) + ) => Env.Environment -> L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager - -> Q.PGPool -> IO SchemaCache -> LockedEventsCtx -> m void -processScheduledTriggers env logger logEnv httpMgr pgpool getSC LockedEventsCtx {..} = +processScheduledTriggers env logger logEnv httpMgr getSC LockedEventsCtx {..} = forever $ do - processCronEvents logger logEnv httpMgr pgpool getSC leCronEvents - processOneOffScheduledEvents env logger logEnv httpMgr pgpool leOneOffEvents - liftIO $ sleep (minutes 1) + result <- runMetadataStorageT getScheduledEventsForDelivery + case result of + Left e -> logInternalError e + Right (cronEvents, oneOffEvents) -> do + processCronEvents logger logEnv httpMgr cronEvents getSC leCronEvents + processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents leOneOffEvents + liftIO $ sleep (minutes 1) + where + logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err -processScheduledEvent :: - ( MonadReader r m - , Has HTTP.Manager r - , Has (L.Logger L.Hasura) r - , HasVersion - , MonadIO m - , MonadError QErr m - , Tracing.HasReporter m - ) +processScheduledEvent + :: ( MonadReader r m + , Has HTTP.Manager r + , Has (L.Logger L.Hasura) r + , HasVersion + , MonadIO m + , Tracing.HasReporter m + , MonadMetadataStorage m + ) => LogEnvHeaders - -> Q.PGPool - -> ScheduledEventFull + -> ScheduledEventId + -> [EventHeaderInfo] + -> RetryContext + -> ScheduledEventWebhookPayload + -> Text -> ScheduledEventType -> m () -processScheduledEvent logEnv pgpool se@ScheduledEventFull {..} type' = Tracing.runTraceT traceNote do +processScheduledEvent logEnv eventId eventHeaders retryCtx payload webhookUrl type' + = Tracing.runTraceT traceNote do currentTime <- liftIO getCurrentTime - if convertDuration (diffUTCTime currentTime sefScheduledTime) - > unNonNegativeDiffTime (strcToleranceSeconds sefRetryConf) - then processDead pgpool se type' + let retryConf = _rctxConf retryCtx + scheduledTime = sewpScheduledTime payload + if convertDuration (diffUTCTime currentTime scheduledTime) + > unNonNegativeDiffTime (strcToleranceSeconds retryConf) + then processDead eventId type' else do let timeoutSeconds = round $ unNonNegativeDiffTime - $ strcTimeoutSeconds sefRetryConf + $ strcTimeoutSeconds retryConf httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000) - headers = addDefaultHeaders $ map encodeHeader sefHeaders - extraLogCtx = ExtraLogContext (Just currentTime) sefId - -- include `created_at` in the payload, only in one-off events - createdAt = bool Nothing (Just sefCreatedAt) $ type' == OneOff - webhookReqPayload = - ScheduledEventWebhookPayload - sefId sefName sefScheduledTime sefPayload sefComment createdAt - webhookReqBodyJson = J.toJSON webhookReqPayload + headers = addDefaultHeaders $ map encodeHeader eventHeaders + extraLogCtx = ExtraLogContext (Just currentTime) eventId + webhookReqBodyJson = J.toJSON payload webhookReqBody = J.encode webhookReqBodyJson requestDetails = RequestDetails $ BL.length webhookReqBody - res <- runExceptT $ tryWebhook headers httpTimeout webhookReqBody (T.unpack sefWebhook) - logHTTPForST res extraLogCtx requestDetails - let decodedHeaders = map (decodeHeader logEnv sefHeaders) headers - either - (processError pgpool se decodedHeaders type' webhookReqBodyJson) - (processSuccess pgpool se decodedHeaders type' webhookReqBodyJson) - res + eitherRes <- runExceptT $ tryWebhook headers httpTimeout webhookReqBody (T.unpack webhookUrl) + logHTTPForST eitherRes extraLogCtx requestDetails + let decodedHeaders = map (decodeHeader logEnv eventHeaders) headers + case eitherRes of + Left e -> processError eventId retryCtx decodedHeaders type' webhookReqBodyJson e + Right r -> processSuccess eventId decodedHeaders type' webhookReqBodyJson r where - traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) sefName + traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload) processError - :: (MonadIO m, MonadError QErr m) - => Q.PGPool - -> ScheduledEventFull + :: ( MonadIO m + , MonadMetadataStorage m + ) + => ScheduledEventId + -> RetryContext -> [HeaderConf] -> ScheduledEventType -> J.Value -> HTTPErr a -> m () -processError pgpool se decodedHeaders type' reqJson err = do +processError eventId retryCtx decodedHeaders type' reqJson err = do let invocation = case err of HClient excp -> do let errMsg = TBS.fromLBS $ J.encode $ show excp - mkInvocation se 1000 decodedHeaders errMsg [] reqJson + mkInvocation eventId 1000 decodedHeaders errMsg [] reqJson HParse _ detail -> do let errMsg = TBS.fromLBS $ J.encode detail - mkInvocation se 1001 decodedHeaders errMsg [] reqJson + mkInvocation eventId 1001 decodedHeaders errMsg [] reqJson HStatus errResp -> do let respPayload = hrsBody errResp respHeaders = hrsHeaders errResp respStatus = hrsStatus errResp - mkInvocation se respStatus decodedHeaders respPayload respHeaders reqJson + mkInvocation eventId respStatus decodedHeaders respPayload respHeaders reqJson HOther detail -> do - let errMsg = TBS.fromLBS $ J.encode detail - mkInvocation se 500 decodedHeaders errMsg [] reqJson - liftExceptTIO $ - Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do - insertInvocation invocation type' - retryOrMarkError se err type' - -retryOrMarkError :: ScheduledEventFull -> HTTPErr a -> ScheduledEventType -> Q.TxE QErr () -retryOrMarkError se@ScheduledEventFull {..} err type' = do - let mRetryHeader = getRetryAfterHeaderFromHTTPErr err + let errMsg = (TBS.fromLBS $ J.encode detail) + mkInvocation eventId 500 decodedHeaders errMsg [] reqJson + insertScheduledEventInvocation invocation type' + retryOrMarkError eventId retryCtx err type' + +retryOrMarkError + :: (MonadIO m, MonadMetadataStorage m) + => ScheduledEventId + -> RetryContext + -> HTTPErr a + -> ScheduledEventType + -> m () +retryOrMarkError eventId retryCtx err type' = do + let RetryContext tries retryConf = retryCtx + mRetryHeader = getRetryAfterHeaderFromHTTPErr err mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader - triesExhausted = sefTries >= strcNumRetries sefRetryConf + triesExhausted = tries >= strcNumRetries retryConf noRetryHeader = isNothing mRetryHeaderSeconds if triesExhausted && noRetryHeader - then do - setScheduledEventStatus sefId SESError type' + then + setScheduledEventOp eventId (SEOpStatus SESError) type' else do currentTime <- liftIO getCurrentTime let delay = fromMaybe (round $ unNonNegativeDiffTime - $ strcRetryIntervalSeconds sefRetryConf) - $ mRetryHeaderSeconds + $ strcRetryIntervalSeconds retryConf) + mRetryHeaderSeconds diff = fromIntegral delay retryTime = addUTCTime diff currentTime - setRetry se retryTime type' + setScheduledEventOp eventId (SEOpRetry retryTime) type' {- Note [Scheduled event lifecycle] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -599,69 +410,113 @@ and it can transition to other states in the following ways: -} processSuccess - :: (MonadIO m, MonadError QErr m) - => Q.PGPool - -> ScheduledEventFull + :: (MonadMetadataStorage m) + => ScheduledEventId -> [HeaderConf] -> ScheduledEventType -> J.Value -> HTTPResp a -> m () -processSuccess pgpool se decodedHeaders type' reqBodyJson resp = do +processSuccess eventId decodedHeaders type' reqBodyJson resp = do let respBody = hrsBody resp respHeaders = hrsHeaders resp respStatus = hrsStatus resp - invocation = mkInvocation se respStatus decodedHeaders respBody respHeaders reqBodyJson - liftExceptTIO $ - Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do - insertInvocation invocation type' - setScheduledEventStatus (sefId se) SESDelivered type' - -processDead :: (MonadIO m, MonadError QErr m) => Q.PGPool -> ScheduledEventFull -> ScheduledEventType -> m () -processDead pgpool se type' = - liftExceptTIO $ - Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ - setScheduledEventStatus (sefId se) SESDead type' - -setRetry :: ScheduledEventFull -> UTCTime -> ScheduledEventType -> Q.TxE QErr () -setRetry se time type' = - case type' of - Cron -> - Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.hdb_cron_events - SET next_retry_at = $1, - STATUS = 'scheduled' - WHERE id = $2 - |] (time, sefId se) True - OneOff -> - Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.hdb_scheduled_events - SET next_retry_at = $1, - STATUS = 'scheduled' - WHERE id = $2 - |] (time, sefId se) True + invocation = mkInvocation eventId respStatus decodedHeaders respBody respHeaders reqBodyJson + insertScheduledEventInvocation invocation type' + setScheduledEventOp eventId (SEOpStatus SESDelivered) type' + +processDead + :: (MonadMetadataStorage m) + => ScheduledEventId -> ScheduledEventType -> m () +processDead eventId type' = + setScheduledEventOp eventId (SEOpStatus SESDead) type' mkInvocation - :: ScheduledEventFull + :: ScheduledEventId -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> J.Value - -> Invocation 'ScheduledType -mkInvocation ScheduledEventFull {sefId} status reqHeaders respBody respHeaders reqBodyJson + -> (Invocation 'ScheduledType) +mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson = let resp = if isClientError status then mkClientErr respBody else mkResp status respBody respHeaders in Invocation - sefId + eventId status (mkWebhookReq reqBodyJson reqHeaders invocationVersionST) resp -insertInvocation :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr () -insertInvocation invo type' = do +-- metadata database transactions + +getDeprivedCronTriggerStatsTx :: Q.TxE QErr [CronTriggerStats] +getDeprivedCronTriggerStatsTx = + map (\(n, count, maxTx) -> CronTriggerStats n count maxTx) <$> + Q.listQE defaultTxErrorHandler + [Q.sql| + SELECT * FROM + ( SELECT + trigger_name, + count(*) as upcoming_events_count, + max(scheduled_time) as max_scheduled_time + FROM hdb_catalog.hdb_cron_events + WHERE tries = 0 and status = 'scheduled' + GROUP BY trigger_name + ) AS q + WHERE q.upcoming_events_count < 100 + |] () True + +getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent]) +getScheduledEventsForDeliveryTx = + (,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery + where + getCronEventsForDelivery :: Q.TxE QErr [CronEvent] + getCronEventsForDelivery = + map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql| + WITH cte AS + ( UPDATE hdb_catalog.hdb_cron_events + SET status = 'locked' + WHERE id IN ( SELECT t.id + FROM hdb_catalog.hdb_cron_events t + WHERE ( t.status = 'scheduled' + and ( + (t.next_retry_at is NULL and t.scheduled_time <= now()) or + (t.next_retry_at is not NULL and t.next_retry_at <= now()) + ) + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING * + ) + SELECT row_to_json(t.*) FROM cte AS t + |] () True + + getOneOffEventsForDelivery :: Q.TxE QErr [OneOffScheduledEvent] + getOneOffEventsForDelivery = do + map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql| + WITH cte AS ( + UPDATE hdb_catalog.hdb_scheduled_events + SET status = 'locked' + WHERE id IN ( SELECT t.id + FROM hdb_catalog.hdb_scheduled_events t + WHERE ( t.status = 'scheduled' + and ( + (t.next_retry_at is NULL and t.scheduled_time <= now()) or + (t.next_retry_at is not NULL and t.next_retry_at <= now()) + ) + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING * + ) + SELECT row_to_json(t.*) FROM cte AS t + |] () False + +insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr () +insertInvocationTx invo type' = do case type' of Cron -> do Q.unitQE defaultTxErrorHandler @@ -694,120 +549,72 @@ insertInvocation invo type' = do WHERE id = $1 |] (Identity $ iEventId invo) True -setScheduledEventStatus :: Text -> ScheduledEventStatus -> ScheduledEventType -> Q.TxE QErr () -setScheduledEventStatus scheduledEventId status type' = - case type' of - Cron -> do - Q.unitQE defaultTxErrorHandler - [Q.sql| - UPDATE hdb_catalog.hdb_cron_events - SET status = $2 - WHERE id = $1 - |] (scheduledEventId, status) True - OneOff -> do - Q.unitQE defaultTxErrorHandler - [Q.sql| - UPDATE hdb_catalog.hdb_scheduled_events - SET status = $2 - WHERE id = $1 - |] (scheduledEventId, status) True - -getPartialCronEvents :: Q.TxE QErr [CronEventPartial] -getPartialCronEvents = do - map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.hdb_cron_events - SET status = 'locked' - WHERE id IN ( SELECT t.id - FROM hdb_catalog.hdb_cron_events t - WHERE ( t.status = 'scheduled' - and ( - (t.next_retry_at is NULL and t.scheduled_time <= now()) or - (t.next_retry_at is not NULL and t.next_retry_at <= now()) - ) - ) - FOR UPDATE SKIP LOCKED - ) - RETURNING id, trigger_name, scheduled_time, tries, created_at - |] () True - where uncurryEvent (i, n, st, tries, createdAt) = CronEventPartial i n st tries createdAt - -getOneOffScheduledEvents :: Q.TxE QErr [OneOffScheduledEvent] -getOneOffScheduledEvents = do - map uncurryOneOffScheduledEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.hdb_scheduled_events - SET status = 'locked' - WHERE id IN ( SELECT t.id - FROM hdb_catalog.hdb_scheduled_events t - WHERE ( t.status = 'scheduled' - and ( - (t.next_retry_at is NULL and t.scheduled_time <= now()) or - (t.next_retry_at is not NULL and t.next_retry_at <= now()) - ) - ) - FOR UPDATE SKIP LOCKED - ) - RETURNING id, webhook_conf, scheduled_time, retry_conf, payload, header_conf, tries, comment, created_at - |] () False +setScheduledEventOpTx + :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr () +setScheduledEventOpTx eventId op type' = case op of + SEOpRetry time -> setRetry time + SEOpStatus status -> setStatus status where - uncurryOneOffScheduledEvent ( eventId - , webhookConf - , scheduledTime - , retryConf - , payload - , headerConf - , tries - , comment - , createdAt) = - OneOffScheduledEvent eventId - scheduledTime - tries - (Q.getAltJ webhookConf) - (Q.getAltJ payload) - (Q.getAltJ retryConf) - (Q.getAltJ headerConf) - comment - createdAt - - -liftExceptTIO :: (MonadError e m, MonadIO m) => ExceptT e IO a -> m a -liftExceptTIO m = liftEither =<< liftIO (runExceptT m) - -newtype ScheduledEventIdArray = - ScheduledEventIdArray { unScheduledEventIdArray :: [ScheduledEventId]} - deriving (Show, Eq) - -instance Q.ToPrepArg ScheduledEventIdArray where - toPrepVal (ScheduledEventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l - where - -- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html - encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict) - -unlockCronEvents :: [ScheduledEventId] -> Q.TxE QErr Int -unlockCronEvents scheduledEventIds = - runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler - [Q.sql| - WITH "cte" AS - (UPDATE hdb_catalog.hdb_cron_events - SET status = 'scheduled' - WHERE id = ANY($1::text[]) and status = 'locked' - RETURNING *) - SELECT count(*) FROM "cte" - |] (Identity $ ScheduledEventIdArray scheduledEventIds) True - -unlockOneOffScheduledEvents :: [ScheduledEventId] -> Q.TxE QErr Int -unlockOneOffScheduledEvents scheduledEventIds = - runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler - [Q.sql| - WITH "cte" AS - (UPDATE hdb_catalog.hdb_scheduled_events - SET status = 'scheduled' - WHERE id = ANY($1::text[]) AND status = 'locked' - RETURNING *) - SELECT count(*) FROM "cte" - |] (Identity $ ScheduledEventIdArray scheduledEventIds) True - -unlockAllLockedScheduledEvents :: Q.TxE QErr () -unlockAllLockedScheduledEvents = do + setRetry time = + case type' of + Cron -> + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.hdb_cron_events + SET next_retry_at = $1, + STATUS = 'scheduled' + WHERE id = $2 + |] (time, eventId) True + OneOff -> + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.hdb_scheduled_events + SET next_retry_at = $1, + STATUS = 'scheduled' + WHERE id = $2 + |] (time, eventId) True + setStatus status = + case type' of + Cron -> do + Q.unitQE defaultTxErrorHandler + [Q.sql| + UPDATE hdb_catalog.hdb_cron_events + SET status = $2 + WHERE id = $1 + |] (eventId, status) True + OneOff -> do + Q.unitQE defaultTxErrorHandler + [Q.sql| + UPDATE hdb_catalog.hdb_scheduled_events + SET status = $2 + WHERE id = $1 + |] (eventId, status) True + +unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int +unlockScheduledEventsTx type' eventIds = + case type' of + Cron -> + (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler + [Q.sql| + WITH "cte" AS + (UPDATE hdb_catalog.hdb_cron_events + SET status = 'scheduled' + WHERE id = ANY($1::text[]) and status = 'locked' + RETURNING *) + SELECT count(*) FROM "cte" + |] (Identity $ ScheduledEventIdArray eventIds) True + + OneOff -> + (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler + [Q.sql| + WITH "cte" AS + (UPDATE hdb_catalog.hdb_scheduled_events + SET status = 'scheduled' + WHERE id = ANY($1::text[]) AND status = 'locked' + RETURNING *) + SELECT count(*) FROM "cte" + |] (Identity $ ScheduledEventIdArray eventIds) True + +unlockAllLockedScheduledEventsTx :: Q.TxE QErr () +unlockAllLockedScheduledEventsTx = do Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.hdb_cron_events SET status = 'scheduled' @@ -818,3 +625,41 @@ unlockAllLockedScheduledEvents = do SET status = 'scheduled' WHERE status = 'locked' |] () True + +insertScheduledEventTx :: ScheduledEventSeed -> Q.TxE QErr () +insertScheduledEventTx = \case + SESOneOff CreateScheduledEvent{..} -> + Q.unitQE defaultTxErrorHandler + [Q.sql| + INSERT INTO hdb_catalog.hdb_scheduled_events + (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment) + VALUES + ($1, $2, $3, $4, $5, $6) + |] ( Q.AltJ cseWebhook + , cseScheduleAt + , Q.AltJ csePayload + , Q.AltJ cseRetryConf + , Q.AltJ cseHeaders + , cseComment) + False + + SESCron cronSeeds -> insertCronEventsTx cronSeeds + where + insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr () + insertCronEventsTx events = do + let insertCronEventsSql = TB.run $ toSQL + SQLInsert + { siTable = cronEventsTable + , siCols = map unsafePGCol ["trigger_name", "scheduled_time"] + , siValues = ValuesExp $ map (toTupleExp . toArr) events + , siConflict = Just $ DoNothing Nothing + , siRet = Nothing + } + Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False + where + toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)] + toTupleExp = TupleExp . map SELit + +cronEventsTable :: QualifiedTable +cronEventsTable = + QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events" diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger/Types.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger/Types.hs new file mode 100644 index 0000000000000..2eef391c0ac54 --- /dev/null +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger/Types.hs @@ -0,0 +1,69 @@ +module Hasura.Eventing.ScheduledTrigger.Types where + +import Data.Time.Clock +import Hasura.Eventing.HTTP +import Hasura.Prelude +import Hasura.RQL.Types + +import qualified Data.Aeson as J +import qualified Data.Aeson.Casing as J +import qualified Data.Aeson.TH as J +import qualified Database.PG.Query as Q +import qualified Database.PG.Query.PTI as PTI +import qualified Hasura.Logging as L +import qualified PostgreSQL.Binary.Encoding as PE + +newtype ScheduledTriggerInternalErr + = ScheduledTriggerInternalErr QErr + deriving (Show, Eq) + +instance L.ToEngineLog ScheduledTriggerInternalErr L.Hasura where + toEngineLog (ScheduledTriggerInternalErr qerr) = + (L.LevelError, L.scheduledTriggerLogType, J.toJSON qerr) + +data CronTriggerStats + = CronTriggerStats + { ctsName :: !TriggerName + , ctsUpcomingEventsCount :: !Int + , ctsMaxScheduledTime :: !UTCTime + } deriving (Show, Eq) + +data RetryContext + = RetryContext + { _rctxTries :: !Int + , _rctxConf :: !STRetryConf + } deriving (Show, Eq) + +data ScheduledEventWebhookPayload + = ScheduledEventWebhookPayload + { sewpId :: !EventId + , sewpName :: !(Maybe TriggerName) + , sewpScheduledTime :: !UTCTime + , sewpPayload :: !J.Value + , sewpComment :: !(Maybe Text) + , sewpCreatedAt :: !(Maybe UTCTime) + -- ^ sewpCreatedAt is the time at which the event was created, + -- In case of one-off scheduled events, it's the time at which + -- the user created the event and in case of cron triggers, the + -- graphql-engine generator, generates the cron events, the + -- `created_at` is just an implementation detail, so we + -- don't send it + } deriving (Show, Eq) + +$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventWebhookPayload) + +newtype ScheduledEventIdArray = + ScheduledEventIdArray { unScheduledEventIdArray :: [ScheduledEventId]} + deriving (Show, Eq) + +instance Q.ToPrepArg ScheduledEventIdArray where + toPrepVal (ScheduledEventIdArray l) = + Q.toPrepValHelper PTI.unknown encoder $ map unEventId l + where + -- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html + encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict) + +data ScheduledEventOp + = SEOpRetry !UTCTime + | SEOpStatus !ScheduledEventStatus + deriving (Show, Eq) diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs index ce77c05b49c6d..bc833420d862c 100644 --- a/server/src-lib/Hasura/GraphQL/Execute.hs +++ b/server/src-lib/Hasura/GraphQL/Execute.hs @@ -28,8 +28,8 @@ import qualified Data.Environment as Env import qualified Data.HashMap.Strict as Map import qualified Data.HashSet as HS -import qualified Language.GraphQL.Draft.Syntax as G import qualified Language.GraphQL.Draft.Printer as G +import qualified Language.GraphQL.Draft.Syntax as G import qualified Network.HTTP.Client as HTTP import qualified Network.HTTP.Types as HTTP import qualified Network.Wai.Extended as Wai @@ -42,7 +42,7 @@ import Hasura.GraphQL.Transport.HTTP.Protocol import Hasura.GraphQL.Utils (showName) import Hasura.HTTP import Hasura.RQL.Types -import Hasura.Server.Utils (RequestId) +import Hasura.Server.Types (RequestId) import Hasura.Server.Version (HasVersion) import Hasura.Session diff --git a/server/src-lib/Hasura/GraphQL/Logging.hs b/server/src-lib/Hasura/GraphQL/Logging.hs index 3372311e47719..8faefb6aba1a1 100644 --- a/server/src-lib/Hasura/GraphQL/Logging.hs +++ b/server/src-lib/Hasura/GraphQL/Logging.hs @@ -13,7 +13,7 @@ import qualified Language.GraphQL.Draft.Syntax as G import Hasura.GraphQL.Transport.HTTP.Protocol (GQLReqUnparsed) import Hasura.Prelude -import Hasura.Server.Utils (RequestId) +import Hasura.Server.Types (RequestId) import Hasura.Tracing (TraceT) import qualified Hasura.GraphQL.Execute.Query as EQ diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs index ce842053b34c6..c171786e1459a 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs @@ -28,7 +28,7 @@ import Hasura.HTTP import Hasura.Prelude import Hasura.RQL.Types import Hasura.Server.Init.Config -import Hasura.Server.Utils (RequestId) +import Hasura.Server.Types (RequestId) import Hasura.Server.Version (HasVersion) import Hasura.Session import Hasura.Tracing (MonadTrace, TraceT, trace) diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index a85149fe5eecb..bb2407dab2087 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -61,7 +61,8 @@ import Hasura.RQL.Types import Hasura.Server.Auth (AuthMode, UserAuthentication, resolveUserInfo) import Hasura.Server.Cors -import Hasura.Server.Utils (RequestId, getRequestId) +import Hasura.Server.Types (RequestId) +import Hasura.Server.Utils (getRequestId) import Hasura.Server.Version (HasVersion) import Hasura.Session diff --git a/server/src-lib/Hasura/Metadata/Class.hs b/server/src-lib/Hasura/Metadata/Class.hs new file mode 100644 index 0000000000000..6ca4443ff301e --- /dev/null +++ b/server/src-lib/Hasura/Metadata/Class.hs @@ -0,0 +1,159 @@ +-- | This module has type class and types which implements the Metadata Storage Abstraction +{-# LANGUAGE UndecidableInstances #-} +module Hasura.Metadata.Class + ( MetadataStorageT(..) + , runMetadataStorageT + , MonadMetadataStorage(..) + ) +where + +import Control.Monad.Morph (MFunctor) + +import Hasura.Eventing.HTTP +import Hasura.Eventing.ScheduledTrigger.Types +import Hasura.Prelude +import Hasura.RQL.Types + +import qualified Hasura.Tracing as Tracing + +{- Note [Todo: Common interface for eventing sub-system] +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Postgres tables' event triggers and scheduled event triggers are similar in the +core logic. But currently, their implementation is completely isolated and do not +share a common schema in Postgres. We're having a plan to simplify them via a +common 'event storage and retrieval' interface (maybe via a Postgres extension?). +This will potentially reduce number of interactions made to database and schema foot print. + +TODO: Reference to open issue or rfc? +-} + +-- | Metadata storage abstraction via a type class. +-- +-- This type class enables storing and managing Hasura metadata in an isolated +-- database which will not interfere with user's database where tables/functions +-- are defined. Hence, it'll enable support for databases of multiple backends +-- like MySQL, MSSQL etc. +-- +-- This class has functions broadly related to: +-- +-- 1. Metadata Management (TODO: Need to be added to the type class) +-- ---------------------- +-- Basic metadata management functions such as retrieving metadata from storage +-- database and replacing the given metadata. +-- +-- 2. Scheduled Triggers +-- --------------------- +-- Eventing sub-system for scheduled triggers is implemented via metadata storage. +-- For more details, refer description in 'Hasura.Eventing.ScheduledTrigger' module. +-- +-- TODO: Functions need to be added to the type class +-- - Retrieving invocation logs from storage (console requirement) +-- - Deleting an scheduled event +-- - Creating an one-off scheduled event +-- +-- 3. Async Actions (TODO: Need to be added to the type class) +-- ---------------- +-- Operations to implement async actions sub-system. This includes recording an +-- async action event and retreiving the details of action delivery to the webhook. +-- For more details see Note [Async action architecture] in 'Hasura.GraphQL.Execute.Action' module. +-- +-- It is believed that all the above three are implemented in a single storage +-- system (ex: a Postgres database). We can split the functions into appropriate and +-- specific type classes in future iterations if required. + +class (MonadError QErr m) => MonadMetadataStorage m where + + -- Scheduled triggers + -- TODO:- + -- Ideally we would've liked to avoid having functions that are specific to + -- scheduled/cron triggers and instead have functions that provide a generic + -- 'event storage and retrieval' interface but we'll have to change a lot of + -- existing code for scheduled and cron triggers. We can get to this after the + -- multi-source work is done. See Note [Todo: Common interface for eventing sub-system] + getDeprivedCronTriggerStats :: m [CronTriggerStats] + getScheduledEventsForDelivery :: m ([CronEvent], [OneOffScheduledEvent]) + insertScheduledEvent :: ScheduledEventSeed -> m () + insertScheduledEventInvocation :: Invocation 'ScheduledType -> ScheduledEventType -> m () + setScheduledEventOp :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> m () + unlockScheduledEvents :: ScheduledEventType -> [ScheduledEventId] -> m Int + unlockAllLockedScheduledEvents :: m () + +instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = lift getScheduledEventsForDelivery + insertScheduledEvent = lift . insertScheduledEvent + insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + +instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) where + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = lift getScheduledEventsForDelivery + insertScheduledEvent = lift . insertScheduledEvent + insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + +{- Note [Generic MetadataStorageT transformer] +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +All methods of the MonadMetadataStorage class may fail, which we represent in +the usual way using a MonadError superclass: + + class MonadError QErr m => MonadMetadataStorage m + +However, unusually, the location where we pick a concrete MonadMetadataStorage +instance is not a context where we can handle errors, and as such the monad at +that point has no MonadError instance! Instead, clients of MonadMetadataStorage +are expected to handle errors /locally/, even though the code is parameterized +over an arbitrary metadata storage mechanism. + +To encode this, we take a slightly unorthodox approach involving the auxiliary +MetadataStorageT transformer, which is really just a wrapper around ExceptT: + + newtype MetadataStorageT m a + = MetadataStorageT { unMetadataStorageT :: ExceptT QErr m a } + +We then define MonadMetadataStorage instances on a transformer stack comprising +both MetadataStorageT and a concrete base monad: + + instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) + +This looks unconventional, but it allows polymorphic code to be parameterized +over the metadata storage implementation while still handling errors locally. +Such functions include a constraint of the form + + MonadMetadataStorage (MetadataStorageT m) => ... + +and use runMetadataStorageT at the location where errors should be handled, e.g.: + + result <- runMetadataStorageT do + {- ... some metadata operations ... -} + case result of + Left err -> ... + Right value -> ... + +In other words, runMetadataStorageT serves as a marker that says ā€œIā€™m going to +handle exceptions raised by metadata operations right here,ā€ which allows them +to be handled more locally than the point at which the concrete +MonadMetadataStorage instance (and thus the particular metadata storage +implementation) is actually chosen. -} + +-- | The 'MetadataStorageT' transformer adds ability to throw exceptions +-- for monads deriving @'MonadMetadataStorage' instance. +-- For more details see Note [Generic MetadataStorageT transformer] +newtype MetadataStorageT m a + = MetadataStorageT {unMetadataStorageT :: ExceptT QErr m a} + deriving ( Functor, Applicative, Monad + , MonadError QErr + , MonadTrans + , MonadIO + , MFunctor + , Tracing.HasReporter + ) + +runMetadataStorageT + :: MetadataStorageT m a -> m (Either QErr a) +runMetadataStorageT = + runExceptT . unMetadataStorageT diff --git a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs index d001aa2bfa3dd..d214e9ae91df9 100644 --- a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs @@ -59,7 +59,7 @@ addCronTriggerToCatalog CronTriggerMetadata {..} = liftTx $ do ,Q.AltJ ctHeaders, ctIncludeInMetadata, ctComment) False currentTime <- liftIO C.getCurrentTime let scheduleTimes = generateScheduleTimes currentTime 100 ctSchedule -- generate next 100 events - insertCronEvents $ map (CronEventSeed ctName) scheduleTimes + insertScheduledEventTx $ SESCron $ map (CronEventSeed ctName) scheduleTimes resolveCronTrigger :: (QErrM m) @@ -113,7 +113,7 @@ updateCronTriggerInCatalog CronTriggerMetadata {..} = liftTx $ do -- create the next 100 cron events, as the future events were deleted currentTime <- liftIO C.getCurrentTime let scheduleTimes = generateScheduleTimes currentTime 100 ctSchedule - insertCronEvents $ map (CronEventSeed ctName) scheduleTimes + insertScheduledEventTx $ SESCron $ map (CronEventSeed ctName) scheduleTimes runDeleteCronTrigger :: (CacheRWM m, MonadTx m) => ScheduledTriggerName -> m EncJSON runDeleteCronTrigger (ScheduledTriggerName stName) = do diff --git a/server/src-lib/Hasura/RQL/Types/Action.hs b/server/src-lib/Hasura/RQL/Types/Action.hs index d2c1dad683e61..a5de00082be53 100644 --- a/server/src-lib/Hasura/RQL/Types/Action.hs +++ b/server/src-lib/Hasura/RQL/Types/Action.hs @@ -69,8 +69,8 @@ import Hasura.RQL.DDL.Headers import Hasura.RQL.IR.Select import Hasura.RQL.Types.Common import Hasura.RQL.Types.CustomTypes -import Hasura.SQL.Backend import Hasura.Session +import Hasura.SQL.Backend newtype ActionName diff --git a/server/src-lib/Hasura/RQL/Types/EventTrigger.hs b/server/src-lib/Hasura/RQL/Types/EventTrigger.hs index c2106d0b225d4..fb49f703771c7 100644 --- a/server/src-lib/Hasura/RQL/Types/EventTrigger.hs +++ b/server/src-lib/Hasura/RQL/Types/EventTrigger.hs @@ -5,7 +5,7 @@ module Hasura.RQL.Types.EventTrigger , TriggerName(..) , triggerNameToTxt , Ops(..) - , EventId + , EventId(..) , TriggerOpsDef(..) , EventTriggerConf(..) , RetryConf(..) @@ -59,7 +59,8 @@ newtype TriggerName = TriggerName { unTriggerName :: NonEmptyText } triggerNameToTxt :: TriggerName -> Text triggerNameToTxt = unNonEmptyText . unTriggerName -type EventId = Text +newtype EventId = EventId {unEventId :: Text} + deriving (Show, Eq, Ord, Hashable, Lift, ToTxt, FromJSON, ToJSON, ToJSONKey, Q.FromCol, Q.ToPrepArg, Generic, Arbitrary, NFData, Cacheable) data Ops = INSERT | UPDATE | DELETE | MANUAL deriving (Show) diff --git a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs index cf058154efa65..5ef70e05ea56f 100644 --- a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs @@ -9,6 +9,17 @@ module Hasura.RQL.Types.ScheduledTrigger , OneOffScheduledEventId , formatTime' , defaultSTRetryConf + , ScheduledEventId + , InvocationId + , CronEventSeed(..) + , ScheduledEventSeed(..) + , ScheduledEventStatus(..) + , scheduledEventStatusToText + , ScheduledEventType(..) + , ScheduledEvent(..) + , ScheduledEventInvocation(..) + , OneOffScheduledEvent(..) + , CronEvent(..) ) where import Data.Aeson @@ -18,18 +29,24 @@ import Data.Time.Clock import Data.Time.Clock.Units import Data.Time.Format.ISO8601 import Hasura.Incremental -import Hasura.RQL.Types.Common (NonNegativeDiffTime, unsafeNonNegativeDiffTime) -import Hasura.RQL.Types.Action (InputWebhook(..)) import Hasura.Prelude +import Hasura.RQL.Types.Action (InputWebhook (..)) +import Hasura.RQL.Types.Common (NonNegativeDiffTime, unsafeNonNegativeDiffTime) +import Hasura.RQL.Types.EventTrigger import System.Cron.Types import qualified Data.Aeson as J import qualified Data.Text as T -import qualified Hasura.RQL.Types.EventTrigger as ET +import qualified Database.PG.Query as Q +import qualified PostgreSQL.Binary.Decoding as PD -type CronEventId = Text +type CronEventId = EventId -type OneOffScheduledEventId = Text +type OneOffScheduledEventId = EventId + +type ScheduledEventId = EventId + +type InvocationId = Text data STRetryConf = STRetryConf @@ -73,12 +90,12 @@ defaultSTRetryConf = data CronTriggerMetadata = CronTriggerMetadata - { ctName :: !ET.TriggerName + { ctName :: !TriggerName , ctWebhook :: !InputWebhook , ctSchedule :: !CronSchedule , ctPayload :: !(Maybe J.Value) , ctRetryConf :: !STRetryConf - , ctHeaders :: ![ET.HeaderConf] + , ctHeaders :: ![HeaderConf] , ctIncludeInMetadata :: !Bool , ctComment :: !(Maybe Text) } deriving (Show, Eq, Generic) @@ -103,12 +120,12 @@ $(deriveToJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''CronTriggerMeta data CreateCronTrigger = CreateCronTrigger - { cctName :: !ET.TriggerName + { cctName :: !TriggerName , cctWebhook :: !InputWebhook , cctCronSchedule :: !CronSchedule , cctPayload :: !(Maybe J.Value) , cctRetryConf :: !STRetryConf - , cctHeaders :: ![ET.HeaderConf] + , cctHeaders :: ![HeaderConf] , cctIncludeInMetadata :: !Bool , cctComment :: !(Maybe Text) , cctReplace :: !Bool @@ -134,7 +151,7 @@ instance FromJSON CreateCronTrigger where $(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''CreateCronTrigger) newtype ScheduledTriggerName - = ScheduledTriggerName { unName :: ET.TriggerName } + = ScheduledTriggerName { unName :: TriggerName } deriving (Show, Eq) $(deriveJSON (aesonDrop 2 snakeCase) ''ScheduledTriggerName) @@ -144,15 +161,15 @@ formatTime'= T.pack . iso8601Show data CreateScheduledEvent = CreateScheduledEvent - { cseWebhook :: !InputWebhook - , cseScheduleAt :: !UTCTime + { cseWebhook :: !InputWebhook + , cseScheduleAt :: !UTCTime -- ^ The timestamp should be in the -- -- format (which is what @aeson@ expects by default for 'UTCTime'). - , csePayload :: !(Maybe J.Value) - , cseHeaders :: ![ET.HeaderConf] - , cseRetryConf :: !STRetryConf - , cseComment :: !(Maybe Text) + , csePayload :: !(Maybe J.Value) + , cseHeaders :: ![HeaderConf] + , cseRetryConf :: !STRetryConf + , cseComment :: !(Maybe Text) } deriving (Show, Eq, Generic) instance FromJSON CreateScheduledEvent where @@ -166,3 +183,115 @@ instance FromJSON CreateScheduledEvent where <*> o .:? "comment" $(deriveToJSON (aesonDrop 3 snakeCase) ''CreateScheduledEvent) + +-- | The 'ScheduledEventType' data type is needed to differentiate +-- between a 'CronScheduledEvent' and 'OneOffScheduledEvent' scheduled +-- event because they both have different configurations +-- and they live in different tables. +data ScheduledEventType = + Cron + -- ^ A Cron scheduled event has a template defined which will + -- contain the webhook, header configuration, retry + -- configuration and a payload. Every cron event created + -- uses the above mentioned configurations defined in the template. + -- The configuration defined with the cron trigger is cached + -- and hence it's not fetched along the cron scheduled events. + | OneOff + -- ^ A One-off scheduled event doesn't have any template defined + -- so all the configuration is fetched along the scheduled events. + deriving (Eq, Show) +$(deriveJSON defaultOptions{constructorTagModifier = snakeCase} ''ScheduledEventType) + +data ScheduledEventInvocation + = ScheduledEventInvocation + { _seiId :: !InvocationId + , _seiEventId :: !EventId + , _seiStatus :: !(Maybe Int) + , _seiRequest :: !(Maybe Value) + , _seiResponse :: !(Maybe Value) + , _seiCreatedAt :: !UTCTime + } deriving (Show, Eq) +$(deriveJSON (aesonDrop 4 snakeCase) ''ScheduledEventInvocation) + +data ScheduledEvent + = SEOneOff + | SECron !TriggerName + deriving (Show, Eq) + +data CronEventSeed + = CronEventSeed + { cesName :: !TriggerName + , cesScheduledTime :: !UTCTime + } deriving (Show, Eq) + +data ScheduledEventSeed + = SESCron ![CronEventSeed] + | SESOneOff !CreateScheduledEvent + deriving (Show, Eq) + +data ScheduledEventStatus + = SESScheduled + | SESLocked + | SESDelivered + | SESError + | SESDead + deriving (Show, Eq) + +scheduledEventStatusToText :: ScheduledEventStatus -> Text +scheduledEventStatusToText SESScheduled = "scheduled" +scheduledEventStatusToText SESLocked = "locked" +scheduledEventStatusToText SESDelivered = "delivered" +scheduledEventStatusToText SESError = "error" +scheduledEventStatusToText SESDead = "dead" + +textToScheduledEventStatus :: Text -> Maybe ScheduledEventStatus +textToScheduledEventStatus = \case + "scheduled" -> Just SESScheduled + "locked" -> Just SESLocked + "delivered" -> Just SESDelivered + "error" -> Just SESError + "dead" -> Just SESDead + _ -> Nothing + +instance Q.ToPrepArg ScheduledEventStatus where + toPrepVal = Q.toPrepVal . scheduledEventStatusToText + +instance Q.FromCol ScheduledEventStatus where + fromCol bs = + flip Q.fromColHelper bs $ PD.enum textToScheduledEventStatus + +instance ToJSON ScheduledEventStatus where + toJSON = String . scheduledEventStatusToText + +instance FromJSON ScheduledEventStatus where + parseJSON = withText "String" $ \s -> onNothing (textToScheduledEventStatus s) $ + fail $ T.unpack $ "unexpected status: " <> s + +data OneOffScheduledEvent + = OneOffScheduledEvent + { _ooseId :: !OneOffScheduledEventId + , _ooseWebhookConf :: !InputWebhook + , _ooseScheduledTime :: !UTCTime + , _ooseRetryConf :: !STRetryConf + , _oosePayload :: !(Maybe Value) + , _ooseHeaderConf :: ![HeaderConf] + , _ooseStatus :: !Text + , _ooseTries :: !Int + , _ooseCreatedAt :: !UTCTime + , _ooseNextRetryAt :: !(Maybe UTCTime) + , _ooseComment :: !(Maybe Text) + } deriving (Show, Eq) +$(deriveJSON (aesonDrop 5 snakeCase) ''OneOffScheduledEvent) + +data CronEvent + = CronEvent + { _ceId :: !CronEventId + , _ceTriggerName :: !TriggerName + , _ceScheduledTime :: !UTCTime + , _ceStatus :: !Text + , _ceTries :: !Int + , _ceCreatedAt :: !UTCTime + -- ^ it is the time at which the cron event generator created the event + , _ceNextRetryAt :: !(Maybe UTCTime) + } deriving (Show, Eq) +$(deriveJSON (aesonDrop 3 snakeCase) ''CronEvent) diff --git a/server/src-lib/Hasura/Server/API/Query.hs b/server/src-lib/Hasura/Server/API/Query.hs index a9e29262a2eeb..47ccc6623c579 100644 --- a/server/src-lib/Hasura/Server/API/Query.hs +++ b/server/src-lib/Hasura/Server/API/Query.hs @@ -37,7 +37,7 @@ import Hasura.RQL.DML.Types import Hasura.RQL.DML.Update import Hasura.RQL.Types import Hasura.RQL.Types.Run -import Hasura.Server.Init (InstanceId (..)) +import Hasura.Server.Types (InstanceId (..)) import Hasura.Server.Utils import Hasura.Server.Version (HasVersion) import Hasura.Session diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 302751b796fe3..f484c80d20694 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -65,6 +65,7 @@ import Hasura.Server.Cors import Hasura.Server.Init import Hasura.Server.Logging import Hasura.Server.Middleware (corsMiddleware) +import Hasura.Server.Types import Hasura.Server.Utils import Hasura.Server.Version import Hasura.Session diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index 0a4f77457374b..7051cc740ff26 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -35,16 +35,11 @@ import Hasura.Server.Auth import Hasura.Server.Cors import Hasura.Server.Init.Config import Hasura.Server.Logging +import Hasura.Server.Types import Hasura.Server.Utils import Hasura.Session import Network.URI (parseURI) -newtype DbUid - = DbUid { getDbUid :: Text } - deriving (Show, Eq, J.ToJSON, J.FromJSON) - -newtype PGVersion = PGVersion { unPGVersion :: Int } deriving (Show, Eq, J.ToJSON) - getDbId :: Q.TxE QErr Text getDbId = (runIdentity . Q.getRow) <$> @@ -56,10 +51,6 @@ getDbId = getPgVersion :: Q.TxE QErr PGVersion getPgVersion = PGVersion <$> Q.serverVersion -newtype InstanceId - = InstanceId { getInstanceId :: Text } - deriving (Show, Eq, J.ToJSON, J.FromJSON, Q.FromCol, Q.ToPrepArg) - generateInstanceId :: IO InstanceId generateInstanceId = InstanceId <$> generateFingerprint diff --git a/server/src-lib/Hasura/Server/Logging.hs b/server/src-lib/Hasura/Server/Logging.hs index 2bd67568fdd16..18d0cdb57107e 100644 --- a/server/src-lib/Hasura/Server/Logging.hs +++ b/server/src-lib/Hasura/Server/Logging.hs @@ -30,7 +30,7 @@ import Hasura.HTTP import Hasura.Logging import Hasura.RQL.Types import Hasura.Server.Compression -import Hasura.Server.Utils +import Hasura.Server.Types import Hasura.Session import Hasura.Tracing (TraceT) diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs index e8003fcd786df..6f41d7511428d 100644 --- a/server/src-lib/Hasura/Server/SchemaUpdate.hs +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -13,8 +13,8 @@ import Hasura.RQL.DDL.Schema (runCacheRWT) import Hasura.RQL.Types import Hasura.RQL.Types.Run import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate) -import Hasura.Server.Init (InstanceId (..)) import Hasura.Server.Logging +import Hasura.Server.Types (InstanceId (..)) import Hasura.Session import Data.Aeson diff --git a/server/src-lib/Hasura/Server/Telemetry.hs b/server/src-lib/Hasura/Server/Telemetry.hs index c7be406af5991..69bec3ced462c 100644 --- a/server/src-lib/Hasura/Server/Telemetry.hs +++ b/server/src-lib/Hasura/Server/Telemetry.hs @@ -18,8 +18,8 @@ import Hasura.HTTP import Hasura.Logging import Hasura.Prelude import Hasura.RQL.Types -import Hasura.Server.Init import Hasura.Server.Telemetry.Counters +import Hasura.Server.Types import Hasura.Server.Version import Hasura.Session diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs new file mode 100644 index 0000000000000..4af000564ddad --- /dev/null +++ b/server/src-lib/Hasura/Server/Types.hs @@ -0,0 +1,23 @@ +module Hasura.Server.Types where + +import Hasura.Prelude + +import Data.Aeson + +import qualified Database.PG.Query as Q + +newtype RequestId + = RequestId { unRequestId :: Text } + deriving (Show, Eq, ToJSON, FromJSON) + +newtype DbUid + = DbUid { getDbUid :: Text } + deriving (Show, Eq, ToJSON, FromJSON) + +newtype PGVersion + = PGVersion { unPGVersion :: Int } + deriving (Show, Eq, ToJSON) + +newtype InstanceId + = InstanceId { getInstanceId :: Text } + deriving (Show, Eq, ToJSON, FromJSON, Q.FromCol, Q.ToPrepArg) diff --git a/server/src-lib/Hasura/Server/Utils.hs b/server/src-lib/Hasura/Server/Utils.hs index 98710575e43a7..918a3ba5b19c4 100644 --- a/server/src-lib/Hasura/Server/Utils.hs +++ b/server/src-lib/Hasura/Server/Utils.hs @@ -7,6 +7,7 @@ import Data.Aeson import Data.Aeson.Internal import Data.Char import Data.Text.Extended +import Hasura.Server.Types import Language.Haskell.TH.Syntax (Lift, Q, TExp) import System.Environment import System.Exit @@ -31,10 +32,6 @@ import qualified Text.Regex.TDFA.TDFA as TDFA import Hasura.RQL.Instances () -newtype RequestId - = RequestId { unRequestId :: Text } - deriving (Show, Eq, ToJSON, FromJSON) - jsonHeader :: HTTP.Header jsonHeader = ("Content-Type", "application/json; charset=utf-8")