Skip to content

Commit

Permalink
server: expand metadata storage class with async actions and core met…
Browse files Browse the repository at this point in the history
…adata operations (#184)

An incremental PR towards #5797
- Expands `MonadMetadataStorage` with operations related to async actions and setting/updating metadata
  • Loading branch information
rakeshkky authored Dec 14, 2020
1 parent d4c0370 commit 53386b7
Show file tree
Hide file tree
Showing 32 changed files with 785 additions and 420 deletions.
21 changes: 12 additions & 9 deletions pro/server/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import qualified System.Metrics as EKG
import qualified System.Posix.Signals as Signals

import qualified Hasura.App as HGE
import qualified Hasura.Metadata.Class as HGE
import qualified Hasura.RQL.DDL.Schema.Cache as HGE
import qualified Hasura.RQL.DDL.Schema.Catalog as HGE
import qualified Hasura.RQL.Types as HGE
Expand Down Expand Up @@ -68,7 +69,7 @@ runApp env (ProOptionsG (HGEOptionsG rci command) ProEditionOptions{..}) = do
{ _policyConfigCacheSizeLimit = Just (1000 * 1000)
, _policyConfigCacheMaxTtl = Just 600
, _policyConfigMaxReqsPerMin = Nothing
, _policyConfigRequestAllocationLimit = Nothing
, _policyConfigRequestAllocationLimit = Nothing
}
getPoliciesConfig = pure $ Just proPoliciesConfig

Expand Down Expand Up @@ -109,14 +110,16 @@ runApp env (ProOptionsG (HGEOptionsG rci command) ProEditionOptions{..}) = do
queryBs <- liftIO BL.getContents
let sqlGenCtx = HGE.SQLGenCtx False
pool <- mkMinimalPool _gcConnInfo
res <- HGE.runAsAdmin pool sqlGenCtx _gcHttpManager $ do
schemaCache <- HGE.buildRebuildableSchemaCache env
metadata <- HGE.liftTx HGE.fetchMetadataFromCatalog
HGE.execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& HGE.runMetadataT metadata
& HGE.runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
res <- flip HGE.runPGMetadataStorageApp pool $
HGE.runMetadataStorageT $ liftEitherM $
HGE.runAsAdmin pool sqlGenCtx _gcHttpManager $ do
metadata <- HGE.liftTx HGE.fetchMetadataFromCatalog
schemaCache <- HGE.buildRebuildableSchemaCache env metadata
HGE.execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& HGE.runMetadataT metadata
& HGE.runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
either (HGE.printErrJExit HGE.ExecuteProcessError) (liftIO . BL.putStrLn) res

HCDowngrade opts -> do
Expand Down
39 changes: 28 additions & 11 deletions pro/server/src/HasuraPro/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ import qualified Hasura.Backends.Postgres.SQL.Types as HGE
import qualified Hasura.Eventing.ScheduledTrigger as HGE
import qualified Hasura.GraphQL.Context as Context
import qualified Hasura.GraphQL.Execute as HGE
import qualified Hasura.GraphQL.Execute.Action as HGE
import qualified Hasura.GraphQL.Execute.Query as EQ
import qualified Hasura.GraphQL.Logging as HGE
import qualified Hasura.GraphQL.Parser.Column as Column
import qualified Hasura.GraphQL.Transport.HTTP as HGE
import qualified Hasura.GraphQL.Transport.WebSocket.Server as HGE
import qualified Hasura.Logging as L
import qualified Hasura.Metadata.Class as HGE
import qualified Hasura.RQL.DDL.Schema.Catalog as HGE
import qualified Hasura.RQL.IR.RemoteJoin as RJ
import qualified Hasura.RQL.Types as HGE
import qualified Hasura.Server.API.Query as HGE
Expand Down Expand Up @@ -421,17 +423,19 @@ getProProjectConfig = do
liftIO $ STM.atomically $ StmTVar.readTVar projectConfigTVar

instance HGE.MetadataApiAuthorization AppM where
authorizeMetadataApi query userInfo = do
manager <- HGE.scManager <$> asks HGE.hcServerCtx
proLogger <- lift $ lift $ asks _acProLogger
jwkSetRef <- lift $ lift $ asks _acJwkSet
authorizeMetadataApi query hgeHandlerCtx = do
let userInfo = HGE.hcUser hgeHandlerCtx
manager = HGE.scManager $ HGE.hcServerCtx hgeHandlerCtx
proLogger <- asks _acProLogger
jwkSetRef <- asks _acJwkSet
jwkSet <- liftIO $ IORef.readIORef jwkSetRef
proCreds <- lift $ lift $ asks _acProCreds
exOpts <- lift $ lift $ asks _acProExtraOpts
metadataAuthzMiddleware proLogger manager proCreds exOpts jwkSet query
let currRole = HGE._uiRole userInfo
when (HGE.requiresAdmin query && currRole /= HGE.adminRoleName) $
HGE.withPathK "args" $ HGE.throw400 HGE.AccessDenied errMsg
proCreds <- asks _acProCreds
exOpts <- asks _acProExtraOpts
runExceptT do
metadataAuthzMiddleware proLogger manager proCreds exOpts jwkSet query hgeHandlerCtx
let currRole = HGE._uiRole userInfo
when (HGE.requiresAdmin query && currRole /= HGE.adminRoleName) $
HGE.withPathK "args" $ HGE.throw400 HGE.AccessDenied errMsg
where
errMsg = "restricted access : admin only"

Expand Down Expand Up @@ -557,13 +561,26 @@ instance HGE.MonadMetadataStorage (HGE.MetadataStorageT AppM) where
-- TODO: Change the following to pro/cloud specific implementation in subsequent incremental PRs
-- which introduces the schema migration for metadata separation.

fetchMetadata = runInSeparateTx HGE.fetchMetadataFromCatalog
setMetadata = runInSeparateTx . HGE.setMetadataInCatalog
notifySchemaCacheSync a b = runInSeparateTx $ HGE.notifySchemaCacheSyncTx a b
processSchemaSyncEventPayload instanceId payload = do
HGE.EventPayload{..} <- HGE.decodeValue payload
pure $ HGE.SchemaSyncEventProcessResult (instanceId /= _epInstanceId) _epInvalidations

getDeprivedCronTriggerStats = runInSeparateTx HGE.getDeprivedCronTriggerStatsTx
getScheduledEventsForDelivery = runInSeparateTx HGE.getScheduledEventsForDeliveryTx
insertScheduledEvent = runInSeparateTx . HGE.insertScheduledEventTx
insertScheduledEventInvocation a b = runInSeparateTx $ HGE.insertInvocationTx a b
setScheduledEventOp a b c = runInSeparateTx $ HGE.setScheduledEventOpTx a b c
unlockScheduledEvents a b = runInSeparateTx $ HGE.unlockScheduledEventsTx a b
unlockAllLockedScheduledEvents = runInSeparateTx HGE.unlockAllLockedScheduledEventsTx
clearFutureCronEvents = runInSeparateTx . HGE.dropFutureCronEventsTx

insertAction a b c d = runInSeparateTx $ HGE.insertActionTx a b c d
fetchUndeliveredActionEvents = runInSeparateTx HGE.fetchUndeliveredActionEventsTx
setActionStatus a b = runInSeparateTx $ HGE.setActionStatusTx a b
fetchActionResponse = runInSeparateTx . HGE.fetchActionResponseTx

-- | creates the 'HGE.PGExecCtx' type from given master and replica pools
mkProPGExecCtx :: L.Logger HasuraPro -> (Q.PGPool, NE.NonEmpty Q.PGPool) -> Q.TxIsolation -> HGE.PGExecCtx
Expand Down Expand Up @@ -646,7 +663,7 @@ initialiseProCatalog pool = do
-- | a separate function to create the initialization context because some of
-- these contexts might be used by external functions
hgeProInitialiseCtx
:: ( HGE.HasVersion, MonadIO m, MonadCatch m)
:: ( HGE.HasVersion, MonadIO m, MonadCatch m, MonadBaseControl IO m)
=> Maybe TenantId
-> Maybe ControlWorkerId
-> E.Environment
Expand Down
7 changes: 4 additions & 3 deletions pro/server/src/HasuraPro/Authorization.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ metadataAuthzMiddleware
-> LuxEndpoints
-> Either JWKSetError Jose.JWKSet
-> HGE.RQLQuery
-> HGE.Handler m ()
metadataAuthzMiddleware logger httpManager proCreds exOpts jwkSetE metadataQuery = do
reqHeaders <- asks HGE.hcReqHeaders
-> HGE.HandlerCtx
-> ExceptT HGE.QErr m ()
metadataAuthzMiddleware logger httpManager proCreds exOpts jwkSetE metadataQuery handlerCtx = do
let reqHeaders = HGE.hcReqHeaders handlerCtx
userInfo <- flip runReaderT (httpManager, logger, proCreds, exOpts) $ resolveProUserInfo jwkSetE reqHeaders
let maybeAuthorized = authorizeRqlQuery metadataQuery <$> userInfo
onJust maybeAuthorized $ \authorized ->
Expand Down
19 changes: 11 additions & 8 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Data.Time.Clock.POSIX (getPOSIXTime)

import Hasura.App
import Hasura.Logging (Hasura, LogLevel (..), defaultEnabledEngineLogTypes)
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types
Expand Down Expand Up @@ -85,14 +86,16 @@ runApp env (HGEOptionsG rci hgeCmd) = do
queryBs <- liftIO BL.getContents
let sqlGenCtx = SQLGenCtx False
pool <- mkMinimalPool _gcConnInfo
res <- runAsAdmin pool sqlGenCtx _gcHttpManager $ do
schemaCache <- buildRebuildableSchemaCache env
metadata <- liftTx fetchMetadataFromCatalog
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
res <- flip runPGMetadataStorageApp pool $
runMetadataStorageT $ liftEitherM $
runAsAdmin pool sqlGenCtx _gcHttpManager $ do
metadata <- liftTx fetchMetadataFromCatalog
schemaCache <- buildRebuildableSchemaCache env metadata
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
either (printErrJExit ExecuteProcessError) (liftIO . BLC.putStrLn) res

HCDowngrade opts -> do
Expand Down
70 changes: 50 additions & 20 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Control.Monad.Stateless
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Monad.Unique
import Data.Aeson ((.=))
import Data.Time.Clock (UTCTime)
#ifndef PROFILING
import GHC.AssertNF
Expand All @@ -26,6 +25,7 @@ import System.Mem (performMajorGC)
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Control.Immortal as Immortal
import qualified Data.Aeson as J
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
Expand All @@ -48,7 +48,7 @@ import Hasura.Eventing.EventTrigger
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
checkQueryInAllowlist)
import Hasura.GraphQL.Execute.Action (asyncActionsProcessor)
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Query (MonadQueryInstrumentation (..),
noProfile)
import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..))
Expand All @@ -58,6 +58,7 @@ import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.API.Query (requiresAdmin, runQueryM)
Expand Down Expand Up @@ -178,7 +179,7 @@ data ServeCtx
, _scConnInfo :: !Q.ConnInfo
, _scPgPool :: !Q.PGPool
, _scShutdownLatch :: !ShutdownLatch
, _scSchemaCache :: !(RebuildableSchemaCache Run)
, _scSchemaCache :: !RebuildableSchemaCache
, _scSchemaSyncCtx :: !SchemaSyncCtx
}

