Increase timeout for DC Agents
PR-URL: hasura/graphql-engine-mono#5172
GitOrigin-RevId: 1d286447901e34a77518e062315b80f4f775eebf
sordina authored and hasura-bot committed Jul 27, 2022
1 parent e4dad73 commit 607497f
Showing 11 changed files with 77 additions and 28 deletions.
2 changes: 1 addition & 1 deletion dc-agents/sdk/docker-compose.yaml
@@ -69,4 +69,4 @@ services:
- 8300:8080

volumes:
db_data:
db_data:
3 changes: 2 additions & 1 deletion server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs
@@ -48,8 +48,9 @@ resolveSourceConfig ::
BackendSourceKind 'BigQuery ->
BackendConfig 'BigQuery ->
Env.Environment ->
manager ->
m (Either QErr BigQuerySourceConfig)
resolveSourceConfig _logger _name BigQueryConnSourceConfig {..} _backendKind _backendConfig env = runExceptT $ do
resolveSourceConfig _logger _name BigQueryConnSourceConfig {..} _backendKind _backendConfig env _manager = runExceptT $ do
eSA <- resolveConfigurationJson env _cscServiceAccount
case eSA of
Left e -> throw400 Unexpected $ T.pack e
@@ -69,27 +69,27 @@ resolveSourceConfig' ::
BackendSourceKind 'DataConnector ->
DC.DataConnectorBackendConfig ->
Environment ->
HTTP.Manager ->
m (Either QErr DC.SourceConfig)
resolveSourceConfig' logger sourceName csc@ConnSourceConfig {template, value = originalConfig} (DataConnectorKind dataConnectorName) backendConfig env = runExceptT do
resolveSourceConfig' logger sourceName csc@ConnSourceConfig {template, timeout, value = originalConfig} (DataConnectorKind dataConnectorName) backendConfig env manager = runExceptT do
DC.DataConnectorOptions {..} <-
OMap.lookup dataConnectorName backendConfig
`onNothing` throw400 DataConnectorError ("Data connector named " <> toTxt dataConnectorName <> " was not found in the data connector backend config")

transformedConfig <- transformConnSourceConfig csc [("$session", J.object []), ("$env", J.toJSON env)] env
manager <- liftIO $ HTTP.newManager HTTP.defaultManagerSettings

-- TODO: capabilities applies to all sources for an agent.
-- We should be able to call it once per agent and store it in the SchemaCache
API.CapabilitiesResponse {..} <-
runTraceTWithReporter noReporter "capabilities"
. flip runAgentClientT (AgentClientContext logger _dcoUri manager)
. flip runAgentClientT (AgentClientContext logger _dcoUri manager (DC.sourceTimeoutMicroseconds <$> timeout))
$ genericClient // API._capabilities

validateConfiguration sourceName dataConnectorName crConfigSchemaResponse transformedConfig

schemaResponse <-
runTraceTWithReporter noReporter "resolve source"
. flip runAgentClientT (AgentClientContext logger _dcoUri manager)
. flip runAgentClientT (AgentClientContext logger _dcoUri manager (DC.sourceTimeoutMicroseconds <$> timeout))
$ (genericClient // API._schema) (toTxt sourceName) transformedConfig

pure
@@ -100,6 +100,7 @@ resolveSourceConfig' logger sourceName csc@ConnSourceConfig {template, value = o
_scCapabilities = crCapabilities,
_scSchema = schemaResponse,
_scManager = manager,
_scTimeoutMicroseconds = (DC.sourceTimeoutMicroseconds <$> timeout),
_scDataConnectorName = dataConnectorName
}

@@ -58,7 +58,7 @@ runDBQuery' requestId query fieldName _userInfo logger SourceConfig {..} action
withElapsedTime
. Tracing.trace ("Data Connector backend query for root field " <>> fieldName)
. Tracing.interpTraceT (liftEitherM . liftIO . runExceptT)
. flip runAgentClientT (AgentClientContext logger _scEndpoint _scManager)
. flip runAgentClientT (AgentClientContext logger _scEndpoint _scManager _scTimeoutMicroseconds)
$ action

mkQueryLog ::
@@ -82,5 +82,5 @@ runDBQueryExplain' (DBStepInfo _ SourceConfig {..} _ action) =
liftEitherM . liftIO
. runExceptT
. Tracing.runTraceTWithReporter Tracing.noReporter "explain"
. flip runAgentClientT (AgentClientContext nullLogger _scEndpoint _scManager)
. flip runAgentClientT (AgentClientContext nullLogger _scEndpoint _scManager _scTimeoutMicroseconds)
$ action
58 changes: 46 additions & 12 deletions server/src-lib/Hasura/Backends/DataConnector/Adapter/Types.hs
@@ -7,6 +7,8 @@ module Hasura.Backends.DataConnector.Adapter.Types
DataConnectorName (..),
DataConnectorOptions (..),
CountType (..),
SourceTimeout (),
sourceTimeoutMicroseconds,
)
where

@@ -19,13 +21,14 @@ import Hasura.Backends.DataConnector.API qualified as API
import Hasura.Backends.DataConnector.IR.Column qualified as IR.C
import Hasura.Incremental (Cacheable (..))
import Hasura.Prelude
import Network.HTTP.Client (Manager)
import Network.HTTP.Client qualified as HTTP
import Servant.Client (BaseUrl)
import Witch qualified

data ConnSourceConfig = ConnSourceConfig
{ value :: API.Config,
template :: Maybe Text
template :: Maybe Text,
timeout :: Maybe SourceTimeout
}
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (Hashable, NFData, ToJSON)
Expand All @@ -36,37 +39,68 @@ data ConnSourceConfig = ConnSourceConfig
instance FromJSON ConnSourceConfig where
parseJSON = J.withObject "ConnSourceConfig" \o ->
case J.lookup "value" o of
Just _ -> ConnSourceConfig <$> o J..: "value" <*> o J..:? "template"
Nothing -> pure $ ConnSourceConfig (API.Config o) Nothing
Just _ -> ConnSourceConfig <$> o J..: "value" <*> o J..:? "template" <*> (o J..:? "timeout")
Nothing -> ConnSourceConfig (API.Config o) Nothing <$> (o J..:? "timeout")

instance Cacheable ConnSourceConfig where
unchanged _ = (==)

-- NOTE: There may be a time type with units datatype already available somewhere
data SourceTimeout
= SourceTimeoutSeconds Int
| SourceTimeoutMilliseconds Int
| SourceTimeoutMicroseconds Int
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (Hashable, NFData)

sourceTimeoutMicroseconds :: SourceTimeout -> Int
sourceTimeoutMicroseconds = \case
SourceTimeoutSeconds s -> s * 1000000
SourceTimeoutMilliseconds m -> m * 1000
SourceTimeoutMicroseconds u -> u

instance FromJSON SourceTimeout where
parseJSON = J.withObject "SourceTimeout" \o ->
case J.toList o of
[("seconds", n)] -> convertTimeout n "seconds" SourceTimeoutSeconds
[("milliseconds", n)] -> convertTimeout n "milliseconds" SourceTimeoutMilliseconds
[("microseconds", n)] -> convertTimeout n "microseconds" SourceTimeoutMicroseconds
_ -> fail "Invalid SourceTimeout. Formats include: {seconds: Int}, {milliseconds: Int}, {microseconds: Int}"
where
convertTimeout n l m = J.withScientific l (\s -> pure $ m (round s)) n

instance ToJSON SourceTimeout where
toJSON (SourceTimeoutSeconds t) = J.object ["seconds" J..= t]
toJSON (SourceTimeoutMilliseconds t) = J.object ["milliseconds" J..= t]
toJSON (SourceTimeoutMicroseconds t) = J.object ["microseconds" J..= t]

data SourceConfig = SourceConfig
{ _scEndpoint :: BaseUrl,
_scConfig :: API.Config,
_scTemplate :: Maybe Text, -- TODO: Use Parsed Kriti Template
_scCapabilities :: API.Capabilities,
_scSchema :: API.SchemaResponse,
_scManager :: Manager,
_scManager :: HTTP.Manager,
_scTimeoutMicroseconds :: Maybe Int,
_scDataConnectorName :: DataConnectorName
}

instance Show SourceConfig where
show _ = "SourceConfig"

instance J.ToJSON SourceConfig where
toJSON _ = J.String "SourceConfig"

instance Eq SourceConfig where
SourceConfig ep1 capabilities1 config1 template1 schema1 _ dcName1 == SourceConfig ep2 capabilities2 config2 template2 schema2 _ dcName2 =
SourceConfig ep1 capabilities1 config1 template1 schema1 _ timeout1 dcName1 == SourceConfig ep2 capabilities2 config2 template2 schema2 _ timeout2 dcName2 =
ep1 == ep2
&& capabilities1 == capabilities2
&& config1 == config2
&& template1 == template2
&& schema1 == schema2
&& timeout1 == timeout2
&& dcName1 == dcName2

instance Show SourceConfig where
show _ = "SourceConfig"

instance J.ToJSON SourceConfig where
toJSON _ = J.String "SourceConfig"

instance Cacheable SourceConfig where
unchanged _ = (==)

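Illustratively (not taken from the diff above), the new timeout field accepts exactly one unit key per value; each of the following equivalent spellings normalises to 30,000,000 microseconds via sourceTimeoutMicroseconds, and any other shape fails to parse with the error message in the FromJSON instance:

[
    { "seconds": 30 },
    { "milliseconds": 30000 },
    { "microseconds": 30000000 }
]
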
11 changes: 9 additions & 2 deletions server/src-lib/Hasura/Backends/DataConnector/Agent/Client.hs
@@ -8,6 +8,7 @@ module Hasura.Backends.DataConnector.Agent.Client
where

import Control.Exception (try)
import Control.Lens ((&~), (.=))
import Hasura.Backends.DataConnector.Logging (logAgentRequest, logClientError)
import Hasura.Base.Error
import Hasura.HTTP qualified
@@ -25,7 +26,8 @@ import Servant.Client.Internal.HttpClient (clientResponseToResponse, mkFailureRe
data AgentClientContext = AgentClientContext
{ _accLogger :: Logger Hasura,
_accBaseUrl :: BaseUrl,
_accHttpManager :: Manager
_accHttpManager :: Manager,
_accResponseTimeout :: Maybe Int
}

newtype AgentClientT m a = AgentClientT (ReaderT AgentClientContext m a)
@@ -50,7 +52,12 @@ runRequestAcceptStatus' acceptStatus req = do
TransformableHTTP.tryFromClientRequest req'
`onLeft` (\err -> throw500 $ "Error in Data Connector backend: Could not create request. " <> err)

(tracedReq, responseOrException) <- tracedHttpRequest transformableReq (\tracedReq -> fmap (tracedReq,) . liftIO . try @HTTP.HttpException $ TransformableHTTP.performRequest tracedReq _accHttpManager)
-- Set the response timeout explicitly if it is provided
let transformableReq' =
transformableReq &~ do
for _accResponseTimeout \x -> TransformableHTTP.timeout .= HTTP.responseTimeoutMicro x

(tracedReq, responseOrException) <- tracedHttpRequest transformableReq' (\tracedReq -> fmap (tracedReq,) . liftIO . try @HTTP.HttpException $ TransformableHTTP.performRequest tracedReq _accHttpManager)
logAgentRequest _accLogger tracedReq responseOrException
case responseOrException of
Left ex ->
3 changes: 2 additions & 1 deletion server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs
@@ -55,8 +55,9 @@ resolveSourceConfig ::
BackendSourceKind 'MSSQL ->
BackendConfig 'MSSQL ->
Env.Environment ->
manager ->
m (Either QErr MSSQLSourceConfig)
resolveSourceConfig _logger name config _backendKind _backendConfig _env = runExceptT do
resolveSourceConfig _logger name config _backendKind _backendConfig _env _manager = runExceptT do
sourceResolver <- getMSSQLSourceResolver
liftEitherM $ liftIO $ sourceResolver name config

5 changes: 2 additions & 3 deletions server/src-lib/Hasura/Backends/MySQL/Connection.hs
@@ -16,7 +16,6 @@ import Data.Aeson.Key qualified as K
import Data.Aeson.KeyMap qualified as KM
import Data.Aeson.Text (encodeToTextBuilder)
import Data.ByteString (ByteString)
import Data.Environment qualified as Env
import Data.HashMap.Strict.InsOrd qualified as OMap
import Data.Pool
import Data.Scientific (fromFloatDigits)
@@ -43,8 +42,8 @@ import Hasura.RQL.Types.SourceCustomization
import Hasura.RQL.Types.Table (TableEventTriggers)
import Hasura.SQL.Backend

resolveSourceConfig :: (MonadIO m) => Logger Hasura -> SourceName -> ConnSourceConfig -> BackendSourceKind 'MySQL -> BackendConfig 'MySQL -> Env.Environment -> m (Either QErr SourceConfig)
resolveSourceConfig _logger _name csc@ConnSourceConfig {_cscPoolSettings = ConnPoolSettings {..}, ..} _backendKind _backendConfig _env = do
resolveSourceConfig :: (MonadIO m) => Logger Hasura -> SourceName -> ConnSourceConfig -> BackendSourceKind 'MySQL -> BackendConfig 'MySQL -> environment -> manager -> m (Either QErr SourceConfig)
resolveSourceConfig _logger _name csc@ConnSourceConfig {_cscPoolSettings = ConnPoolSettings {..}, ..} _backendKind _backendConfig _env _manager = do
let connectInfo =
defaultConnectInfo
{ connectHost = T.unpack _cscHost,
3 changes: 2 additions & 1 deletion server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs
@@ -77,8 +77,9 @@ resolveSourceConfig ::
BackendSourceKind ('Postgres pgKind) ->
BackendConfig ('Postgres pgKind) ->
Env.Environment ->
manager ->
m (Either QErr (SourceConfig ('Postgres pgKind)))
resolveSourceConfig _logger name config _backendKind _backendConfig _env = runExceptT do
resolveSourceConfig _logger name config _backendKind _backendConfig _env _manager = runExceptT do
sourceResolver <- getPGSourceResolver
liftEitherM $ liftIO $ sourceResolver name config

5 changes: 4 additions & 1 deletion server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs
@@ -355,6 +355,7 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do
ArrowWriter (Seq CollectedInfo) arr,
MonadIO m,
MonadResolveSource m,
HasHttpManagerM m,
BackendMetadata b
) =>
( Inc.Dependency (HashMap SourceName Inc.InvalidationKey),
@@ -366,10 +367,11 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do
`arr` Maybe (SourceConfig b)
getSourceConfigIfNeeded = Inc.cache proc (invalidationKeys, sourceName, sourceConfig, backendKind, backendConfig) -> do
let metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName
httpMgr <- bindA -< askHttpManager
Inc.dependOn -< Inc.selectKeyD sourceName invalidationKeys
(|
withRecordInconsistency
( liftEitherA <<< bindA -< resolveSourceConfig @b logger sourceName sourceConfig backendKind backendConfig env
( liftEitherA <<< bindA -< resolveSourceConfig @b logger sourceName sourceConfig backendKind backendConfig env httpMgr
)
|) metadataObj

@@ -381,6 +383,7 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do
MonadIO m,
MonadBaseControl IO m,
MonadResolveSource m,
HasHttpManagerM m,
BackendMetadata b
) =>
( Inc.Dependency (HashMap SourceName Inc.InvalidationKey),
2 changes: 2 additions & 0 deletions server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs
@@ -26,6 +26,7 @@ import Hasura.RQL.Types.SourceCustomization
import Hasura.RQL.Types.Table
import Hasura.SQL.Backend
import Hasura.SQL.Types
import Network.HTTP.Client qualified as HTTP

class
( Backend b,
@@ -65,6 +66,7 @@ class
BackendSourceKind b ->
BackendConfig b ->
Env.Environment ->
HTTP.Manager ->
m (Either QErr (SourceConfig b))

-- | Function that introspects a database for tables, columns, functions etc.

2 comments on commit 607497f

@DmitriyBrashevets

Hi @sordina

Could you please tell me how to set the timeout for DC Agents?

In particular, I'm interested in setting the timeout for the ClickHouse data connector: https://github.com/hasura/clickhouse_gdc_v2

I'm facing this error when I try to replace a metadata object that has more than ~2750 tables per connection:

{
    "error": "cannot continue due to inconsistent metadata",
    "path": "$.args",
    "code": "unexpected",
    "internal": [
        {
            "definition": "${clickhouseDatabaseName}",
            "name": "source ${clickhouseDatabaseName}",
            "reason": "Inconsistent object: Error communicating with data connector agent: No response data received",
            "type": "source"
        }
    ]
}

Here is what HASURA_GRAPHQL_METADATA_DEFAULTS looks like:

- name: HASURA_GRAPHQL_METADATA_DEFAULTS
  value: >-
    {"backend_configs":{"dataconnector":{"clickhouse":{"uri":"http://hasura-clickhouse-data-connector:8080"}}}}

I'm using these images:

image:
    repository: hasura/graphql-engine
    tag: v2.37.0
    
image:
    repository: hasura/clickhouse-data-connector
    tag: v2.36.0

@sordina
Contributor Author


Hi @DmitriyBrashevets

Does this solve it for you?

https://hasura.io/docs/latest/databases/database-config/data-connector-config/

            "DataConnectorConnSourceConfig": {
                "properties": {
                    "template": {
                        "type": "string"
                    },
                    "template_variables": {
                        "additionalProperties": {
                            "$ref": "#/components/schemas/TemplateVariableSource"
                        },
                        "type": "object"
                    },
                    "timeout": {
                        "oneOf": [
                            {
                                "$ref": "#/components/schemas/DataConnectorSourceTimeoutSeconds"
                            },
                            {
                                "$ref": "#/components/schemas/DataConnectorSourceTimeoutMilliseconds"
                            },
                            {
                                "$ref": "#/components/schemas/DataConnectorSourceTimeoutMicroseconds"
                            }
                        ]
                    },
                    "value": {
                        "$ref": "#/components/schemas/Config"
                    }
                },
                "required": [
                    "value"
                ],
                "type": "object"
            },
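
For example, here is a hedged sketch (values are illustrative, not taken from this thread) of the configuration block of a data connector source that raises the agent response timeout to 120 seconds; value holds the connector-specific configuration and is left empty here:

{
    "value": {},
    "timeout": {
        "seconds": 120
    }
}

Per the schema above, timeout can equally be written as {"milliseconds": ...} or {"microseconds": ...}.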
