From 05db1f0bb51ca6e38e42facd0d4b84b8678b63d0 Mon Sep 17 00:00:00 2001 From: s12f <97083380+s12f@users.noreply.github.com> Date: Wed, 17 Jan 2024 16:35:42 +0800 Subject: [PATCH] feat(hstream-connector): add alterConnectorConfig (#1742) --- external/protocol | 2 +- hstream-io/HStream/IO/IOTask.hs | 5 +-- hstream-io/HStream/IO/Meta.hs | 6 +++ hstream-io/HStream/IO/Worker.hs | 43 ++++++++++++++++--- hstream/app/client.hs | 16 ++++++- hstream/src/HStream/Client/Action.hs | 7 +++ hstream/src/HStream/Client/Types.hs | 17 ++++++++ hstream/src/HStream/Server/Core/Query.hs | 2 +- hstream/src/HStream/Server/Handler.hs | 1 + .../src/HStream/Server/Handler/Connector.hs | 31 ++++++++++++- hstream/src/HStream/Server/HsGrpcHandler.hs | 1 + 11 files changed, 116 insertions(+), 15 deletions(-) diff --git a/external/protocol b/external/protocol index e8c9a3617..d28d55aa0 160000 --- a/external/protocol +++ b/external/protocol @@ -1 +1 @@ -Subproject commit e8c9a36175734506b3fb4233233763615c41f92e +Subproject commit d28d55aa0f90f7d30ca8236b1b5573b1e67971cf diff --git a/hstream-io/HStream/IO/IOTask.hs b/hstream-io/HStream/IO/IOTask.hs index 2ace6e810..186bc4e4f 100644 --- a/hstream-io/HStream/IO/IOTask.hs +++ b/hstream-io/HStream/IO/IOTask.hs @@ -149,8 +149,8 @@ startIOTask task = do runIOTask task return RUNNING -stopIOTask :: IOTask -> Bool -> Bool -> IO () -stopIOTask task@IOTask{..} ifIsRunning force = do +stopIOTask :: IOTask -> Bool -> IO () +stopIOTask task@IOTask{..} force = do updateStatus task $ \case RUNNING -> do if force @@ -168,7 +168,6 @@ stopIOTask task@IOTask{..} ifIsRunning force = do writeIORef process' Nothing return STOPPED s -> do - unless ifIsRunning $ throwIO (HE.ConnectorInvalidStatus $ ioTaskStatusToText s) return s killIOTask :: IOTask -> IO () diff --git a/hstream-io/HStream/IO/Meta.hs b/hstream-io/HStream/IO/Meta.hs index b00956baa..e2b689fca 100644 --- a/hstream-io/HStream/IO/Meta.hs +++ b/hstream-io/HStream/IO/Meta.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TupleSections #-} @@ -9,6 +10,7 @@ module HStream.IO.Meta where import qualified Data.Text as T import GHC.Stack (HasCallStack) +import qualified Data.Aeson as J import HStream.IO.Types import qualified HStream.IO.Types as Types import HStream.MetaStore.Types (MetaHandle, MetaStore (..)) @@ -46,6 +48,10 @@ deleteIOTaskMeta h name = do deleteMeta @TaskIdMeta name Nothing h updateStatusInMeta h tid DELETED +updateConfig :: MetaHandle -> T.Text -> J.Object -> IO () +updateConfig h taskId cfg = do + updateMetaWith taskId (\(Just tm) -> tm {taskInfoMeta=tm.taskInfoMeta{connectorConfig = cfg}}) Nothing h + mapKvKey :: T.Text -> T.Text -> T.Text mapKvKey taskId key = taskId <> "_" <> key diff --git a/hstream-io/HStream/IO/Worker.hs b/hstream-io/HStream/IO/Worker.hs index c9b5253ea..55a149c5c 100644 --- a/hstream-io/HStream/IO/Worker.hs +++ b/hstream-io/HStream/IO/Worker.hs @@ -1,6 +1,7 @@ -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} module HStream.IO.Worker where @@ -149,10 +150,10 @@ showIOTask_ worker@Worker{..} name = do listIOTasks :: Worker -> IO [API.Connector] listIOTasks Worker{..} = M.listIOTaskMeta workerHandle -stopIOTask :: Worker -> T.Text -> Bool -> Bool-> IO () -stopIOTask worker name ifIsRunning force = do +stopIOTask :: Worker -> T.Text -> Bool-> IO () +stopIOTask worker name force = do ioTask <- getIOTask_ worker name - IOTask.stopIOTask ioTask ifIsRunning force + IOTask.stopIOTask ioTask force -- startIOTask :: Worker -> T.Text -> IO () -- startIOTask worker name = do @@ -177,6 +178,34 @@ recoverTask worker@Worker{..} name = do let newConnCfg = J.insert "hstream" (J.toJSON hsConfig) connectorConfig createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg} options True False False +-- update config and restart +alterConnectorConfig :: Worker -> T.Text -> T.Text -> IO () +alterConnectorConfig worker name config = do + updated <- updateConnectorConfig worker name config + when updated $ do + Log.info $ "updated connector config, connector:" <> Log.build name + E.catch + (stopIOTask worker name True) + (\(e :: E.SomeException) -> Log.warning $ "failed to stop io task:" <> Log.buildString (show e)) + Log.info $ "paused connector:" <> Log.build name + recoverTask worker name + Log.info $ "resumed connector:" <> Log.build name + +updateConnectorConfig :: Worker -> T.Text -> T.Text -> IO Bool +updateConnectorConfig worker name config = do + M.getIOTaskFromName worker.workerHandle name >>= \case + Nothing -> throwIO $ HE.ConnectorNotFound name + Just (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}}) -> do + case J.decodeStrict $ T.encodeUtf8 config :: Maybe J.Object of + Nothing -> return False + Just overrided -> do + let mergeCfg (J.Object x) (J.Object y) = J.Object (J.union x y) + let newConnCfg = J.insertWith mergeCfg "connector" (J.toJSON overrided) connectorConfig + M.updateConfig worker.workerHandle taskId newConnCfg + Log.info $ "updated connector config, connector:" <> Log.build name + <> ", new config:" <> Log.buildString' newConnCfg + return True + getIOTask :: Worker -> T.Text -> IO (Maybe IOTask) getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM @@ -190,7 +219,7 @@ getIOTask_ Worker{..} name = do deleteIOTask :: Worker -> T.Text -> IO () deleteIOTask worker@Worker{..} taskName = do E.catch - (stopIOTask worker taskName True False) + (stopIOTask worker taskName True) (\(e :: E.SomeException) -> Log.info $ "try to stop io task:" <> Log.buildString (show e)) M.deleteIOTaskMeta workerHandle taskName C.modifyMVar_ ioTasksM $ return . HM.delete taskName diff --git a/hstream/app/client.hs b/hstream/app/client.hs index ccab93971..760ac90a8 100644 --- a/hstream/app/client.hs +++ b/hstream/app/client.hs @@ -21,6 +21,7 @@ import qualified Data.List as L import Data.Maybe (mapMaybe, maybeToList) import qualified Data.Text as T import qualified Data.Text.Encoding as T +import qualified Data.Text.IO as T import qualified Data.Vector as V import Network.GRPC.HighLevel.Generated (ClientError (..), ClientResult (..), @@ -33,7 +34,8 @@ import System.Timeout (timeout) import Text.RawString.QQ (r) import qualified HStream.Admin.Server.Command as Admin -import HStream.Client.Action (createSubscription', +import HStream.Client.Action (alterConnectorConfig, + createSubscription', deleteStream, deleteSubscription, getStream, getSubscription, @@ -54,6 +56,7 @@ import HStream.Client.SQL (commandExec, import HStream.Client.Types (AppendContext (..), AppendOpts (..), CliCmd (..), Command (..), + ConnectorCommand (..), HStreamCommand (..), HStreamInitOpts (..), HStreamNodes (..), @@ -106,6 +109,7 @@ runCommand (CliCmd HStreamCommand{..}) = do HStreamSql opts -> hstreamSQL rConnOpts opts HStreamStream opts -> hstreamStream rConnOpts opts HStreamSubscription opts -> hstreamSubscription rConnOpts opts + HStreamConnector opts -> hstreamConnector rConnOpts opts hstreamSQL :: RefinedCliConnOpts -> HStreamSqlOpts -> IO () hstreamSQL connOpt HStreamSqlOpts{_updateInterval = updateInterval, @@ -237,6 +241,16 @@ hstreamSubscription connOpts@RefinedCliConnOpts{..} = \case ctx <- initCliContext connOpts executeWithLookupResource_ ctx (Resource ResSubscription sid) (deleteSubscription sid bool) >>= printResult +hstreamConnector :: RefinedCliConnOpts -> ConnectorCommand -> IO () +hstreamConnector connOpts@RefinedCliConnOpts{..} = \case + ConnectorCmdAlterConfig name configJson configPath -> do + cfg <- case (configJson, configPath) of + (Just x, _) -> pure x + (_, Just x) -> T.readFile (T.unpack x) + _ -> error "connector config is required(--config-json or --config-path)" + ctx <- initCliContext connOpts + executeWithLookupResource_ ctx (Resource ResConnector name) (alterConnectorConfig name cfg) >>= printResult + getNodes :: RefinedCliConnOpts -> IO DescribeClusterResponse getNodes RefinedCliConnOpts{..} = withGRPCClient clientConfig $ \client -> do diff --git a/hstream/src/HStream/Client/Action.hs b/hstream/src/HStream/Client/Action.hs index c03457c12..5be10d824 100644 --- a/hstream/src/HStream/Client/Action.hs +++ b/hstream/src/HStream/Client/Action.hs @@ -31,6 +31,7 @@ module HStream.Client.Action , listConnectors , pauseConnector , resumeConnector + , alterConnectorConfig , listQueries , listViews @@ -168,6 +169,12 @@ createConnector name typ target cfg API.HStreamApi{..} = , API.createConnectorRequestTarget = target , API.createConnectorRequestConfig = cfg }) +alterConnectorConfig :: T.Text -> T.Text -> Action Empty +alterConnectorConfig name cfg API.HStreamApi{..} = + hstreamApiAlterConnectorConfig (mkClientNormalRequest' def + { API.alterConnectorConfigRequestName = name + , API.alterConnectorConfigRequestConfig = cfg }) + listShards :: T.Text -> Action API.ListShardsResponse listShards sName API.HStreamApi{..} = do hstreamApiListShards $ mkClientNormalRequest' def { diff --git a/hstream/src/HStream/Client/Types.hs b/hstream/src/HStream/Client/Types.hs index 932f6420c..507349684 100644 --- a/hstream/src/HStream/Client/Types.hs +++ b/hstream/src/HStream/Client/Types.hs @@ -56,6 +56,7 @@ data Command | HStreamInit HStreamInitOpts | HStreamStream StreamCommand | HStreamSubscription SubscriptionCommand + | HStreamConnector ConnectorCommand commandParser :: O.Parser HStreamCommand commandParser = HStreamCommand @@ -68,6 +69,7 @@ commandParser = HStreamCommand <> O.command "subscription" (O.info (HStreamSubscription <$> subscriptionCmdParser) (O.progDesc "Manage Subscriptions in HStreamDB (`sub` is an alias for this command)")) -- Also see: https://github.com/pcapriotti/optparse-applicative#command-groups <> O.command "sub" (O.info (HStreamSubscription <$> subscriptionCmdParser) (O.progDesc "Alias for the command `subscription`")) + <> O.command "connector" (O.info (HStreamConnector <$> connectorCmdParser) (O.progDesc "Manage Connector in HStreamDB")) ) data StreamCommand @@ -277,6 +279,21 @@ subscriptionCmdParser = O.hsubparser (O.progDesc "Delete a subscription")) ) +data ConnectorCommand + = ConnectorCmdAlterConfig Text (Maybe Text) (Maybe Text) + deriving (Show) + +connectorCmdParser :: O.Parser ConnectorCommand +connectorCmdParser = O.hsubparser + ( O.command "alter-config" (O.info (ConnectorCmdAlterConfig <$> O.strArgument ( O.metavar "CONNECTOR_NAME" + <> O.help "The Name of the connector") + <*> (O.optional . O.option O.str) (O.long "config-json" + <> O.metavar "STRING" <> O.help "connector config json string") + <*> (O.optional . O.option O.str) (O.long "config-path" + <> O.metavar "STRING" <> O.help "connector config file path")) + (O.progDesc "alter connector config")) + ) + data HStreamCliContext = HStreamCliContext { availableServers :: MVar [SocketAddr] , currentServer :: MVar SocketAddr diff --git a/hstream/src/HStream/Server/Core/Query.hs b/hstream/src/HStream/Server/Core/Query.hs index 131c48789..280d02ade 100644 --- a/hstream/src/HStream/Server/Core/Query.hs +++ b/hstream/src/HStream/Server/Core/Query.hs @@ -106,7 +106,7 @@ executeQuery sc@ServerContext{..} CommandQuery{..} = do #endif ExplainPlan plan -> pure $ API.CommandQueryResponse (mkVectorStruct plan "explain") PausePlan (PauseObjectConnector name) -> do - IO.stopIOTask scIOWorker name False False + IO.stopIOTask scIOWorker name False pure (CommandQueryResponse V.empty) ResumePlan (ResumeObjectConnector name) -> do IO.recoverTask scIOWorker name diff --git a/hstream/src/HStream/Server/Handler.hs b/hstream/src/HStream/Server/Handler.hs index eef902ef8..e2090d310 100644 --- a/hstream/src/HStream/Server/Handler.hs +++ b/hstream/src/HStream/Server/Handler.hs @@ -103,6 +103,7 @@ handlers serverContext@ServerContext{..} = hstreamApiDeleteConnector = deleteConnectorHandler serverContext, hstreamApiPauseConnector = pauseConnectorHandler serverContext, hstreamApiResumeConnector = resumeConnectorHandler serverContext, + hstreamApiAlterConnectorConfig = alterConnectorConfigHandler serverContext, -- View hstreamApiGetView = getViewHandler serverContext, diff --git a/hstream/src/HStream/Server/Handler/Connector.hs b/hstream/src/HStream/Server/Handler/Connector.hs index 1a600a7cc..092d69148 100644 --- a/hstream/src/HStream/Server/Handler/Connector.hs +++ b/hstream/src/HStream/Server/Handler/Connector.hs @@ -16,6 +16,7 @@ module HStream.Server.Handler.Connector , deleteConnectorHandler , resumeConnectorHandler , pauseConnectorHandler + , alterConnectorConfigHandler -- * For hs-grpc-server , handleCreateConnector , handleListConnectors @@ -25,6 +26,7 @@ module HStream.Server.Handler.Connector , handleDeleteConnector , handleResumeConnector , handlePauseConnector + , handleAlterConnectorConfig ) where import Control.Exception (throwIO) @@ -175,7 +177,7 @@ pauseConnectorHandler sc@ServerContext{..} ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName unless (serverNodeId == serverID) $ throwIO $ HE.WrongServer "Connector is bound to a different node" - IO.stopIOTask scIOWorker pauseConnectorRequestName False False + IO.stopIOTask scIOWorker pauseConnectorRequestName False returnResp Empty handlePauseConnector :: ServerContext -> G.UnaryHandler PauseConnectorRequest Empty @@ -183,7 +185,32 @@ handlePauseConnector sc@ServerContext{..} _ PauseConnectorRequest{..} = catchDef ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName unless (serverNodeId == serverID) $ throwIO $ HE.WrongServer "Connector is bound to a different node" - IO.stopIOTask scIOWorker pauseConnectorRequestName False False >> pure Empty + IO.stopIOTask scIOWorker pauseConnectorRequestName False >> pure Empty + +alterConnectorConfigHandler + :: ServerContext + -> ServerRequest 'Normal AlterConnectorConfigRequest Empty + -> IO (ServerResponse 'Normal Empty) +alterConnectorConfigHandler sc@ServerContext{..} + (ServerNormalRequest _metadata AlterConnectorConfigRequest{..}) = defaultExceptionHandle $ do + Log.info $ "Receive Alter Connector config Request. " + <> "Connector ID: " <> Log.build alterConnectorConfigRequestName + <> "Overrided Config: " <> Log.build alterConnectorConfigRequestConfig + ServerNode{..} <- lookupResource sc ResConnector alterConnectorConfigRequestName + unless (serverNodeId == serverID) $ + throwIO $ HE.WrongServer "Connector is bound to a different node" + IO.alterConnectorConfig scIOWorker alterConnectorConfigRequestName alterConnectorConfigRequestConfig + returnResp Empty + +handleAlterConnectorConfig :: ServerContext -> G.UnaryHandler AlterConnectorConfigRequest Empty +handleAlterConnectorConfig sc@ServerContext{..} _ AlterConnectorConfigRequest{..} = catchDefaultEx $ do + Log.info $ "Receive Alter Connector config Request. " + <> "Connector ID: " <> Log.build alterConnectorConfigRequestName + <> "Overrided Config: " <> Log.build alterConnectorConfigRequestConfig + ServerNode{..} <- lookupResource sc ResConnector alterConnectorConfigRequestName + unless (serverNodeId == serverID) $ + throwIO $ HE.WrongServer "Connector is bound to a different node" + IO.alterConnectorConfig scIOWorker alterConnectorConfigRequestName alterConnectorConfigRequestConfig >> pure Empty -- uncurry createIOTaskFromRequest :: ServerContext -> CreateConnectorRequest -> IO Connector diff --git a/hstream/src/HStream/Server/HsGrpcHandler.hs b/hstream/src/HStream/Server/HsGrpcHandler.hs index fa1d2f18a..40c094be5 100644 --- a/hstream/src/HStream/Server/HsGrpcHandler.hs +++ b/hstream/src/HStream/Server/HsGrpcHandler.hs @@ -80,6 +80,7 @@ handlers sc = , unary (GRPC :: GRPC P.HStreamApi "deleteConnector") (H.handleDeleteConnector sc) , unary (GRPC :: GRPC P.HStreamApi "resumeConnector") (H.handleResumeConnector sc) , unary (GRPC :: GRPC P.HStreamApi "pauseConnector") (H.handlePauseConnector sc) + , unary (GRPC :: GRPC P.HStreamApi "alterConnectorConfig") (H.handleAlterConnectorConfig sc) -- View , unary (GRPC :: GRPC P.HStreamApi "getView") (H.handleGetView sc) , unary (GRPC :: GRPC P.HStreamApi "listViews") (H.handleListView sc)