Expand All @@ -202,7 +203,7 @@ newtype PGMetadataStorageApp a

-- | Initializes or migrates the catalog and returns the context required to start the server.
initialiseServeCtx
:: (HasVersion, MonadIO m, MonadCatch m)
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MonadCatch m)
=> Env.Environment
-> GlobalCtx
-> ServeOptions Hasura
Expand Down Expand Up @@ -243,9 +244,9 @@ mkLoggers enabledLogs logLevel = do

-- | helper function to initialize or migrate the @hdb_catalog@ schema (used by pro as well)
migrateCatalogSchema
:: (HasVersion, MonadIO m)
:: (HasVersion, MonadIO m, MonadBaseControl IO m)
=> Env.Environment -> Logger Hasura -> Q.PGPool -> HTTP.Manager -> SQLGenCtx
-> m (RebuildableSchemaCache Run, UTCTime)
-> m (RebuildableSchemaCache, UTCTime)
migrateCatalogSchema env logger pool httpManager sqlGenCtx = do
let pgExecCtx = mkPGExecCtx Q.Serializable pool
adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx
Expand Down Expand Up @@ -413,7 +414,7 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po

-- start a backgroud thread to handle async actions
asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $
asyncActionsProcessor env logger (_scrCache cacheRef) _scPgPool _scHttpManager
asyncActionsProcessor env logger (_scrCache cacheRef) _scHttpManager

