Skip to content

Commit

Permalink
feat(hstream-connector): add alterConnectorConfig (#1742)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f committed Jan 17, 2024
1 parent ccbdc17 commit 05db1f0
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 15 deletions.
2 changes: 1 addition & 1 deletion external/protocol
Submodule protocol updated 1 files
+7 −0 hstream.proto
5 changes: 2 additions & 3 deletions hstream-io/HStream/IO/IOTask.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ startIOTask task = do
runIOTask task
return RUNNING

stopIOTask :: IOTask -> Bool -> Bool -> IO ()
stopIOTask task@IOTask{..} ifIsRunning force = do
stopIOTask :: IOTask -> Bool -> IO ()
stopIOTask task@IOTask{..} force = do
updateStatus task $ \case
RUNNING -> do
if force
Expand All @@ -168,7 +168,6 @@ stopIOTask task@IOTask{..} ifIsRunning force = do
writeIORef process' Nothing
return STOPPED
s -> do
unless ifIsRunning $ throwIO (HE.ConnectorInvalidStatus $ ioTaskStatusToText s)
return s

killIOTask :: IOTask -> IO ()
Expand Down
6 changes: 6 additions & 0 deletions hstream-io/HStream/IO/Meta.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}

Expand All @@ -9,6 +10,7 @@ module HStream.IO.Meta where
import qualified Data.Text as T
import GHC.Stack (HasCallStack)

import qualified Data.Aeson as J
import HStream.IO.Types
import qualified HStream.IO.Types as Types
import HStream.MetaStore.Types (MetaHandle, MetaStore (..))
Expand Down Expand Up @@ -46,6 +48,10 @@ deleteIOTaskMeta h name = do
deleteMeta @TaskIdMeta name Nothing h
updateStatusInMeta h tid DELETED

updateConfig :: MetaHandle -> T.Text -> J.Object -> IO ()
updateConfig h taskId cfg = do
updateMetaWith taskId (\(Just tm) -> tm {taskInfoMeta=tm.taskInfoMeta{connectorConfig = cfg}}) Nothing h

mapKvKey :: T.Text -> T.Text -> T.Text
mapKvKey taskId key = taskId <> "_" <> key

Expand Down
43 changes: 36 additions & 7 deletions hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

module HStream.IO.Worker where

Expand Down Expand Up @@ -149,10 +150,10 @@ showIOTask_ worker@Worker{..} name = do
listIOTasks :: Worker -> IO [API.Connector]
listIOTasks Worker{..} = M.listIOTaskMeta workerHandle

stopIOTask :: Worker -> T.Text -> Bool -> Bool-> IO ()
stopIOTask worker name ifIsRunning force = do
stopIOTask :: Worker -> T.Text -> Bool-> IO ()
stopIOTask worker name force = do
ioTask <- getIOTask_ worker name
IOTask.stopIOTask ioTask ifIsRunning force
IOTask.stopIOTask ioTask force

-- startIOTask :: Worker -> T.Text -> IO ()
-- startIOTask worker name = do
Expand All @@ -177,6 +178,34 @@ recoverTask worker@Worker{..} name = do
let newConnCfg = J.insert "hstream" (J.toJSON hsConfig) connectorConfig
createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg} options True False False

-- update config and restart
alterConnectorConfig :: Worker -> T.Text -> T.Text -> IO ()
alterConnectorConfig worker name config = do
updated <- updateConnectorConfig worker name config
when updated $ do
Log.info $ "updated connector config, connector:" <> Log.build name
E.catch
(stopIOTask worker name True)
(\(e :: E.SomeException) -> Log.warning $ "failed to stop io task:" <> Log.buildString (show e))
Log.info $ "paused connector:" <> Log.build name
recoverTask worker name
Log.info $ "resumed connector:" <> Log.build name

updateConnectorConfig :: Worker -> T.Text -> T.Text -> IO Bool
updateConnectorConfig worker name config = do
M.getIOTaskFromName worker.workerHandle name >>= \case
Nothing -> throwIO $ HE.ConnectorNotFound name
Just (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}}) -> do
case J.decodeStrict $ T.encodeUtf8 config :: Maybe J.Object of
Nothing -> return False
Just overrided -> do
let mergeCfg (J.Object x) (J.Object y) = J.Object (J.union x y)
let newConnCfg = J.insertWith mergeCfg "connector" (J.toJSON overrided) connectorConfig
M.updateConfig worker.workerHandle taskId newConnCfg
Log.info $ "updated connector config, connector:" <> Log.build name
<> ", new config:" <> Log.buildString' newConnCfg
return True

getIOTask :: Worker -> T.Text -> IO (Maybe IOTask)
getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM

