-
-
Notifications
You must be signed in to change notification settings - Fork 999
/
App.hs
270 lines (227 loc) · 13.5 KB
/
App.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
{-|
Module      : PostgREST.App
Description : PostgREST main application

This module is in charge of mapping HTTP requests to PostgreSQL queries.
Some of its functionality includes:

- Mapping HTTP request methods to proper SQL statements. For example, a GET request is translated to executing a SELECT query in a read-only TRANSACTION.
- Producing HTTP headers according to RFCs.
- Content negotiation.
-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
module PostgREST.App
( postgrest
, run
) where
import Control.Monad.Except (liftEither)
import Data.Either.Combinators (mapLeft)
import Data.Maybe (fromJust)
import Data.String (IsString (..))
import Network.Wai.Handler.Warp (defaultSettings, setHost, setPort,
setServerName)
import qualified Data.HashMap.Strict as HM
import qualified Data.Text.Encoding as T
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified PostgREST.Admin as Admin
import qualified PostgREST.ApiRequest as ApiRequest
import qualified PostgREST.ApiRequest.Types as ApiRequestTypes
import qualified PostgREST.AppState as AppState
import qualified PostgREST.Auth as Auth
import qualified PostgREST.Cors as Cors
import qualified PostgREST.Error as Error
import qualified PostgREST.Logger as Logger
import qualified PostgREST.Plan as Plan
import qualified PostgREST.Query as Query
import qualified PostgREST.Response as Response
import qualified PostgREST.Unix as Unix (installSignalHandlers)
import PostgREST.ApiRequest (Action (..), ApiRequest (..),
Mutation (..), Target (..))
import PostgREST.AppState (AppState)
import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Observation (Observation (..))
import PostgREST.Query (DbHandler)
import PostgREST.Response.Performance (ServerTiming (..),
serverTimingHeader)
import PostgREST.SchemaCache (SchemaCache (..))
import PostgREST.SchemaCache.Routine (Routine (..))
import PostgREST.Version (docsVersion, prettyVersion)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import Protolude hiding (Handler)
import System.TimeIt (timeItT)
-- | Monad transformer used by the request handlers in this module: 'ExceptT'
-- specialised to the application's 'Error' type, so a handler can abort with
-- an 'Error' via 'throwError' / 'liftEither'.
type Handler = ExceptT Error
-- | Start the server.
--
-- In order: emits 'AppStartObs', reads the current configuration, runs the
-- connection worker once, installs the signal handlers, starts the listener
-- and the admin server, reports where the REST socket is bound (unix socket
-- path or TCP port) and finally serves the 'postgrest' application on that
-- socket with Warp.
run :: AppState -> (Observation -> IO ()) -> IO ()
run appState observer = do
  observer $ AppStartObs prettyVersion

  conf <- AppState.getConfig appState
  let
    connWorker = AppState.connectionWorker appState
    restSocket = AppState.getSocketREST appState
    settings   = serverSettings conf

  -- Loads the initial SchemaCache
  connWorker
  Unix.installSignalHandlers
    (AppState.getMainThreadId appState)
    connWorker
    (AppState.reReadConfig False appState observer)

  -- reload schema cache + config on NOTIFY
  AppState.runListener conf appState observer

  Admin.runAdmin conf appState settings observer

  -- Report the address being served: the unix socket path when one is
  -- configured, otherwise the port the REST socket is bound to.
  case configServerUnixSocket conf of
    Just path -> observer $ AppServerUnixObs path
    Nothing   -> NS.socketPort restSocket >>= observer . AppServerPortObs

  Warp.runSettingsSocket settings restSocket $
    postgrest conf appState connWorker observer
-- | Warp settings built from the configuration: the default settings with the
-- configured host and port, and a @postgrest/<version>@ server name.
serverSettings :: AppConfig -> Warp.Settings
serverSettings AppConfig{configServerHost, configServerPort} =
  setServerName ("postgrest/" <> prettyVersion)
    . setPort configServerPort
    . setHost (fromString $ toS configServerHost)
    $ defaultSettings
-- | PostgREST application
--
-- The request handler is wrapped, from the outside in, by the trace-header,
-- CORS, auth and logger middlewares. The handler itself reads the auth result
-- stored on the request: an auth error is answered directly, otherwise the
-- request is processed by 'postgrestResponse' and any 'Error' it produces is
-- converted into an error response.
postgrest :: AppConfig -> AppState.AppState -> IO () -> (Observation -> IO ()) -> Wai.Application
postgrest conf appState connWorker observer =
  middlewares handler
  where
    middlewares =
      traceHeaderMiddleware conf
        . Cors.middleware (configServerCorsAllowedOrigins conf)
        . Auth.middleware appState
        . Logger.middleware (configLogLevel conf)

    handler req respond =
      -- fromJust can be used, because the auth middleware will **always** add
      -- some AuthResult to the vault.
      case fromJust $ Auth.getResult req of
        Left err ->
          respond $ Error.errorResponseFor err

        Right authResult -> do
          -- the config must be read again because it can reload
          appConf <- AppState.getConfig appState
          maybeSchemaCache <- AppState.getSchemaCache appState
          pgVer <- AppState.getPgVersion appState

          outcome <-
            runExceptT $
              postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer
          let response = either Error.errorResponseFor identity outcome

          -- Launch the connWorker when the connection is down. The postgrest
          -- function can respond successfully (with a stale schema cache)
          -- before the connWorker is done.
          when (isServiceUnavailable response) connWorker

          delay <- AppState.getRetryNextIn appState
          respond $ addRetryHint delay response
-- | Build the response for an authenticated request.
--
-- Fails with 'Error.NoSchemaCacheError' when no schema cache is available.
-- Otherwise reads the request body, parses the request into an 'ApiRequest'
-- (parse failures are wrapped in 'Error.ApiRequestError') and delegates to
-- 'handleRequest'. When server timing is enabled, the parse step is timed and
-- the JWT duration is read from the request.
postgrestResponse
  :: AppState.AppState
  -> AppConfig
  -> Maybe SchemaCache
  -> PgVersion
  -> AuthResult
  -> Wai.Request
  -> (Observation -> IO ())
  -> Handler IO Wai.Response
postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@AuthResult{..} req observer = do
  sCache <- maybe (throwError Error.NoSchemaCacheError) return maybeSchemaCache
  body <- lift $ Wai.strictRequestBody req

  (parseTime, apiRequest) <-
    calcTiming configServerTimingEnabled
      . liftEither
      . mapLeft Error.ApiRequestError
      $ ApiRequest.userApiRequest conf req body sCache

  let
    jwtTime
      | configServerTimingEnabled = Auth.getJwtDur req
      | otherwise                 = Nothing
    -- A request counts as authenticated when its role is not the anonymous role.
    authenticated = Just authRole /= configDbAnonRole

  handleRequest
    authResult conf appState authenticated configDbPreparedStatements
    pgVer apiRequest sCache jwtTime parseTime observer
-- | Run a 'DbHandler' in a transaction on the connection pool.
--
-- The transaction uses the given isolation level and mode, and is prepared or
-- unprepared according to the @prepared@ flag. A pool-level failure is wrapped
-- as @Error.PgErr (Error.PgError authenticated ...)@; an 'Error' returned by
-- the handler itself is rethrown unchanged.
runDbHandler :: AppState.AppState -> AppConfig -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> (Observation -> IO ()) -> DbHandler b -> Handler IO b
runDbHandler appState config isoLvl mode authenticated prepared observer handler = do
  dbResp <- lift $ AppState.usePool appState config session observer
  -- The outer Either carries the pool/usage error, the inner one the
  -- handler's own Error.
  liftEither (mapLeft (Error.PgErr . Error.PgError authenticated) dbResp) >>= liftEither
  where
    transaction
      | prepared  = SQL.transaction
      | otherwise = SQL.unpreparedTransaction
    session = transaction isoLvl mode $ runExceptT handler
-- | Dispatch a parsed 'ApiRequest' on its action/target pair.
--
-- Most branches follow the same three steps, each timed when server timing is
-- enabled: build a plan, run the query in a transaction, and turn the result
-- into a response. The info (@ActionInfo@) branches run no query. The collected
-- timings are attached to the response as a server-timing header when the
-- feature is enabled.
handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache ->
                 Maybe Double -> Maybe Double -> (Observation -> IO ()) -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime observer =
  case (iAction, iTarget) of
    (ActionRead headersOnly, TargetIdent identifier) -> do
      (planT, wrPlan) <- timed . liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
      (txT, resultSet) <- timed . runQuery roleIsoLvl mempty (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
      (respT, pgrst) <- timed . liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
      finish planT txT respT pgrst

    (ActionMutate MutationCreate, TargetIdent identifier) ->
      mutateWith MutationCreate identifier Query.createQuery (Response.createResponse identifier)

    (ActionMutate MutationUpdate, TargetIdent identifier) ->
      mutateWith MutationUpdate identifier Query.updateQuery Response.updateResponse

    (ActionMutate MutationSingleUpsert, TargetIdent identifier) ->
      mutateWith MutationSingleUpsert identifier Query.singleUpsertQuery Response.singleUpsertResponse

    (ActionMutate MutationDelete, TargetIdent identifier) ->
      mutateWith MutationDelete identifier Query.deleteQuery Response.deleteResponse

    (ActionInvoke invMethod, TargetProc identifier _) -> do
      (planT, cPlan) <- timed . liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
      let routine = Plan.crProc cPlan
      -- The routine's own isolation level, when it has one, overrides the
      -- role's; its function settings are passed along to the transaction.
      (txT, resultSet) <-
        timed
          . runQuery (fromMaybe roleIsoLvl $ pdIsoLvl routine) (pdFuncSettings routine) (Plan.crTxMode cPlan)
          $ Query.invokeQuery routine cPlan apiReq conf pgVer
      (respT, pgrst) <- timed . liftEither $ Response.invokeResponse cPlan invMethod routine apiReq resultSet
      finish planT txT respT pgrst

    (ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
      (planT, iPlan) <- timed . liftEither $ Plan.inspectPlan apiReq
      (txT, oaiResult) <- timed . runQuery roleIsoLvl mempty (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
      (respT, pgrst) <- timed . liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
      finish planT txT respT pgrst

    (ActionInfo, TargetIdent identifier) -> do
      (respT, pgrst) <- timed . liftEither $ Response.infoIdentResponse identifier sCache
      finish Nothing Nothing respT pgrst

    (ActionInfo, TargetProc identifier _) -> do
      (planT, cPlan) <- timed . liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
      (respT, pgrst) <- timed . liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
      finish planT Nothing respT pgrst

    (ActionInfo, TargetDefaultSpec _) -> do
      (respT, pgrst) <- timed $ liftEither Response.infoRootResponse
      finish Nothing Nothing respT pgrst

    _ ->
      -- This is unreachable, as ApiRequest.hs rejects such requests earlier.
      -- TODO Refactor the Action/Target types to remove this line
      throwError $ Error.ApiRequestError ApiRequestTypes.NotFound
  where
    timingOn = configServerTimingEnabled conf

    -- Time an action only when server timing is enabled.
    timed = calcTiming timingOn

    -- Per-role settings and isolation level, with defaults for roles that
    -- have no entry in the configuration.
    roleSettings = HM.findWithDefault mempty authRole $ configRoleSettings conf
    roleIsoLvl = HM.findWithDefault SQL.ReadCommitted authRole $ configRoleIsoLvl conf

    -- Run a query in a transaction, after setting the transaction-local
    -- settings and running the pre-request step.
    runQuery isoLvl funcSets mode query =
      runDbHandler appState conf isoLvl mode authenticated prepared observer $ do
        Query.setPgLocals conf authClaims authRole (HM.toList roleSettings) funcSets apiReq
        Query.runPreReq conf
        query

    -- Shared plan/query/response pipeline of the four mutation actions; they
    -- differ only in the mutation kind, the query and the response builder.
    mutateWith mutation identifier mkQuery mkResponse = do
      (planT, mrPlan) <- timed . liftEither $ Plan.mutateReadPlan mutation apiReq identifier conf sCache
      (txT, resultSet) <- timed . runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ mkQuery mrPlan apiReq conf
      (respT, pgrst) <- timed . liftEither $ mkResponse mrPlan apiReq resultSet
      finish planT txT respT pgrst

    -- Convert a 'Response.PgrstResponse' into a Wai response, appending the
    -- server-timing header when the feature is enabled.
    finish :: Maybe Double -> Maybe Double -> Maybe Double -> Response.PgrstResponse -> Handler IO Wai.Response
    finish planT txT respT (Response.PgrstResponse status headers body) =
      return $ Wai.responseLBS status (headers ++ timingHeaders) body
      where
        timing = ServerTiming jwtTime parseTime planT txT respT
        timingHeaders = [serverTimingHeader timing | timingOn]
-- | Run an action, measuring it with 'timeItT' when the flag is set.
--
-- Returns the action's result paired with @Just@ the measured time, or with
-- @Nothing@ when timing is disabled (the action then runs unmeasured).
calcTiming :: Bool -> Handler IO a -> Handler IO (Maybe Double, a)
calcTiming timingEnabled action
  | timingEnabled = do
      (elapsed, result) <- timeItT action
      pure (Just elapsed, result)
  | otherwise = do
      result <- action
      pure (Nothing, result)
-- | Echo the configured trace header back on the response.
--
-- When no trace header is configured the application runs unchanged.
-- Otherwise the header is prepended to the response headers with the value
-- found in the request, or an empty value when the request does not carry it.
traceHeaderMiddleware :: AppConfig -> Wai.Middleware
traceHeaderMiddleware AppConfig{configServerTraceHeader} app req respond =
  maybe (app req respond) echoHeader configServerTraceHeader
  where
    echoHeader name =
      app req (respond . Wai.mapResponseHeaders (traceHeader :))
      where
        traceHeader = (name, fromMaybe mempty requested)
        requested = L.lookup name $ Wai.requestHeaders req
-- | Prepend a @Retry-After@ header carrying the given delay to a response
-- whose status is 503; any other response keeps its headers unchanged.
addRetryHint :: Int -> Wai.Response -> Wai.Response
addRetryHint delay response = Wai.mapResponseHeaders withHint response
  where
    retryAfter = ("Retry-After", BS.pack $ show delay)
    withHint headers
      | isServiceUnavailable response = retryAfter : headers
      | otherwise                     = headers
-- | Whether the response status is 503 (Service Unavailable).
isServiceUnavailable :: Wai.Response -> Bool
isServiceUnavailable = (== HTTP.status503) . Wai.responseStatus