-- start a background thread to create new cron events
cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $
Expand Down Expand Up @@ -461,16 +462,16 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
shutdownHandler' <- liftWithStateless $ \lowerIO ->
pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $
\a b -> hoist lowerIO $ unlockScheduledEvents a b

-- Install a variant of forkIOWithUnmask which tracks Warp threads using an EKG metric
let setForkIOWithMetrics :: Warp.Settings -> Warp.Settings
setForkIOWithMetrics = Warp.setFork \f -> do
void $ C.forkIOWithUnmask (\unmask ->
bracket_
bracket_
(EKG.Gauge.inc $ smWarpThreads serverMetrics)
(EKG.Gauge.dec $ smWarpThreads serverMetrics)
(f unmask))

let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
Expand Down Expand Up @@ -625,11 +626,11 @@ ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval =
go gcs major_gcs timerSinceLastMajorGC

runAsAdmin
:: (MonadIO m)
:: (MonadIO m, MonadBaseControl IO m)
=> Q.PGPool
-> SQLGenCtx
-> HTTP.Manager
-> Run a
-> RunT m a
-> m (Either QErr a)
runAsAdmin pool sqlGenCtx httpManager m = do
let runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
Expand All @@ -647,6 +648,7 @@ execQuery
, UserInfoM m
, Tracing.MonadTrace m
, MetadataM m
, MonadScheduledEvents m
)
=> Env.Environment
-> BLC.ByteString
Expand Down Expand Up @@ -684,8 +686,8 @@ instance UserAuthentication (Tracing.TraceT PGMetadataStorageApp) where
runExceptT $ getUserInfoWithExpTime logger manager headers authMode