Expand All @@ -190,7 +219,7 @@ getIOTask_ Worker{..} name = do
deleteIOTask :: Worker -> T.Text -> IO ()
deleteIOTask worker@Worker{..} taskName = do
E.catch
(stopIOTask worker taskName True False)
(stopIOTask worker taskName True)
(\(e :: E.SomeException) -> Log.info $ "try to stop io task:" <> Log.buildString (show e))
M.deleteIOTaskMeta workerHandle taskName
C.modifyMVar_ ioTasksM $ return . HM.delete taskName
Expand Down
16 changes: 15 additions & 1 deletion hstream/app/client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import qualified Data.List as L
import Data.Maybe (mapMaybe, maybeToList)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T
import qualified Data.Vector as V
import Network.GRPC.HighLevel.Generated (ClientError (..),
ClientResult (..),
Expand All @@ -33,7 +34,8 @@ import System.Timeout (timeout)
import Text.RawString.QQ (r)

import qualified HStream.Admin.Server.Command as Admin
import HStream.Client.Action (createSubscription',
import HStream.Client.Action (alterConnectorConfig,
createSubscription',
deleteStream,
deleteSubscription,
getStream, getSubscription,
Expand All @@ -54,6 +56,7 @@ import HStream.Client.SQL (commandExec,
import HStream.Client.Types (AppendContext (..),
AppendOpts (..), CliCmd (..),
Command (..),
ConnectorCommand (..),
HStreamCommand (..),
HStreamInitOpts (..),
HStreamNodes (..),
Expand Down Expand Up @@ -106,6 +109,7 @@ runCommand (CliCmd HStreamCommand{..}) = do
HStreamSql opts -> hstreamSQL rConnOpts opts
HStreamStream opts -> hstreamStream rConnOpts opts
HStreamSubscription opts -> hstreamSubscription rConnOpts opts
HStreamConnector opts -> hstreamConnector rConnOpts opts

hstreamSQL :: RefinedCliConnOpts -> HStreamSqlOpts -> IO ()
hstreamSQL connOpt HStreamSqlOpts{_updateInterval = updateInterval,
Expand Down Expand Up @@ -237,6 +241,16 @@ hstreamSubscription connOpts@RefinedCliConnOpts{..} = \case
ctx <- initCliContext connOpts
executeWithLookupResource_ ctx (Resource ResSubscription sid) (deleteSubscription sid bool) >>= printResult

hstreamConnector :: RefinedCliConnOpts -> ConnectorCommand -> IO ()
hstreamConnector connOpts@RefinedCliConnOpts{..} = \case
ConnectorCmdAlterConfig name configJson configPath -> do
cfg <- case (configJson, configPath) of
(Just x, _) -> pure x
(_, Just x) -> T.readFile (T.unpack x)
_ -> error "connector config is required(--config-json or --config-path)"
ctx <- initCliContext connOpts
executeWithLookupResource_ ctx (Resource ResConnector name) (alterConnectorConfig name cfg) >>= printResult

getNodes :: RefinedCliConnOpts -> IO DescribeClusterResponse
getNodes RefinedCliConnOpts{..} =
withGRPCClient clientConfig $ \client -> do
Expand Down
7 changes: 7 additions & 0 deletions hstream/src/HStream/Client/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module HStream.Client.Action
, listConnectors
, pauseConnector
, resumeConnector
, alterConnectorConfig

, listQueries
, listViews
Expand Down Expand Up @@ -168,6 +169,12 @@ createConnector name typ target cfg API.HStreamApi{..} =
, API.createConnectorRequestTarget = target
, API.createConnectorRequestConfig = cfg })

alterConnectorConfig :: T.Text -> T.Text -> Action Empty
alterConnectorConfig name cfg API.HStreamApi{..} =
hstreamApiAlterConnectorConfig (mkClientNormalRequest' def
{ API.alterConnectorConfigRequestName = name
, API.alterConnectorConfigRequestConfig = cfg })

listShards :: T.Text -> Action API.ListShardsResponse
listShards sName API.HStreamApi{..} = do
hstreamApiListShards $ mkClientNormalRequest' def {
Expand Down
17 changes: 17 additions & 0 deletions hstream/src/HStream/Client/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ data Command
| HStreamInit HStreamInitOpts
| HStreamStream StreamCommand
| HStreamSubscription SubscriptionCommand
| HStreamConnector ConnectorCommand

commandParser :: O.Parser HStreamCommand
commandParser = HStreamCommand
Expand All @@ -68,6 +69,7 @@ commandParser = HStreamCommand
<> O.command "subscription" (O.info (HStreamSubscription <$> subscriptionCmdParser) (O.progDesc "Manage Subscriptions in HStreamDB (`sub` is an alias for this command)"))
-- Also see: https://github.com/pcapriotti/optparse-applicative#command-groups
<> O.command "sub" (O.info (HStreamSubscription <$> subscriptionCmdParser) (O.progDesc "Alias for the command `subscription`"))
<> O.command "connector" (O.info (HStreamConnector <$> connectorCmdParser) (O.progDesc "Manage Connector in HStreamDB"))
)

data StreamCommand
Expand Down Expand Up @@ -277,6 +279,21 @@ subscriptionCmdParser = O.hsubparser
(O.progDesc "Delete a subscription"))
)

data ConnectorCommand
= ConnectorCmdAlterConfig Text (Maybe Text) (Maybe Text)
deriving (Show)

connectorCmdParser :: O.Parser ConnectorCommand
connectorCmdParser = O.hsubparser
( O.command "alter-config" (O.info (ConnectorCmdAlterConfig <$> O.strArgument ( O.metavar "CONNECTOR_NAME"
<> O.help "The Name of the connector")
<*> (O.optional . O.option O.str) (O.long "config-json"
<> O.metavar "STRING" <> O.help "connector config json string")
<*> (O.optional . O.option O.str) (O.long "config-path"
<> O.metavar "STRING" <> O.help "connector config file path"))
(O.progDesc "alter connector config"))
)

data HStreamCliContext = HStreamCliContext
{ availableServers :: MVar [SocketAddr]
, currentServer :: MVar SocketAddr
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Core/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ executeQuery sc@ServerContext{..} CommandQuery{..} = do
#endif
ExplainPlan plan -> pure $ API.CommandQueryResponse (mkVectorStruct plan "explain")
PausePlan (PauseObjectConnector name) -> do
IO.stopIOTask scIOWorker name False False
IO.stopIOTask scIOWorker name False
pure (CommandQueryResponse V.empty)
ResumePlan (ResumeObjectConnector name) -> do
IO.recoverTask scIOWorker name
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ handlers serverContext@ServerContext{..} =
hstreamApiDeleteConnector = deleteConnectorHandler serverContext,
hstreamApiPauseConnector = pauseConnectorHandler serverContext,
hstreamApiResumeConnector = resumeConnectorHandler serverContext,
hstreamApiAlterConnectorConfig = alterConnectorConfigHandler serverContext,

-- View
hstreamApiGetView = getViewHandler serverContext,
Expand Down
31 changes: 29 additions & 2 deletions hstream/src/HStream/Server/Handler/Connector.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module HStream.Server.Handler.Connector
, deleteConnectorHandler
, resumeConnectorHandler
, pauseConnectorHandler
, alterConnectorConfigHandler
-- * For hs-grpc-server
, handleCreateConnector
, handleListConnectors
Expand All @@ -25,6 +26,7 @@ module HStream.Server.Handler.Connector
, handleDeleteConnector
, handleResumeConnector
, handlePauseConnector
, handleAlterConnectorConfig
) where

import Control.Exception (throwIO)
Expand Down Expand Up @@ -175,15 +177,40 @@ pauseConnectorHandler sc@ServerContext{..}
ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.stopIOTask scIOWorker pauseConnectorRequestName False False
IO.stopIOTask scIOWorker pauseConnectorRequestName False
returnResp Empty

handlePauseConnector :: ServerContext -> G.UnaryHandler PauseConnectorRequest Empty
handlePauseConnector sc@ServerContext{..} _ PauseConnectorRequest{..} = catchDefaultEx $ do
ServerNode{..} <- lookupResource sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.stopIOTask scIOWorker pauseConnectorRequestName False False >> pure Empty
IO.stopIOTask scIOWorker pauseConnectorRequestName False >> pure Empty

alterConnectorConfigHandler
:: ServerContext
-> ServerRequest 'Normal AlterConnectorConfigRequest Empty
-> IO (ServerResponse 'Normal Empty)
alterConnectorConfigHandler sc@ServerContext{..}
(ServerNormalRequest _metadata AlterConnectorConfigRequest{..}) = defaultExceptionHandle $ do
Log.info $ "Receive Alter Connector config Request. "
<> "Connector ID: " <> Log.build alterConnectorConfigRequestName
<> "Overrided Config: " <> Log.build alterConnectorConfigRequestConfig
ServerNode{..} <- lookupResource sc ResConnector alterConnectorConfigRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.alterConnectorConfig scIOWorker alterConnectorConfigRequestName alterConnectorConfigRequestConfig
returnResp Empty

handleAlterConnectorConfig :: ServerContext -> G.UnaryHandler AlterConnectorConfigRequest Empty
handleAlterConnectorConfig sc@ServerContext{..} _ AlterConnectorConfigRequest{..} = catchDefaultEx $ do
Log.info $ "Receive Alter Connector config Request. "
<> "Connector ID: " <> Log.build alterConnectorConfigRequestName
<> "Overrided Config: " <> Log.build alterConnectorConfigRequestConfig
ServerNode{..} <- lookupResource sc ResConnector alterConnectorConfigRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.alterConnectorConfig scIOWorker alterConnectorConfigRequestName alterConnectorConfigRequestConfig >> pure Empty

-- uncurry
createIOTaskFromRequest :: ServerContext -> CreateConnectorRequest -> IO Connector
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/HsGrpcHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ handlers sc =
, unary (GRPC :: GRPC P.HStreamApi "deleteConnector") (H.handleDeleteConnector sc)
, unary (GRPC :: GRPC P.HStreamApi "resumeConnector") (H.handleResumeConnector sc)
, unary (GRPC :: GRPC P.HStreamApi "pauseConnector") (H.handlePauseConnector sc)
, unary (GRPC :: GRPC P.HStreamApi "alterConnectorConfig") (H.handleAlterConnectorConfig sc)
-- View
, unary (GRPC :: GRPC P.HStreamApi "getView") (H.handleGetView sc)
, unary (GRPC :: GRPC P.HStreamApi "listViews") (H.handleListView sc)
Expand Down

0 comments on commit 05db1f0

Please sign in to comment.