Skip to content

Commit

Permalink
server: fix mishandling of GeoJSON inputs in subscriptions (fix #3239) (
Browse files Browse the repository at this point in the history
#4551)

* Add support for multiple top-level fields in a subscription to improve testability of subscriptions

* Add an internal flag to enable multiple subscriptions

* Add missing call to withConstructorFn in live queries (fix #3239)

Co-authored-by: Alexis King <lexi.lambda@gmail.com>
  • Loading branch information
Auke Booij and lexi-lambda committed May 13, 2020
1 parent 9a16e25 commit 4d10a61
Show file tree
Hide file tree
Showing 18 changed files with 170 additions and 150 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -37,6 +37,7 @@ Read more about the session argument for computed fields in the [docs](https://h

(Add entries here in the order of: server, console, cli, docs, others)

- server: fix mishandling of GeoJSON inputs in subscriptions (fix #3239)
- console: avoid count queries for large tables (#4692)
- console: add read replica support section to pro popup (#4118)
- console: allow modifying default value for PK (fix #4075) (#4679)
Expand Down
33 changes: 4 additions & 29 deletions server/src-lib/Hasura/GraphQL/Execute.hs
Expand Up @@ -331,32 +331,6 @@ getMutOp ctx sqlGenCtx userInfo manager reqHeaders selSet =
ordByCtx = _gOrdByCtx ctx
insCtxMap = _gInsCtxMap ctx

getSubsOpM
:: ( MonadError QErr m
, MonadReader r m
, Has QueryCtxMap r
, Has FieldMap r
, Has OrdByCtx r
, Has SQLGenCtx r
, Has UserInfo r
, MonadIO m
, HasVersion
)
=> PGExecCtx
-> QueryReusability
-> VQ.Field
-> QueryActionExecuter
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOpM pgExecCtx initialReusability fld actionExecuter =
case VQ._fName fld of
"__typename" ->
throwVE "you cannot create a subscription on '__typename' field"
_ -> do
(astUnresolved, finalReusability) <- runReusabilityTWith initialReusability $
GR.queryFldToPGAST fld actionExecuter
let varTypes = finalReusability ^? _Reusable
EL.buildLiveQueryPlan pgExecCtx (VQ._fAlias fld) astUnresolved varTypes

getSubsOp
:: ( MonadError QErr m
, MonadIO m
Expand All @@ -368,10 +342,11 @@ getSubsOp
-> UserInfo
-> QueryReusability
-> QueryActionExecuter
-> VQ.Field
-> VQ.SelSet
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fld =
runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fields =
runE gCtx sqlGenCtx userInfo $ EL.buildLiveQueryPlan pgExecCtx queryReusability actionExecuter fields
-- runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter

execRemoteGQ
:: ( HasVersion
Expand Down
100 changes: 65 additions & 35 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs
Expand Up @@ -44,43 +44,61 @@ import Data.Has
import Data.UUID (UUID)

import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Resolve.Action as RA
import qualified Hasura.GraphQL.Resolve.Types as GR
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Validate as GV
import qualified Hasura.GraphQL.Validate.Types as GV
import qualified Hasura.SQL.DML as S

import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Utils
import Hasura.RQL.Types
import Hasura.SQL.Error
import Hasura.SQL.Types
import Hasura.SQL.Value
import Hasura.Server.Version (HasVersion)

-- -------------------------------------------------------------------------------------------------
-- Multiplexed queries

newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query }
deriving (Show, Eq, Hashable, J.ToJSON)

mkMultiplexedQuery :: Q.Query -> MultiplexedQuery
mkMultiplexedQuery baseQuery =
MultiplexedQuery . Q.fromText $ foldMap Q.getQueryText [queryPrefix, baseQuery, querySuffix]
mkMultiplexedQuery :: Map.HashMap G.Alias GR.QueryRootFldResolved -> MultiplexedQuery
mkMultiplexedQuery rootFields = MultiplexedQuery . Q.fromBuilder . toSQL $ S.mkSelect
{ S.selExtr =
-- SELECT _subs.result_id, _fld_resp.root AS result
[ S.Extractor (mkQualIden (Iden "_subs") (Iden "result_id")) Nothing
, S.Extractor (mkQualIden (Iden "_fld_resp") (Iden "root")) (Just . S.Alias $ Iden "result") ]
, S.selFrom = Just $ S.FromExp [S.FIJoin $
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)]
}
where
queryPrefix =
[Q.sql|
select
_subs.result_id, _fld_resp.root as result
from
unnest(
$1::uuid[], $2::json[]
) _subs (result_id, result_vars)
left outer join lateral
(
|]

querySuffix =
[Q.sql|
) _fld_resp ON ('true')
|]
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
subsInputFromItem = S.FIUnnest
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
(S.Alias $ Iden "_subs")
[S.SEIden $ Iden "result_id", S.SEIden $ Iden "result_vars"]

-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
responseLateralFromItem = S.mkLateralFromItem selectRootFields (S.Alias $ Iden "_fld_resp")
selectRootFields = S.mkSelect
{ S.selExtr = [S.Extractor rootFieldsJsonAggregate (Just . S.Alias $ Iden "root")]
, S.selFrom = Just . S.FromExp $
flip map (Map.toList rootFields) $ \(fieldAlias, resolvedAST) ->
S.mkSelFromItem (GR.toSQLSelect resolvedAST) (S.Alias $ aliasToIden fieldAlias)
}

-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
rootFieldsJsonAggregate = S.SEFnApp "json_build_object" rootFieldsJsonPairs Nothing
rootFieldsJsonPairs = flip concatMap (Map.keys rootFields) $ \fieldAlias ->
[ S.SELit (G.unName $ G.unAlias fieldAlias)
, mkQualIden (aliasToIden fieldAlias) (Iden "root") ]

mkQualIden prefix = S.SEQIden . S.QIden (S.QualIden prefix Nothing) -- TODO fix this Nothing of course
aliasToIden = Iden . G.unName . G.unAlias

-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL expressions that refer to
-- the @result_vars@ input object, collecting variable values along the way.
Expand All @@ -103,11 +121,13 @@ resolveMultiplexedValue = \case
GR.UVSQL sqlExp -> pure sqlExp
GR.UVSession -> pure $ fromResVars (PGTypeScalar PGJSON) ["session"]
where
fromResVars ty jPath =
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
fromResVars pgType jPath = addTypeAnnotation pgType $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIden $ S.QIden (S.QualIden (Iden "_subs") Nothing) (Iden "result_vars")
, S.SEArray $ map S.SELit jPath
]
addTypeAnnotation pgType = flip S.SETyAnn (S.mkTypeAnn pgType) . case pgType of
PGTypeScalar scalarType -> withConstructorFn scalarType
PGTypeArray _ -> id

newtype CohortId = CohortId { unCohortId :: UUID }
deriving (Show, Eq, Hashable, J.ToJSON, Q.FromCol)
Expand Down Expand Up @@ -230,7 +250,6 @@ data LiveQueryPlan
data ParameterizedLiveQueryPlan
= ParameterizedLiveQueryPlan
{ _plqpRole :: !RoleName
, _plqpAlias :: !G.Alias
, _plqpQuery :: !MultiplexedQuery
} deriving (Show)
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan)
Expand All @@ -249,30 +268,41 @@ buildLiveQueryPlan
:: ( MonadError QErr m
, MonadReader r m
, Has UserInfo r
, Has GR.FieldMap r
, Has GR.OrdByCtx r
, Has GR.QueryCtxMap r
, Has SQLGenCtx r
, HasVersion
, MonadIO m
)
=> PGExecCtx
-> G.Alias
-> GR.QueryRootFldUnresolved
-> Maybe GV.ReusableVariableTypes
-> GV.QueryReusability
-> RA.QueryActionExecuter
-> GV.SelSet
-> m (LiveQueryPlan, Maybe ReusableLiveQueryPlan)
buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do
userInfo <- asks getter
buildLiveQueryPlan pgExecCtx initialReusability actionExecutioner fields = do
((resolvedASTs, (queryVariableValues, syntheticVariableValues)), finalReusability) <-
GV.runReusabilityTWith initialReusability . flip runStateT mempty $
fmap Map.fromList . for (toList fields) $ \field -> case GV._fName field of
"__typename" -> throwVE "you cannot create a subscription on '__typename' field"
_ -> do
unresolvedAST <- GR.queryFldToPGAST field actionExecutioner
resolvedAST <- GR.traverseQueryRootFldAST resolveMultiplexedValue unresolvedAST
pure (GV._fAlias field, resolvedAST)

(astResolved, (queryVariableValues, syntheticVariableValues)) <- flip runStateT mempty $
GR.traverseQueryRootFldAST resolveMultiplexedValue astUnresolved
let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved
userInfo <- asks getter
let multiplexedQuery = mkMultiplexedQuery resolvedASTs
roleName = _uiRole userInfo
parameterizedPlan = ParameterizedLiveQueryPlan roleName fieldAlias pgQuery
parameterizedPlan = ParameterizedLiveQueryPlan roleName multiplexedQuery

-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
-- We need to ensure that the values provided for variables are correct according to Postgres.
-- Without this check an invalid value for a variable for one instance of the subscription will
-- take down the entire multiplexed query.
validatedQueryVars <- validateVariables pgExecCtx queryVariableValues
validatedSyntheticVars <- validateVariables pgExecCtx (toList syntheticVariableValues)
let cohortVariables = CohortVariables (_uiSession userInfo) validatedQueryVars validatedSyntheticVars
plan = LiveQueryPlan parameterizedPlan cohortVariables
varTypes = finalReusability ^? GV._Reusable
reusablePlan = ReusableLiveQueryPlan parameterizedPlan validatedSyntheticVars <$> varTypes
pure (plan, reusablePlan)

Expand Down
18 changes: 3 additions & 15 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Expand Up @@ -44,7 +44,6 @@ import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Language.GraphQL.Draft.Syntax as G
import qualified ListT
import qualified StmContainers.Map as STMMap
import qualified System.Metrics.Distribution as Metrics
Expand All @@ -65,11 +64,7 @@ import Hasura.Session
-- -------------------------------------------------------------------------------------------------
-- Subscribers

data Subscriber
= Subscriber
{ _sRootAlias :: !G.Alias
, _sOnChangeCallback :: !OnChange
}
newtype Subscriber = Subscriber { _sOnChangeCallback :: OnChange }

-- | live query onChange metadata, used for adding more extra analytics data
data LiveQueryMetadata
Expand All @@ -85,7 +80,6 @@ data LiveQueryResponse
}

type LGQResponse = GQResult LiveQueryResponse

type OnChange = LGQResponse -> IO ()

newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
Expand Down Expand Up @@ -183,7 +177,6 @@ data CohortSnapshot

pushResultToCohort
:: GQResult EncJSON
-- ^ a response that still needs to be wrapped with each 'Subscriber'’s root 'G.Alias'
-> Maybe ResponseHash
-> LiveQueryMetadata
-> CohortSnapshot
Expand All @@ -202,13 +195,8 @@ pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot =
pushResultToSubscribers sinks
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
let aliasText = G.unName $ G.unAlias alias
wrapWithAlias response = LiveQueryResponse
{ _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
, _lqrExecutionTime = dTime
}
in action (wrapWithAlias <$> result)
response = result <&> \payload -> LiveQueryResponse (encJToLBS payload) dTime
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber action) -> action response

-- -------------------------------------------------------------------------------------------------
-- Pollers
Expand Down
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
Expand Up @@ -107,11 +107,11 @@ addLiveQuery logger lqState plan onResultAction = do
where
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
LiveQueriesOptions batchSize refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) cohortKey = plan
LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan

handlerId = PollerKey role query

!subscriber = Subscriber alias onResultAction
!subscriber = Subscriber onResultAction
addToCohort sinkId handlerC =
TMap.insert subscriber sinkId $ _cNewSubscribers handlerC

Expand Down
22 changes: 16 additions & 6 deletions server/src-lib/Hasura/GraphQL/Resolve.hs
Expand Up @@ -12,6 +12,7 @@ module Hasura.GraphQL.Resolve
, QueryRootFldUnresolved
, QueryRootFldResolved
, toPGQuery
, toSQLSelect

, RIntro.schemaR
, RIntro.typeR
Expand Down Expand Up @@ -68,12 +69,12 @@ traverseQueryRootFldAST f = \case

toPGQuery :: QueryRootFldResolved -> Q.Query
toPGQuery = \case
QRFPk s -> DS.selectQuerySQL DS.JASSingleObject s
QRFSimple s -> DS.selectQuerySQL DS.JASMultipleRows s
QRFAgg s -> DS.selectAggQuerySQL s
QRFActionSelect s -> DS.selectQuerySQL DS.JASSingleObject s
QRFActionExecuteObject s -> DS.selectQuerySQL DS.JASSingleObject s
QRFActionExecuteList s -> DS.selectQuerySQL DS.JASMultipleRows s
QRFPk s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFSimple s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s
QRFAgg s -> Q.fromBuilder $ toSQL $ DS.mkAggSelect s
QRFActionSelect s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteObject s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteList s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s

validateHdrs
:: (Foldable t, QErrM m) => UserInfo -> t Text -> m ()
Expand Down Expand Up @@ -189,3 +190,12 @@ getOpCtx f = do
opCtxMap <- asks getter
onNothing (Map.lookup f opCtxMap) $ throw500 $
"lookup failed: opctx: " <> showName f

toSQLSelect :: QueryRootFldResolved -> S.Select
toSQLSelect = \case
QRFPk s -> DS.mkSQLSelect DS.JASSingleObject s
QRFSimple s -> DS.mkSQLSelect DS.JASMultipleRows s
QRFAgg s -> DS.mkAggSelect s
QRFActionSelect s -> DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteObject s -> DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteList s -> DS.mkSQLSelect DS.JASSingleObject s
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/GraphQL/Resolve/Action.hs
Expand Up @@ -171,7 +171,7 @@ resolveActionMutationSync field executionContext sessionVariables = do
(_fType field) $ _fSelSet field
astResolved <- RS.traverseAnnSimpleSel resolveValTxt selectAstUnresolved
let jsonAggType = mkJsonAggSelect outputType
return $ (,respHeaders) $ asSingleRowJsonResp (RS.selectQuerySQL jsonAggType astResolved) []
return $ (,respHeaders) $ asSingleRowJsonResp (Q.fromBuilder $ toSQL $ RS.mkSQLSelect jsonAggType astResolved) []
where
ActionExecutionContext actionName outputType outputFields definitionList resolvedWebhook confHeaders
forwardClientHeaders = executionContext
Expand Down
22 changes: 12 additions & 10 deletions server/src-lib/Hasura/GraphQL/Validate.hs
Expand Up @@ -21,8 +21,8 @@ import Data.Has

import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as HS
import qualified Data.Sequence as Seq
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Data.Sequence as Seq

import Hasura.GraphQL.Schema
import Hasura.GraphQL.Transport.HTTP.Protocol
Expand Down Expand Up @@ -58,8 +58,7 @@ getTypedOp opNameM selSets opDefs =
throwVE $ "operationName cannot be used when " <>
"an anonymous operation exists in the document"
(Nothing, [selSet], []) ->
return $ G.TypedOperationDefinition
G.OperationTypeQuery Nothing [] [] selSet
return $ G.TypedOperationDefinition G.OperationTypeQuery Nothing [] [] selSet
(Nothing, [], [opDef]) ->
return opDef
(Nothing, _, _) ->
Expand Down Expand Up @@ -145,7 +144,7 @@ validateFrag (G.FragmentDefinition n onTy dirs selSet) = do
data RootSelSet
= RQuery !SelSet
| RMutation !SelSet
| RSubscription !Field
| RSubscription !SelSet
deriving (Show, Eq)

validateGQ
Expand All @@ -172,12 +171,15 @@ validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do
G.OperationTypeQuery -> return $ RQuery selSet
G.OperationTypeMutation -> return $ RMutation selSet
G.OperationTypeSubscription ->
case Seq.viewl selSet of
Seq.EmptyL -> throw500 "empty selset for subscription"
fld Seq.:< rst -> do
unless (null rst) $
throwVE "subscription must select only one top level field"
return $ RSubscription fld
case selSet of
Seq.Empty -> throw500 "empty selset for subscription"
(_ Seq.:<| rst) -> do
-- As an internal testing feature, we support subscribing to multiple
-- selection sets. First check if the corresponding directive is set.
let multipleAllowed = elem (G.Directive "_multiple_top_level_fields" []) (G._todDirectives opDef)
unless (multipleAllowed || null rst) $
throwVE "subscriptions must select one top level field"
return $ RSubscription selSet

isQueryInAllowlist :: GQLExecDoc -> HS.HashSet GQLQuery -> Bool
isQueryInAllowlist q = HS.member gqlQuery
Expand Down
3 changes: 3 additions & 0 deletions server/src-lib/Hasura/GraphQL/Validate/Types.hs
Expand Up @@ -825,6 +825,9 @@ class (Monad m) => MonadReusability m where
instance (MonadReusability m) => MonadReusability (ReaderT r m) where
recordVariableUse a b = lift $ recordVariableUse a b
markNotReusable = lift markNotReusable
instance (MonadReusability m) => MonadReusability (StateT s m) where
recordVariableUse a b = lift $ recordVariableUse a b
markNotReusable = lift markNotReusable

newtype ReusabilityT m a = ReusabilityT { unReusabilityT :: StateT QueryReusability m a }
deriving (Functor, Applicative, Monad, MonadError e, MonadReader r, MonadIO)
Expand Down

0 comments on commit 4d10a61

Please sign in to comment.