-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
App.hs
975 lines (859 loc) · 45 KB
/
App.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE DataKinds #-}
module HasuraPro.App where
import Control.Exception (SomeException, try)
import Control.Lens (anyOf, ix, (^?))
import Control.Monad.Base
import Control.Monad.Catch (MonadCatch,
MonadMask,
MonadThrow,
onException)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.IORef (IORef)
import Data.Int (Int64)
import Data.String (fromString)
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (POSIXTime,
getPOSIXTime)
import Hasura.EncJSON
import Hasura.Prelude
import Hasura.Server.Init
import HasuraPro.Authorization
import HasuraPro.Config
import HasuraPro.Context
import HasuraPro.Directives (directive, _VInt)
import HasuraPro.Firewall
import HasuraPro.LiveQuery (logLiveQueryMetrics)
import HasuraPro.Logging
import HasuraPro.LogSender
import HasuraPro.OAuth
import HasuraPro.Server.API.Config
import HasuraPro.Utils (cryptoHash,
pgConnInfoToLabel,
runRWTx, runTx)
import HasuraPro.Workers
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as Conc
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as StmTVar
import qualified Control.Immortal as Immortal
import qualified Data.Aeson as J
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
import qualified Data.Environment as E
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as Set
import qualified Data.IORef as IORef
import qualified Data.List.NonEmpty as NE
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Conversions as T
import qualified Data.Time.Clock as Clock
import qualified Data.Time.Clock.POSIX as ClockPosix
import qualified Database.PG.Query as Q
import qualified Database.Redis as Redis
import qualified Hasura.App as HGE
import qualified Hasura.Backends.Postgres.SQL.Types as HGE
import qualified Hasura.Eventing.ScheduledTrigger as HGE
import qualified Hasura.GraphQL.Context as Context
import qualified Hasura.GraphQL.Execute as HGE
import qualified Hasura.GraphQL.Execute.Query as EQ
import qualified Hasura.GraphQL.Logging as HGE
import qualified Hasura.GraphQL.Parser.Column as Column
import qualified Hasura.GraphQL.Transport.HTTP as HGE
import qualified Hasura.GraphQL.Transport.WebSocket.Server as HGE
import qualified Hasura.Logging as L
import qualified Hasura.RQL.IR.RemoteJoin as RJ
import qualified Hasura.Metadata.Class as HGE
import qualified Hasura.RQL.Types as HGE
import qualified Hasura.Server.API.Query as HGE
import qualified Hasura.Server.App as HGE
import qualified Hasura.Server.Auth as HGE
import qualified Hasura.Server.Init as HGE
import qualified Hasura.Server.Logging as HGE
import qualified Hasura.Server.SchemaUpdate as HGE
import qualified Hasura.Server.Types as HGE
import qualified Hasura.Server.Version as HGE
import qualified Hasura.Session as HGE
import qualified Hasura.Tracing as Tracing
import qualified HasuraPro.Errors as ProErrors
import qualified Language.GraphQL.Draft.Parser as GQL
import qualified Language.GraphQL.Draft.Syntax as GQL
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai as Wai
import qualified System.Log.FastLogger as FL
import qualified System.Metrics as EKG
import qualified System.Metrics.Counter as Counter
import qualified System.Metrics.Json as EKG
import qualified System.Random as Rand
import qualified Text.Mustache.Compile as M
-- | Our App Monad in IO; currently uses an 'InMemoryStore' store for rate limiting state
--
-- A thin wrapper over @ReaderT (AppContext RedisStore) IO@: request handlers
-- read configuration, loggers and connections from the 'AppContext'
-- environment.
newtype AppM a
  = AppM { unAppM :: ReaderT (AppContext RedisStore) IO a }
  deriving ( Functor, Applicative, Monad
           , MonadIO
           , MonadBase IO
           , MonadBaseControl IO
           , MonadReader (AppContext RedisStore)
           , MonadCatch
           , MonadThrow
           , MonadMask
           )
-- | An optional Redis connection. 'Nothing' means Redis was not configured,
-- in which case the features backed by it (caching, rate limiting) are
-- unavailable.
newtype RedisStore
  = RedisStore { unRedisStore :: Maybe Redis.Connection }
-- TODO: MonadQueryInstrumentation was added in order to support
-- an integration between tracing and EXPLAIN ANALYZE in Pro.
-- However, that PR has been put on hold. We could remove this type
-- class entirely, or we could find a better way to implement it.
-- See https://github.com/hasura/graphql-engine-pro/pull/500/
instance EQ.MonadQueryInstrumentation AppM where
  -- No instrumentation: leave the query untouched and collect no profile.
  askInstrumentQuery _ = pure (id, EQ.noProfile)
-- | Return a function which will report common metrics to both the
-- process-level store (if one exists) and the tenant-level store for
-- the current project.
askCommonMetrics :: MonadReader (AppContext store) m => m ((CommonMetrics -> IO a) -> IO ())
askCommonMetrics = do
  ctx <- ask
  let perProcess = _acProcessMetrics ctx
      perTenant = _acTenantMetrics ctx
      -- Apply the action to the tenant metrics first, then to the
      -- process metrics when a process-level store is present.
      report :: (CommonMetrics -> IO a) -> IO ()
      report act = do
        void $ act (_tmCommon perTenant)
        traverse_ (act . _pmCommon) perProcess
  pure report
-- | Make a key to store query results in Redis.
--
-- Layout: @\<project id\>:\<hash of the serialized cache key\>@.
mkRedisKey :: ProjectId -> HGE.QueryCacheKey -> B8.ByteString
mkRedisKey projectId cacheKey = B8.intercalate ":" [projectPart, hashPart]
  where
    projectPart = txtToBs (unProjectId projectId)
    hashPart = cryptoHash (J.toJSON cacheKey)
-- | Look for the cached directive in the query AST, and match its
-- ttl argument, if there is one, or use the default value.
--
-- Returns 'Nothing' only if the cached directive itself is missing,
-- i.e. if caching should not be used for this query.
extractTTL :: HGE.GQLReq HGE.GQLExecDoc -> Maybe Integer
extractTTL req =
  case req ^? directive $$(GQL.litName "cached") of
    -- @cached@ present: read its @ttl@ argument, defaulting to 60 (seconds).
    Just args -> Just (fromMaybe 60 (args ^? (ix @(Map.HashMap _ _)) $$(GQL.litName "ttl") . _VInt))
    -- No @cached@ directive on the query.
    _ -> Nothing