instance MetadataApiAuthorization PGMetadataStorageApp where
authorizeMetadataApi query userInfo = do
let currRole = _uiRole userInfo
authorizeMetadataApi query handlerCtx = runExceptT do
let currRole = _uiRole $ hcUser handlerCtx
when (requiresAdmin query && currRole /= adminRoleName) $
withPathK "args" $ throw400 AccessDenied errMsg
where
Expand Down Expand Up @@ -716,31 +718,59 @@ runInSeparateTx tx = do
pool <- lift ask
liftEitherM $ liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Nothing) tx

-- | Using @pg_notify@ function to publish schema sync events to other server
-- instances via 'hasura_schema_update' channel.
-- See Note [Schema Cache Sync]
notifySchemaCacheSyncTx :: InstanceId -> CacheInvalidations -> Q.TxE QErr ()
notifySchemaCacheSyncTx instanceId invalidations = do
Q.Discard () <- Q.withQE defaultTxErrorHandler [Q.sql|
SELECT pg_notify('hasura_schema_update', json_build_object(
'instance_id', $1,
'occurred_at', NOW(),
'invalidations', $2
)::text
)
|] (instanceId, Q.AltJ invalidations) True
pure ()

-- | Each of the function in the type class is executed in a totally separate transaction.
--
-- To learn more about why the instance is derived as following, see Note [Generic MetadataStorageT transformer]
instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where

fetchMetadata = runInSeparateTx fetchMetadataFromCatalog
setMetadata = runInSeparateTx . setMetadataInCatalog
notifySchemaCacheSync a b = runInSeparateTx $ notifySchemaCacheSyncTx a b
processSchemaSyncEventPayload instanceId payload = do
EventPayload{..} <- decodeValue payload
pure $ SchemaSyncEventProcessResult (instanceId /= _epInstanceId) _epInvalidations

getDeprivedCronTriggerStats = runInSeparateTx getDeprivedCronTriggerStatsTx
getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx
insertScheduledEvent = runInSeparateTx . insertScheduledEventTx
insertScheduledEventInvocation a b = runInSeparateTx $ insertInvocationTx a b
setScheduledEventOp a b c = runInSeparateTx $ setScheduledEventOpTx a b c
unlockScheduledEvents a b = runInSeparateTx $ unlockScheduledEventsTx a b
unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx
clearFutureCronEvents = runInSeparateTx . dropFutureCronEventsTx

insertAction a b c d = runInSeparateTx $ insertActionTx a b c d
fetchUndeliveredActionEvents = runInSeparateTx fetchUndeliveredActionEventsTx
setActionStatus a b = runInSeparateTx $ setActionStatusTx a b
fetchActionResponse = runInSeparateTx . fetchActionResponseTx

--- helper functions ---

mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either String Text
mkConsoleHTML path authMode enableTelemetry consoleAssetsDir =
renderHtmlTemplate consoleTmplt $
-- variables required to render the template
A.object [ "isAdminSecretSet" .= isAdminSecretSet authMode
, "consolePath" .= consolePath
, "enableTelemetry" .= boolToText enableTelemetry
, "cdnAssets" .= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" .= consoleAssetsVersion
, "serverVersion" .= currentVersion
A.object [ "isAdminSecretSet" J..= isAdminSecretSet authMode
, "consolePath" J..= consolePath
, "enableTelemetry" J..= boolToText enableTelemetry
, "cdnAssets" J..= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" J..= consoleAssetsVersion
, "serverVersion" J..= currentVersion
]
where
consolePath = case path of
Expand Down
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/Backends/Postgres/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import Hasura.Backends.Postgres.SQL.Error
import Hasura.Backends.Postgres.SQL.Types
import Hasura.EncJSON
import Hasura.RQL.Types.Error
import Hasura.SQL.Types
import Hasura.Session
import Hasura.SQL.Types

type RunTx =
forall m a. (MonadIO m, MonadBaseControl IO m) => Q.TxET QErr m a -> ExceptT QErr m a
Expand Down
Loading

0 comments on commit 53386b7

Please sign in to comment.