instance HGE.MonadExecuteQuery AppM where
  -- | Try to serve the response from the Redis cache. Returns response
  -- headers to attach plus 'Just' the cached response on a hit, or
  -- 'Nothing' when the query must be executed normally.
  cacheLookup query queryPlan cacheKey@(HGE.QueryCacheKey req _user) = do
    connMay <- asks _acRedis
    projectId <- asks (_pcProjectId . _acProCreds)
    logger <- asks _acProLogger
    withCommonMetrics <- askCommonMetrics
    -- get the tenant's cache max size and max ttl
    getPoliciesConfig <- asks _acPoliciesConfig
    policyConfig <- liftIO getPoliciesConfig
    let tenantCacheMaxTTL = _policyConfigCacheMaxTtl =<< policyConfig
        tenantCacheMaxSize = _policyConfigCacheSizeLimit =<< policyConfig
        -- We disallow caching for remote joins which involve forwarded
        -- client headers. Otherwise, if the remote schema uses those for
        -- authorization, we might leak client data across requests.
        --
        -- The use of any here is a shorthand for folding over multiple containers
        -- to get to the RemoteSchemaInfo structures inside, but we don't need
        -- the full power of lens here.
        hasRemoteJoinsForwardingHeaders =
          (any . any . any . any . any . any . any)
            (HGE.rsFwdClientHeaders . RJ._rjRemoteSchema)
            queryPlan
        -- Queries reading session state cannot be cached: the cached
        -- response would leak across sessions.
        hasSessionVars :: [Context.QueryRootField (Column.UnpreparedValue 'HGE.Postgres)] -> Bool
        hasSessionVars = anyOf (traverse . EQ.traverseQueryRootField) \case
          Column.UVSession -> True
          Column.UVSessionVar{} -> True
          _ -> False
        runQuery
          :: Maybe Redis.Connection
          -> Maybe Integer
          -> Tracing.TraceT (ExceptT HGE.QErr AppM) (HTTP.ResponseHeaders, Maybe EncJSON)
        -- Redis configured and @cached@ directive present: validate then look up.
        runQuery (Just conn) (Just ttlDirective) = do
          when (hasSessionVars query) do
            HGE.throw400 HGE.NotSupported "This query depends on session variables and therefore cannot be cached"
          -- A tenant limit of 0 means caching is switched off for this tenant.
          when (tenantCacheMaxTTL == Just 0 || tenantCacheMaxSize == Just 0) do
            HGE.throw400 HGE.NotSupported "Caching is not enabled"
          let maxTTL = fromMaybe defaultPolicyCacheMaxTTL tenantCacheMaxTTL
          when (ttlDirective > fromIntegral maxTTL) do
            HGE.throw400 HGE.NotSupported . fromString $ "The maximum allowed TTL is " <> show @Int maxTTL <> " seconds"
          when hasRemoteJoinsForwardingHeaders do
            HGE.throw400 HGE.NotSupported "Remote joins which forward client headers cannot currently be cached"
          -- The key we store in Redis is a combination of the project ID and
          -- the query hash.
          let key = mkRedisKey projectId cacheKey
          liftIO $ withCommonMetrics (Counter.inc . _cmCacheAttempts)
          -- Fetch both the cached value and its remaining TTL in one call.
          getResp <- Tracing.trace "cache check" . runRedis logger conn $
            liftM2 (,) <$> Redis.get key <*> Redis.ttl key
          case getResp of
            Right (Just res, ttl) -> do
              liftIO do
                withCommonMetrics (Counter.inc . _cmCacheHits)
                L.unLogger logger $ QueryCacheLog
                  { _qclLifetime = ttl
                  , _qclQueryHash = bsToTxt key
                  , _qclStoredBytes = 0
                  , _qclHit = True
                  }
              -- Expose the remaining lifetime to the client.
              pure ([("Cache-Control", fromString (show ttl))], Just (encJFromBS res))
            -- Cache miss (or Redis reply error): fall through to execution.
            _ -> pure ([], Nothing)
        -- @cached@ requested but no Redis connection is configured.
        runQuery Nothing (Just _ttlDirective) =
          HGE.throw400 HGE.NotSupported "Caching is not configured"
        -- No @cached@ directive: skip the cache entirely.
        runQuery _ _ = pure ([], Nothing)
    runQuery connMay (extractTTL req)
  -- | Store a query response in the Redis cache, respecting the tenant's
  -- per-entry size limit. Best-effort: silently does nothing when Redis
  -- is absent or the query carries no @cached@ directive.
  cacheStore cacheKey@(HGE.QueryCacheKey req _user) res = do
    connMay <- asks _acRedis
    projectId <- asks (_pcProjectId . _acProCreds)
    logger <- asks _acProLogger
    withCommonMetrics <- askCommonMetrics
    -- get the tenant's cache max size and max ttl
    getPoliciesConfig <- asks _acPoliciesConfig
    policyConfig <- liftIO getPoliciesConfig
    let tenantCacheLimit = _policyConfigCacheSizeLimit =<< policyConfig
        key = mkRedisKey projectId cacheKey
    -- We need to cap the size of entries stored in the cache
    -- Eventually, we might want to enable a better form of rate and size
    -- limiting here, but for now, we can use this per-row limit of 100KB,
    -- and use the emitted metrics to determine how well this is working.
    let response = encJToBS res
        responseLength = BS.length response
        responseLimit = fromMaybe defaultPolicyCacheSizeLimit tenantCacheLimit
    for_ connMay $ \conn -> do
      for_ (extractTTL req) $ \ttlDirective -> do
        unless (responseLength > responseLimit) $ do
          -- SETEX stores the response with the directive's TTL in seconds.
          void . Tracing.trace "cache update" . runRedis logger conn $
            Redis.setex key ttlDirective response
          liftIO do
            withCommonMetrics (flip Counter.add (fromIntegral responseLength) . _cmCacheTotalStoredBytes)
            L.unLogger logger $ QueryCacheLog
              { _qclLifetime = ttlDirective
              , _qclQueryHash = bsToTxt key
              , _qclStoredBytes = responseLength
              , _qclHit = False
              }
-- | Run a Redis action, converting any synchronous exception into a
-- logged 500 error instead of letting it propagate.
runRedis
  :: (MonadIO m, MonadError HGE.QErr m)
  => L.Logger HasuraPro -> Redis.Connection -> Redis.Redis a -> m a
runRedis logger conn action = do
  result <- liftIO $ try $ Redis.runRedis conn action
  case result of
    Right a -> pure a
    Left err -> do
      -- Log the underlying exception; expose only a generic message.
      liftIO $ L.unLogger logger $ RedisErrorLog $
        HGE.err500 HGE.Unexpected $ T.pack (show @SomeException err)
      HGE.throw500 "unexpected internal error: redis exception"
instance HGE.UserAuthentication (Tracing.TraceT AppM) where
  -- | Resolve the request's user info via the Pro authorization pipeline
  -- (the unqualified 'resolveUserInfo' imported from
  -- HasuraPro.Authorization), using the project credentials and the
  -- current JWK set from the app context.
  resolveUserInfo logger manager headers authMode = do
    proCreds <- asks _acProCreds
    jwkSetRef <- asks _acJwkSet
    -- The JWK set lives in an IORef so a refreshed set is picked up
    -- per request.
    jwkSet <- liftIO $ IORef.readIORef jwkSetRef
    proLogger <- asks _acProLogger
    exOpts <- asks _acProExtraOpts
    runExceptT $ resolveUserInfo proCreds exOpts jwkSet logger proLogger manager headers authMode
instance HGE.HttpLog AppM where
  -- | Log a failed HTTP request, and forward the error (as the response
  -- body) to the Pro HTTP-response log.
  logHttpError logger userInfoM reqId waiReq req qErr headers = do
    proLogger <- asks _acProLogger
    L.unLogger logger $ mkProHttpLog req $
      HGE.mkHttpErrorLogContext userInfoM reqId waiReq req qErr Nothing Nothing headers
    -- log the http response log
    logHttpResponse proLogger reqId (J.encode qErr)
  -- | Log a successful HTTP request; the response body is forwarded to
  -- Lux only when the project has opted into response-body analysis.
  logHttpSuccess logger userInfoM reqId waiReq req@(_,parsedReq) response compressedResponse qTime cType headers = do
    proCustomLogger <- asks $ snd ._acCustomLoggers
    -- log the http success log
    L.unLogger logger $ mkProHttpLog req $ withReqBody $
      HGE.mkHttpAccessLogContext userInfoM reqId waiReq compressedResponse qTime cType headers
    -- log the http response log
    shouldSendToLux <- _ppcAnalyzeResponseBody <$> getProProjectConfig
    logHttpResponseCustom proCustomLogger reqId response shouldSendToLux
    where
      path = Wai.pathInfo waiReq
      -- Log the request body when it is a metadata query
      withReqBody hlc
        | path == ["v1", "query"] = setReqBody parsedReq hlc
        | otherwise = hlc
      -- Substitute the parsed request body into the log context's
      -- operation record.
      setReqBody q hlc =
        let oper = HGE.hlcOperation hlc
            oper' = oper {HGE.olQuery = q}
        in hlc { HGE.hlcOperation = oper' }
instance Tracing.HasReporter AppM where
  -- | Build a reporter that times each traced action with POSIX
  -- timestamps and emits a tracing log entry for the span.
  askReporter = do
    logger <- asks _acProLogger
    pure $ Tracing.Reporter $ \ctx name ma -> do
      t1 <- liftIO getPOSIXTime
      (a, meta) <- ma
      t2 <- liftIO getPOSIXTime
      -- Return the action's result; reporting is a side effect.
      a <$ reportTraceSpan logger ctx name meta t1 t2
-- | Emit a 'TracingLog' entry for a completed trace span. Start time and
-- duration are converted from seconds to integral nanoseconds.
reportTraceSpan
  :: MonadIO m
  => L.Logger HasuraPro
  -> Tracing.TraceContext -- ^ trace/span/parent identifiers
  -> Text                 -- ^ span name
  -> [(Text, Text)]       -- ^ span metadata key/value pairs
  -> POSIXTime            -- ^ span start
  -> POSIXTime            -- ^ span end
  -> m ()
reportTraceSpan logger Tracing.TraceContext{..} name meta t1 t2 = liftIO $ do
  L.unLogger logger . TracingLog $ TracingLogSpan
    { _tlsTraceId = tcCurrentTrace
    , _tlsSpanId = tcCurrentSpan
    , _tlsParentId = tcCurrentParent
    , _tlsName = name
    -- POSIXTime is in seconds; scale by 1e9 for nanoseconds.
    , _tlsStart = (truncate (t1 * 1e9) :: Integer)
    , _tlsDuration = (truncate ((t2 - t1) * 1e9) :: Integer)
    , _tlsMeta = Map.fromList meta
    }
instance HGE.MonadQueryLog AppM where
  -- | Log the query via the OSS custom logger, attaching a Lux copy.
  -- The Lux copy has query variables and prepared-statement arguments
  -- stripped unless the project enabled query-variable analysis.
  logQueryLog _ gqlReq genSqlM reqId = do
    ossCustomLogger <- asks $ fst ._acCustomLoggers
    shouldCaptureVariables <- _ppcAnalyzeQueryVariables <$> getProProjectConfig
    let luxLog = LuxLog $ bool withoutVariablesLog queryLog shouldCaptureVariables
    (unCustomLogger ossCustomLogger) (Just luxLog) queryLog
    where
      queryLog = HGE.QueryLog gqlReq genSqlM reqId
      -- Same log with variables/arguments removed so potentially
      -- sensitive values are not captured by default.
      withoutVariablesLog = HGE.QueryLog noVariablesGqlReq noArgsGenSqlM reqId
      noVariablesGqlReq = gqlReq { HGE._grVariables = Nothing }
      noArgsGenSqlM = fmap (fmap removeArgs) genSqlM
      removeArgs :: HGE.PreparedSql -> HGE.PreparedSql
      removeArgs (HGE.PreparedSql query _ _) = HGE.PreparedSql query mempty Nothing
instance HGE.MonadWSLog AppM where
  -- | Websocket logs without event metadata go straight to the given logger.
  logWSLog logger wsLog@(HGE.WSLog _ _ Nothing) = L.unLogger logger wsLog
  -- | Logs with metadata are forwarded through the OSS custom logger with
  -- a Lux copy; the Lux copy's response payload is stripped unless the
  -- project opted into response-body analysis.
  logWSLog _ wsLog@(HGE.WSLog wsId ev (Just metadata)) = do
    ossCustomLogger <- asks $ fst . _acCustomLoggers
    shouldCaptureResponseBody <- _ppcAnalyzeResponseBody <$> getProProjectConfig
    let luxLog = LuxLog $ bool wsNoResponseLog wsLog shouldCaptureResponseBody
    (unCustomLogger ossCustomLogger) (Just luxLog) wsLog
    where
      -- FIXME: evType is not there anymore?
      HGE.WSEventInfo evType opId _ _ = metadata
      -- For data messages, replace the sent message body with a stub
      -- keeping only type and operation id; the size field is preserved.
      wsNoResponseLog = case ev of
        HGE.EMessageSent (HGE.MessageDetails _ size) -> bool wsLog (withoutResponseLog size) isDataEvent
        _ -> wsLog
      isDataEvent = evType == Just HGE.SMT_GQL_DATA
      withoutResponseLog size =
        let event = HGE.EMessageSent $ HGE.MessageDetails modifiedMessage size
        in HGE.WSLog wsId event (Just metadata)
      -- FIXME: evType is not there anymore?
      modifiedMessage = TBS.fromBS $ encJToBS $ encJFromAssocList
        [ ("type", encJFromJValue evType)
        , ("id", encJFromJValue opId)
        ]
-- | Read the current project configuration from the shared TVar held in
-- the app context.
getProProjectConfig :: AppM ProProjectConfig
getProProjectConfig =
  asks (_ccProjectConfigCtx . _acProConfig)
    >>= liftIO . STM.atomically . StmTVar.readTVar
instance HGE.MetadataApiAuthorization AppM where
  -- | Authorize a metadata API call: run the Pro authorization
  -- middleware, then require the admin role for admin-only queries.
  authorizeMetadataApi query userInfo = do
    manager <- HGE.scManager <$> asks HGE.hcServerCtx
    -- The handler runs two transformer layers above AppM, hence the
    -- double 'lift' to reach the app context.
    proLogger <- lift $ lift $ asks _acProLogger
    jwkSetRef <- lift $ lift $ asks _acJwkSet
    jwkSet <- liftIO $ IORef.readIORef jwkSetRef
    proCreds <- lift $ lift $ asks _acProCreds
    exOpts <- lift $ lift $ asks _acProExtraOpts
    metadataAuthzMiddleware proLogger manager proCreds exOpts jwkSet query
    let currRole = HGE._uiRole userInfo
    when (HGE.requiresAdmin query && currRole /= HGE.adminRoleName) $
      HGE.withPathK "args" $ HGE.throw400 HGE.AccessDenied errMsg
    where
      errMsg = "restricted access : admin only"
instance HGE.MonadGQLExecutionCheck AppM where
  -- | Validate a GraphQL request before execution: apply tenant and rate
  -- limits (only when Redis is configured), re-parse the query, check
  -- the allow-list, and enforce the operation depth limit.
  checkGQLExecution userInfo reqHeaders enableAL sc req = runExceptT $ do
    firewallCtx <- asks $ _ccFirewallCtx . _acProConfig
    getPoliciesConfig <- asks _acPoliciesConfig
    -- TODO: check if store has a redis connection then only proceed with apply
    -- rate limits. otherwise ignore or throw errors?
    -- apply rate and tenant limits only when redis connection is given
    when (isJust $ unRedisStore $ _fwcStore firewallCtx) $ do
      -- we want to apply tenant limits on all roles including admin, for all GQL
      -- requests
      requirePermit "tenant-limit-exceeded" $ applyTenantLimits firewallCtx getPoliciesConfig
      requirePermit "rate-limit-exceeded" $ rateLimit firewallCtx reqHeaders userInfo
    proLogger <- asks _acProLogger
    parsedReq <- parseGQLReq
    HGE.checkQueryInAllowlist enableAL userInfo parsedReq sc
    limitOperationDepth firewallCtx proLogger userInfo parsedReq
    return parsedReq
    where
      -- Run a limit check: a denial becomes a 400 with the given code,
      -- while an internal error (e.g. Redis failure) is logged and
      -- treated as granted, so limiter outages never block traffic.
      requirePermit code action = action `catchError` logErr >>= \case
        RPDenied e -> HGE.throw400 (HGE.CustomCode code) e
        RPGranted -> pure ()
      logErr err = do
        logger <- asks _acProLogger
        L.unLogger logger $ RedisErrorLog err
        pure RPGranted
      -- Re-parse the raw query text into an executable document.
      parseGQLReq = case GQL.parseExecutableDoc gqlText of
        Left _ -> HGE.withPathK "query" $ HGE.throw400 HGE.BadRequest "not a valid GraphQL query"
        Right a -> return $ req { HGE._grQuery = HGE.GQLExecDoc $ GQL.getExecutableDefinitions a }
      gqlText = HGE._unGQLQueryText $ HGE._grQuery req
instance HGE.MonadConfigApiHandler AppM where
  -- Delegate to the Pro config API handler from HasuraPro.Server.API.Config.
  runConfigApiHandler = configApiHandler
instance HGE.ConsoleRenderer AppM where
  -- | Render the Pro console HTML from the embedded mustache template,
  -- filling in auth, telemetry, asset and version variables.
  renderConsole path authMode enableTelemetry consoleAssetsDir = do
    consoleId <- _pcConsoleId <$> asks _acProCreds
    authIssuer <- _leAuthIssuer <$> asks _acProExtraOpts
    return $ HGE.renderHtmlTemplate proConsoleTemplate $
      -- variables required to render the template
      J.object [ "isAdminSecretSet" J..= HGE.isAdminSecretSet authMode
               , "consolePath" J..= consolePath path
               , "enableTelemetry" J..= HGE.boolToText enableTelemetry
               -- serve assets from the CDN unless a local assets dir is set
               , "cdnAssets" J..= HGE.boolToText (isNothing consoleAssetsDir)
               , "assetsVersion" J..= ("channel/versioned/" <> T.toText HGE.currentVersion)
               , "serverVersion" J..= HGE.currentVersion
               , "consoleId" J..= consoleId
               , "hasuraOAuthUrl" J..= authIssuer
               ]
    where
      -- The template is compiled into the binary at build time.
      proConsoleTemplate = $(M.embedSingleTemplate "assets/console.html")
      consolePath = \case
        "" -> "/console"
        r -> "/console/" <> r
-- | Use Redis sorted sets (https://redis.io/commands#sorted_set) to implement a
-- map of timestamps for each client (or key) to rate limit. Then we can use
-- Redis primitives to query for a range of timestamps.
instance RateLimitStore (ExceptT HGE.QErr AppM) RedisStore where
  type RLSConnection RedisStore = RedisStore
  -- Rate limiting needs Redis; fail loudly if it was not configured.
  incrementRate (RedisStore Nothing) _ _ _ =
    HGE.throw500 "Redis is not available for rate limiting"
  incrementRate (RedisStore (Just conn)) key currTime slidingWindow = do
    logger <- asks _acProLogger
    proCreds <- asks _acProCreds
    res <- runRedis logger conn $
      incrementRateInRedis key currTime slidingWindow (_pcProjectId proCreds)
    either HGE.throw500 return res
-- | We use a sorted set to store each timestamp of a given client. We store
-- the timestamp as the key as well as the score of the member. Storing the
-- timestamp as the score let's us query Redis directly with the range
-- (ZRANGEBYSCORE) of two timestamps for our sliding window
--
-- Returns the number of entries in the current window (the rate), or a
-- textual error when the Redis transaction fails.
incrementRateInRedis
  :: J.ToJSON a
  => a -> UTCTime -> Double -> ProjectId -> Redis.Redis (Either Text Int)
incrementRateInRedis key currTime slidingWindow (ProjectId projectId) = do
  let currTimestamp = realToFrac $ ClockPosix.utcTimeToPOSIXSeconds currTime
      prevTimestamp = currTimestamp - slidingWindow
      redisKey = makeRedisRateLimitKey
      currTimestampBS = B8.pack $ show currTimestamp
  -- The three commands below run atomically inside a MULTI/EXEC block.
  res <- Redis.multiExec $ do
    -- TODO: should we do this on every call to redis?
    -- remove all keys older than the previous timestamp
    Redis.zremrangebyscore redisKey 0 prevTimestamp
    -- add an entry for the current timestamp
    void $ Redis.zadd redisKey [(currTimestamp, currTimestampBS)]
    -- get all keys in range of min: prevTimestamp, max:currTimestamp
    Redis.zrangebyscore redisKey prevTimestamp currTimestamp
  case res of
    Redis.TxSuccess r -> return (Right $ length r)
    Redis.TxAborted -> return $ Left "Unexpected: Redis aborted transaction"
    Redis.TxError err -> return $ Left $ "Redis transaction error: " <> T.pack err
  where
    -- Key layout: @ratelimit-set:\<project id\>:\<hash of client key\>@.
    makeRedisRateLimitKey =
      let projId = txtToBs projectId
          rateLimitKeyHash = cryptoHash key
      in B8.intercalate ":" ["ratelimit-set", projId, rateLimitKeyHash]
-- | Run a transaction against the metadata storage pool, each call in
-- its own repeatable-read transaction, lifting failures into
-- 'HGE.MetadataStorageT'.
runInSeparateTx :: Q.TxE HGE.QErr a -> HGE.MetadataStorageT AppM a
runInSeparateTx tx = do
  metadataPool <- lift $ asks _acMetadataStoragePool
  let runIt = runExceptT $ Q.runTx metadataPool (Q.RepeatableRead, Nothing) tx
  liftEitherM $ liftIO runIt
-- | Each of the function in the type class is executed in a totally separate transaction.
instance HGE.MonadMetadataStorage (HGE.MetadataStorageT AppM) where
  -- NOTE: Re-using the OSS implementations for scheduled triggers eventing storage system.
  -- Technically, as of now, metadata separation in an isolated database isn't implemented in OSS.
  -- TODO: Change the following to pro/cloud specific implementation in subsequent incremental PRs
  -- which introduces the schema migration for metadata separation.
  getDeprivedCronTriggerStats = runInSeparateTx HGE.getDeprivedCronTriggerStatsTx
  getScheduledEventsForDelivery = runInSeparateTx HGE.getScheduledEventsForDeliveryTx
  insertScheduledEvent = runInSeparateTx . HGE.insertScheduledEventTx
  insertScheduledEventInvocation a b = runInSeparateTx $ HGE.insertInvocationTx a b
  setScheduledEventOp a b c = runInSeparateTx $ HGE.setScheduledEventOpTx a b c
  unlockScheduledEvents a b = runInSeparateTx $ HGE.unlockScheduledEventsTx a b
  unlockAllLockedScheduledEvents = runInSeparateTx HGE.unlockAllLockedScheduledEventsTx
-- | creates the 'HGE.PGExecCtx' type from given master and replica pools
--
-- Writes always run on the master pool at the given isolation level;
-- read-only and no-transaction queries are spread across the replica
-- pools by choosing one at random per query.
mkProPGExecCtx :: L.Logger HasuraPro -> (Q.PGPool, NE.NonEmpty Q.PGPool) -> Q.TxIsolation -> HGE.PGExecCtx
mkProPGExecCtx logger (masterPool, rrPools) masterIso =
  HGE.PGExecCtx
    { _pecRunReadOnly = readOnlyTx
    , _pecRunReadNoTx = readNoTx
    , _pecRunReadWrite = (Q.runTx masterPool (masterIso, Just Q.ReadWrite))
    , _pecCheckHealth = checkHealth
    }
  where
    -- Run a read-only transaction on a randomly-chosen replica.
    readOnlyTx q = choosePool >>= \pool -> Q.runTx pool (Q.RepeatableRead, Just Q.ReadOnly) q
    -- Run a query without an explicit transaction, also on a replica.
    readNoTx q = choosePool >>= \pool -> (Q.runTx' pool q)
    choosePool :: MonadIO m => m Q.PGPool
    choosePool = liftIO $ randElem rrPools
    -- Pick a uniformly random element; safe because the list is non-empty.
    randElem :: MonadIO m => NE.NonEmpty a -> m a
    randElem arr = liftIO $ (arr NE.!!) <$> Rand.randomRIO (0, NE.length arr - 1)
    -- function to check if we can establish connection to a given (master or replica) pool
    checkHealth :: IO Bool
    checkHealth =
      checkMasterHealth >>=
        bool
          -- if master is unhealthy, return false
          (pure False)
          -- if master is healthy, check the replicas
          (and <$> checkReplicasHealth)
    checkMasterHealth = checkDbConnection masterPool
    checkReplicasHealth = mapM checkDbConnection $ NE.toList rrPools
    -- check given pool health; print a 'DbHealthError' log when unhealthy
    checkDbConnection pool = do
      res <- runExceptT $ Q.runTx' pool select1Query
      case res of
        Left e -> L.unLogger logger (DbHealthError e) >> pure False
        Right _ -> pure True
      where
        select1Query :: Q.TxE HGE.QErr Int
        select1Query =
          runIdentity . Q.getRow <$>
            Q.withQE HGE.defaultTxErrorHandler [Q.sql| SELECT 1 |] () False
-- | Everything produced by initialisation that the rest of the server
-- needs in order to start serving.
data ProInitCtx
  = ProInitCtx
  { _picCoreServeCtx :: !HGE.ServeCtx
  -- ^ the core (OSS) serve context
  , _picReplicasPool :: ![(Text, Q.PGPool)]
  -- ^ a list of replica pool with a label. this is used in storing metrics in EKG.
  , _picLogger :: !(L.Logger HasuraPro)
  -- ^ the Pro structured logger
  , _picCustomLoggers :: !CustomLoggers
  , _picPgExecCtx :: !HGE.PGExecCtx
  -- ^ query execution context (master plus any read replicas)
  , _picInitTime :: !UTCTime
  -- ^ wall-clock time at which initialisation started
  , _picDBId :: !DBId
  -- ^ database identifier, used by the log pusher to Lux
  , _picLuxLogsHandler :: !LuxLogger
  , _picProConnInfo :: !ProConnInfo
  -- ^ resolved master/replica/redis connection info
  }
-- | Initialise the @hdb_pro_catalog@ schema in the given pool, exiting
-- the process with a 'DatabaseConnectionInitializationError' on failure.
initialiseProCatalog :: MonadIO m => Q.PGPool -> m ()
initialiseProCatalog pool = do
  proInitRes <- runTx pool initProDb
  either (ProErrors.printErrJExit ProErrors.DatabaseConnectionInitializationError) return proInitRes
  where
    -- Create the pro catalog schema if missing, then run the
    -- initialisation SQL shipped with the binary.
    initProDb :: Q.TxE HGE.QErr ()
    initProDb = do
      schemaExists <- doesSchemaExist (HGE.SchemaName "hdb_pro_catalog")
      -- 'unless' replaces the previous 'when (not ...)'; the CREATE
      -- SCHEMA statement is itself guarded with IF NOT EXISTS.
      unless schemaExists $ HGE.liftTx $ Q.catchE HGE.defaultTxErrorHandler $
        Q.unitQ "CREATE SCHEMA IF NOT EXISTS hdb_pro_catalog" () False
      Q.multiQE HGE.defaultTxErrorHandler $(Q.sqlFromFile "res/initialise.sql")
    -- Query information_schema for the existence of the given schema.
    doesSchemaExist schemaName =
      HGE.liftTx $ (runIdentity . Q.getRow) <$> Q.withQE HGE.defaultTxErrorHandler [Q.sql|
        SELECT EXISTS
          ( SELECT 1 FROM information_schema.schemata
            WHERE schema_name = $1
          ) |] (Identity schemaName) False
-- | a separate function to create the initialization context because some of
-- these contexts might be used by external functions
hgeProInitialiseCtx
:: ( HGE.HasVersion, MonadIO m, MonadCatch m)
=> Maybe TenantId
-> E.Environment
-> ProHGECommand
-> ProRawConnInfo
-> IORef (Either OAuthErrors OAuth2Token)
-> LuxEndpoints
-> LuxLoggerOpts
-> Maybe HGE.ShutdownLatch
-> Maybe EnableTenantLogging
-> m ProInitCtx
hgeProInitialiseCtx tenantId env hgeCmd ProRawConnInfo{..} credsRef
exOpts luxLogsHandlerOpts upstreamShutdownLatch enableTenantLogs = do
initTime <- liftIO Clock.getCurrentTime
-- global http manager
httpManager <- liftIO $ HTTP.newManager HTTP.tlsManagerSettings
instanceId <- liftIO generateInstanceId
latch <- maybe (liftIO HGE.newShutdownLatch) return upstreamShutdownLatch
-- resolve the external connection credentials
pci@ProConnInfo{..} <- mkProConnInfo
let mkProAndOssLoggers enabledLogTypes logLevel = flip runReaderT
(instanceId, credsRef, httpManager, exOpts) $ do
r <- ask
let luxLogHook luxLogger logStr = void $ flip runReaderT r $
runExceptT $ pushLuxLogStrLn luxLogger logStr
mkLoggers tenantId enabledLogTypes logLevel luxLogHook
( loggers, proLogger, customLoggers, luxLogsHandler@LuxLogger{..}, primaryPool, replicaPools, sqlGenCtx, pgCtx ) <- case hgeCmd of
HCServe (ProServeOptions ServeOptions{..} ProConnParams{..} _) -> do
(ls, proLogger, customLoggers, luxLogsHandler) <- mkProAndOssLoggers soEnabledLogTypes soLogLevel
let sqlGenCtx = HGE.SQLGenCtx soStringifyNum
-- log postgres connection info
L.unLogger (HGE._lsLogger ls) $ proConnInfoToLog pci
-- init primary pool
pool <- liftIO $ Q.initPGPool _pciMasterConn _pcpConnParamsMaster (HGE._lsPgLogger ls)
-- init read replicas pool
readReplicaPools <- liftIO $
for (getReadReplicaParam _pciReadReplicaConns) $ \connInfo ->
(pgConnInfoToLabel connInfo,)
<$> Q.initPGPool connInfo (getReadReplicaParam _pcpConnParamsReadReplica) (HGE._lsPgLogger ls)
-- if there are no read replicas, then don't create any pro specific 'HGE.PGExecCtx'
case readReplicaPools of
[] -> do
let pgCtx = HGE.mkPGExecCtx soTxIso pool
return (ls, proLogger, customLoggers, luxLogsHandler, pool, [], sqlGenCtx, pgCtx)
_ -> do
let pools = (pool, NE.fromList (snd <$> readReplicaPools))
pgCtx = mkProPGExecCtx proLogger pools soTxIso
-- test the read replica connections
healthOk <- liftIO $ HGE._pecCheckHealth pgCtx
when (not healthOk) $
ProErrors.printErrJExit ProErrors.UnhealthyDbConnectionError (J.String "FATAL ERROR: unhealthy master or read replica")
return (ls, proLogger, customLoggers, luxLogsHandler, pool, readReplicaPools, sqlGenCtx, pgCtx)
_ -> do
let sqlGenCtx = HGE.SQLGenCtx False
(l@(HGE.Loggers _ _ pgLogger), proLogger, customLoggers, luxLogsHandler) <-
mkProAndOssLoggers L.defaultEnabledLogTypes L.LevelInfo
pool <- getMinimalPool pgLogger _pciMasterConn
let pgCtx = HGE.mkPGExecCtx Q.Serializable pool
return (l, proLogger, customLoggers, luxLogsHandler, pool, [], sqlGenCtx, pgCtx)
let logger = HGE._lsLogger loggers
-- init core @hdb_catalog@
(rebuildableSchemaCache, cacheInitStartTime) <- flip onException (HGE.flushLogger $ HGE._lsLoggerCtx loggers) $
HGE.migrateCatalogSchema env logger primaryPool httpManager sqlGenCtx
-- safe init Pro catalog
initialiseProCatalog primaryPool
-- Start a background thread for listening schema sync events from other server instances,
(schemaSyncListenerThread, schemaSyncEventRef) <- HGE.startSchemaSyncListenerThread primaryPool logger instanceId
let schemaSyncCtx = HGE.SchemaSyncCtx schemaSyncListenerThread schemaSyncEventRef cacheInitStartTime
coreServeCtx = HGE.ServeCtx httpManager instanceId loggers _pciMasterConn primaryPool latch rebuildableSchemaCache schemaSyncCtx
dbId <- either (ProErrors.printErrJExit ProErrors.UnexpectedHasuraProError) (return . DBId)
=<< liftIO (runTx primaryPool getDbId)
-- Set database id which is going to be used by log pusher to Lux
liftIO $ IORef.writeIORef _llDbId $ Just $ dbId
return $ ProInitCtx coreServeCtx replicaPools proLogger customLoggers pgCtx initTime dbId luxLogsHandler pci
where
-- create the connection parameters for master and replica pools, and redis pool
mkProConnInfo = ProConnInfo
<$> procConnInfo _prciMaster
<*> traverse (mapM procConnInfo) _prciReadReplicas
<*> traverse redisConnInfo _prciRedis
<*> traverse redisConnInfo _prciRateLimitRedis
-- helper function to print the connection params of master and the replicas
proConnInfoToLog ProConnInfo{..} =
let proInfo = J.object
[ "master" J..= getInfo _pciMasterConn
, "read_replicas" J..= map getInfo (getReadReplicaParam _pciReadReplicaConns)
]
getInfo = HGE.slInfo . connInfoToLog
in HGE.StartupLog L.LevelInfo "postgres_connection" proInfo
procConnInfo rawConnInfo =
either (ProErrors.printErrExit ProErrors.DatabaseConnectionParameterError . connInfoErrModifier "Postgres Connection") return
$ mkConnInfo rawConnInfo
redisConnInfo =
either (ProErrors.printErrExit ProErrors.RedisConnectionParameterError . connInfoErrModifier "Redis Connection") return
. Redis.parseConnectInfo . T.unpack
connInfoErrModifier prefix s = "Fatal Error: " ++ prefix ++ ": " ++ s
getMinimalPool pgLogger ci = do
let connParams = Q.defaultConnParams { Q.cpConns = 1 }
liftIO $ Q.initPGPool ci connParams pgLogger
mkLoggers
:: (MonadIO m, MonadReader r m, HasLuxLoggerVars r)
=> Maybe TenantId
-> Set.HashSet (L.EngineLogType HasuraPro)
-> L.LogLevel
-> (LuxLogger -> FL.LogStr -> IO ())
-> m (HGE.Loggers, L.Logger HasuraPro, CustomLoggers, LuxLogger)
mkLoggers tenandId' enabledLogs logLevel luxLogsHook = do
loggerCtx <- liftIO $ L.mkLoggerCtx (L.defaultLoggerSettings True logLevel) enabledLogs
luxLogger <- createLuxLogger luxLogsHandlerOpts loggerCtx
let ossLogs = proEnabledLogsToOssEnabledLogs enabledLogs
ossLoggerCtx <- liftIO $ L.mkLoggerCtx (L.defaultLoggerSettings True logLevel) ossLogs
let proCustomLogger = mkCustomLogger tenandId' loggerCtx luxLogger luxLogsHook proStdoutLogsFilter'
ossCustomLogger = mkCustomLogger tenandId' ossLoggerCtx luxLogger luxLogsHook ossStdoutLogsFilter'
ossLogger = toDefaultLogger ossCustomLogger
proLogger = toDefaultLogger proCustomLogger
pgLogger = HGE.mkPGLogger ossLogger
return (HGE.Loggers ossLoggerCtx ossLogger pgLogger, proLogger, (ossCustomLogger, proCustomLogger), luxLogger)
-- For now we have to filter only the Poller logs
ossStdoutLogsFilter logType = logType /= L.ELTLivequeryPollerLog
proStdoutLogsFilter logType = logType /= HPLOss L.ELTLivequeryPollerLog
-- Send the Stdout filter only when `enableTenantLog` is True
(ossStdoutLogsFilter', proStdoutLogsFilter') =
( enableTenantLogs $> ossStdoutLogsFilter
, enableTenantLogs $> proStdoutLogsFilter
)
-- | Predicate deciding whether a given engine log type should be written to
-- stdout ('True' keeps the log line, 'False' drops it).
type StdoutLogsFilter impl = L.EngineLogType impl -> Bool
-- | This logger allows us to send a different push log when compared to stdout log.
-- This way we can customise the log that is sent to Lux (for example not sending
-- sensitive info to Lux like query variables and response body). The log hook is
-- used to process logs to be pushed to Lux.
mkCustomLogger
  :: J.ToJSON (L.EngineLogType impl)
  => Maybe TenantId
  -> L.LoggerCtx impl
  -> LuxLogger
  -> (LuxLogger -> FL.LogStr -> IO ())
  -- ^ hook invoked with every Lux-bound log line (all types and levels)
  -> Maybe (StdoutLogsFilter impl)
  -- ^ 'Nothing' disables stdout logging entirely; @'Just' f@ writes only the
  -- log types for which @f@ returns 'True'
  -> CustomLogger impl
mkCustomLogger tenantId (L.LoggerCtx loggerSet serverLogLevel timeGetter enabledLogTypes)
  luxLogger luxLogHook stdoutLogsFilter = CustomLogger $ \luxLogM engineLog -> do
    localTime <- liftIO timeGetter
    let toLogLine el =
          let (logLevel, logTy, logDet) = L.toEngineLog el
          in ProCustomEngineLog tenantId $ L.EngineLog localTime logLevel logTy logDet
        toLuxLogStr (LuxLog el) = FL.toLogStr $ J.encode $ toLogLine el
    -- send all logs (log-types and levels) to the hook function
    liftIO $ onJust luxLogM $ luxLogHook luxLogger . toLuxLogStr
    let stdoutLogLine@(ProCustomEngineLog _ (L.EngineLog _ logLevel logTy _))
          = toLogLine engineLog
        -- a missing filter means nothing is ever written to stdout
        passesStdoutFilter = maybe False ($ logTy) stdoutLogsFilter
    -- stdout additionally respects the configured server log level and the
    -- set of enabled log types
    when (logLevel >= serverLogLevel && L.isLogTypeEnabled enabledLogTypes logTy
          && passesStdoutFilter) $
      liftIO $ FL.pushLogStrLn loggerSet $ FL.toLogStr $ J.encode stdoutLogLine
-- | Project a set of Pro log types down to the OSS log types it contains;
-- Pro-only log types ('HPLPro') are dropped.
proEnabledLogsToOssEnabledLogs
  :: Set.HashSet (L.EngineLogType HasuraPro)
  -> Set.HashSet (L.EngineLogType L.Hasura)
proEnabledLogsToOssEnabledLogs proLogs =
  -- the pattern on the left of @<-@ silently skips 'HPLPro' entries
  Set.fromList [ossLogType | HPLOss ossLogType <- Set.toList proLogs]
-- | Convert Pro serve options to the equivalent OSS serve options by narrowing
-- the enabled log types to their OSS subset.
proEngineToOssEngineServeOpts :: ProServeOptions -> ServeOptions L.Hasura
proEngineToOssEngineServeOpts proOpts =
  hgeOpts { soEnabledLogTypes = proEnabledLogsToOssEnabledLogs (soEnabledLogTypes hgeOpts) }
  where
    hgeOpts = _psoHGEServeOptions proOpts
-- | Marker type, used as @Maybe EnableTenantLogging@: passing 'Just' enables
-- writing tenant logs to stdout in addition to pushing them to Lux; 'Nothing'
-- suppresses the stdout copy (the default, since Lux already captures stdout).
data EnableTenantLogging = EnableTenantLogging
-- | Initialize some context, create background workers, create 'AppContext' and run
-- 'HGE.runHGEServer'
runProServer
  :: HGE.HasVersion
  => Maybe TenantId
  -> E.Environment
  -> ProHGECommand
  -> ProRawConnInfo
  -> ProServeOptions
  -> ProCredentials
  -> LuxEndpoints
  -> LuxLoggerOpts
  -> Maybe HGE.ShutdownLatch
  -> Maybe ProcessMetrics
  -> Maybe TenantMetricOpts
  -> Maybe EnableTenantLogging
  -- ^ We disable sending tenant logs to stdout by default: each tenant instance
  -- already sends its logs to Lux, and since stdout is also captured in Lux,
  -- enabling tenant logs on stdout would be redundant.
  -> IO (Maybe PoliciesConfig)
  -- ^ the getter for 'PoliciesConfig'
  -> IO ()
runProServer tenantId env command prci ProServeOptions{..} proCreds exOpts
  luxLogsHandlerOpts upstreamShutdownLatch processMetrics
  tenantMetricOpts enableTenantLogs getPoliciesConfig = do
  -- setup the 'AppContext' with empty global shared state; both refs start in a
  -- "not yet available" 'Left' state and are filled by the workers started below
  credsRef <- liftIO $ IORef.newIORef (Left OAEServerStart)
  jwkSetRef <- liftIO $ IORef.newIORef (Left JSEServerStart)
  -- initialize the context for HGE (pools, loggers, catalog, schema sync)
  ProInitCtx coreServeCtx replicaPools proLogger customLoggers pgCtx initTime _dbId LuxLogger{..} ProConnInfo{..} <-
    hgeProInitialiseCtx tenantId env command prci credsRef exOpts
      luxLogsHandlerOpts upstreamShutdownLatch enableTenantLogs
  let (HGE.ServeCtx httpManager instanceId loggers _ primaryPool _latch _ _) = coreServeCtx
  let (HGE.Loggers _ logger _) = loggers
  L.unLogger proLogger $ StartupLog "pro_info" "starting Hasura GraphQL Engine Pro edition"
  -- create a separate metrics store for tenant-level metrics
  tenantEkgStore <- EKG.newStore
  tenantMetrics <- makeTenantMetrics tenantEkgStore
  -- when tenant metric options are configured, run an immortal thread that
  -- periodically samples the tenant store and logs a JSON snapshot
  sendTenantMetrics <- tenantMetricOpts & traverse \opts ->
    Conc.forkImmortal "sendTenantMetrics" logger $ forever $ liftIO $ do
      sample <- EKG.sampleAll tenantEkgStore
      L.unLogger proLogger $ TenantMetricsLog (fmap EKG.valueToJson sample)
      Conc.sleep (fromIntegral (_tmoInterval opts))
  -- gauges for in-use connections of the primary pool and each read replica
  liftIO $ EKG.registerGauge "postgres.primary.pool.total_connections" (getPgConnections primaryPool) tenantEkgStore
  -- TODO: use a distribution here instead?
  -- NOTE(review): the traversal result is discarded; 'for_' would avoid
  -- building the throwaway result list
  for replicaPools $ \(label, pool) ->
    EKG.registerGauge ("postgres.replicas." <> label <> ".pool.total_connections") (getPgConnections pool) tenantEkgStore
  -- start a background thread to get and refresh access tokens
  L.unLogger proLogger $ StartupLog "account_auth" "starting background loop to update credentials"
  tokenRefreshThread <- oauthTokenUpdateWorker logger proLogger httpManager proCreds credsRef exOpts
  -- Start a background thread to get the JWK from auth servers
  -- NOTE(review): "fecthing" is a typo in the emitted log message; fixing it
  -- changes runtime output, so it is only flagged here
  L.unLogger proLogger $ StartupLog "account_auth" "fecthing JWK from auth servers"
  jwkAsyncAction <- LA.async $ void $ fetchJWKSetThread jwkSetRef proLogger httpManager exOpts
  L.unLogger proLogger $ StartupLog "redis" "initializing redis connection"
  -- any connection exception is fatal (see 'redisError' below)
  redisConnection <- try (traverse Redis.checkedConnect _pciRedis)
    >>= either (redisError "Redis Error") return
  -- we assume a separate redis connection to be given for rate limiting. if
  -- this is not provided, rate limit is not enabled.
  L.unLogger proLogger $ StartupLog "redis" "initializing rate-limit redis connection"
  rateLimitRedisConn <- try (traverse Redis.checkedConnect _pciRateLimitRedis)
    >>= either (redisError "Rate Limit Redis Error") return
  -- create the context (pro config, firewall ctx and its update workers)
  (proCtx, proCtxWorkers) <-
    liftIO $ mkProCtx instanceId pgCtx rateLimitRedisConn proLogger logger httpManager proCreds credsRef exOpts
  let appCtx = AppContext
        { _acProCreds = proCreds
        , _acJwkSet = jwkSetRef
        , _acProLogger = proLogger
        , _acCustomLoggers = customLoggers
        , _acOauthCredsRef = credsRef
        , _acProConfig = proCtx
        , _acPoliciesConfig = getPoliciesConfig
        , _acRedis = redisConnection
        , _acProExtraOpts = exOpts
        , _acProcessMetrics = processMetrics
        , _acTenantMetrics = tenantMetrics
        , _acMetadataStoragePool = primaryPool
        }
  -- function to cleanup all background threads created by pro-server; passed to
  -- 'HGE.runHGEServer' as the shutdown action
  let immortalThreads = proCtxWorkers <> _llSendPendingLogsThreads <> maybeToList sendTenantMetrics <>
        [tokenRefreshThread, _llPeriodicFlushLogsThread, _llFlushOperLogsThread]
  let shutdownApp = do
        L.unLogger logger $ HGE.mkGenericStrLog L.LevelInfo "pro-server" "killing all background threads"
        mapM_ Immortal.stop immortalThreads
        LA.cancel jwkAsyncAction
  -- start the server with 'AppContext'
  flip runReaderT appCtx . unAppM $
    -- TODO: Phil: How do we propagate the shutdown action here if the latch is removed?
    HGE.runHGEServer env _psoHGEServeOptions coreServeCtx (Just pgCtx) initTime shutdownApp (Just $ logLiveQueryMetrics proLogger) tenantEkgStore
  where
    -- in-use connection count of a pool, shaped for an EKG gauge
    getPgConnections :: Q.PGPool -> IO Int64
    getPgConnections = pure . fromIntegral <=< Q.getInUseConnections
    -- prefix the caught exception and exit with a redis connection error
    redisError pfx = (ProErrors.printErrExit ProErrors.RedisConnectionError
      . ((pfx ++ ": ") ++)
      . show @SomeException)
    -- TODO
    -- sendLogOptions = SendLogOptions $ _pspEnableLogCompression _psoProServeParams
-- | Fetch @pro_config@ from lux (fallback to postgres if lux errs), and build the 'FirewallCtx';
-- also start a background thread to send heartbeat and update the total count of instances
mkProCtx
  :: (HGE.HasVersion)
  => HGE.InstanceId
  -> HGE.PGExecCtx
  -> Maybe Redis.Connection
  -> L.Logger HasuraPro
  -> L.Logger L.Hasura
  -> HTTP.Manager
  -> ProCredentials
  -> OAuthCredsRef
  -> LuxEndpoints
  -> IO (ConfigsCtx RedisStore, [Immortal.Thread])
mkProCtx _instanceId pgCtx redisConn logger ossLogger httpManager proCreds credsRef exOpts = do
  let projectId = _pcProjectId proCreds
  -- the credentials ref holds either an auth error or the current OAuth token
  accessTokenE <- fmap _oatAccessToken <$> liftIO (IORef.readIORef credsRef)
  -- if there is an access token fetch config from lux
  configResE <- runExceptT $ case accessTokenE of
    Right accessToken -> do
      let headers = [("Authorization", txtToBs $ "Bearer " <> unAccessToken accessToken)]
      fetchConfigFromLux httpManager exOpts headers projectId
    Left _ -> noAccessTokenErr False
  config <- case configResE of
    Left e -> do
      -- Lux fetch failed: log the error and fall back to the config cached in pg
      logLuxConfigError e
      fmap toV3ProConfig getFallbackConf
    Right conf -> return conf
  -- create the in-memory cache of config from the conf from pg
  (fwConfigRef, projectConfigRef) <- liftIO $ (,)
    <$> StmTVar.newTVarIO (pcv3ApiLimits config)
    <*> StmTVar.newTVarIO (pcv3ProjectConfigs config)
  let fwCtx = FirewallCtx (RedisStore redisConn) fwConfigRef
      proCtx = ConfigsCtx fwCtx projectConfigRef
  -- update the current db with the latest config (from lux); a write failure is
  -- treated as fatal via 'logWritePgConfigError'
  resE <- runExceptT $ updateProConfig pgCtx proCtx config
  either logWritePgConfigError return resE
  -- start the proconfig update thread as well
  configThreadId <- updateProConfigThread ossLogger pgCtx proCtx
  return (proCtx, [configThreadId])
  where
    -- TODO: is the 0 in version correct? - will this have any userfacing implications?
    defaultConf = PCV3 $ ProConfigV3 emptyFwConf defaultProjectConfig
    emptyFwConf = FirewallConfig Nothing Nothing False 0
    -- read the last persisted config from pg, falling back to 'defaultConf'
    -- when none is stored; a failed read is handled by 'logPgConfigError'
    getFallbackConf = do
      proConfigTx <- runRWTx pgCtx readPersistentProConfig
      res <- either logPgConfigError return proConfigTx
      return $ maybe defaultConf _ppcConfig res
    -- non-fatal: Lux was unreachable/errored, fallback config will be used
    logLuxConfigError err = do
      let msg = "Could not fetch Hasura Pro configuration. Using fallback config."
      liftIO $ L.unLogger logger $ ProConfigErrorLog (J.toJSON err) msg
    -- fatal path (per 'logAndExitOnError'): even the pg fallback read failed
    logPgConfigError err = do
      let msg = "Could not get Hasura Pro fallback configuration from db cache"
      logAndExitOnError logger ProErrors.LoadConfigFromDbCacheError $
        ProConfigErrorLog (J.toJSON err) msg
    -- fatal path (per 'logAndExitOnError'): Lux config could not be persisted
    logWritePgConfigError err = do
      let msg = "Fetched latest config from Lux, but failed writing to Postgres"
      logAndExitOnError logger ProErrors.WriteConfigToDbCacheError $
        ProConfigErrorLog (J.toJSON err) msg