diff --git a/hstream/app/client.hs b/hstream/app/client.hs index d08d4e101..5ac12684d 100644 --- a/hstream/app/client.hs +++ b/hstream/app/client.hs @@ -9,14 +9,13 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -Werror=incomplete-patterns #-} +{-# LANGUAGE NamedFieldPuns #-} module Main where import Control.Concurrent (threadDelay) import Control.Monad (when) import Data.Aeson as Aeson -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BSL import qualified Data.Char as Char import qualified Data.List as L import Data.Maybe (mapMaybe, maybeToList) @@ -28,7 +27,6 @@ import Network.GRPC.HighLevel.Generated (ClientError (..), withGRPCClient) import qualified Options.Applicative as O import Proto3.Suite (def) -import qualified Proto3.Suite as PB import System.Exit (exitFailure) import System.Timeout (timeout) import Text.RawString.QQ (r) @@ -38,13 +36,13 @@ import HStream.Client.Action (createSubscription', deleteStream, deleteSubscription, getStream, getSubscription, - insertIntoStream', listShards, listStreams, listSubscriptions, readShard, readStream) import HStream.Client.Execute (executeWithLookupResource_, initCliContext, simpleExecute) +import HStream.Client.Internal (interactiveAppend) #ifdef HStreamEnableSchema import HStream.Client.SQLNew (commandExec, interactiveSQLApp) @@ -52,7 +50,8 @@ import HStream.Client.SQLNew (commandExec, import HStream.Client.SQL (commandExec, interactiveSQLApp) #endif -import HStream.Client.Types (AppendArgs (..), CliCmd (..), +import HStream.Client.Types (AppendContext (..), + AppendOpts (..), CliCmd (..), Command (..), HStreamCommand (..), HStreamInitOpts (..), @@ -65,13 +64,11 @@ import HStream.Client.Types (AppendArgs (..), CliCmd (..), Resource (..), StreamCommand (..), SubscriptionCommand (..), - cliCmdParser, + cliCmdParser, mkShardMap, refineCliConnOpts) -import HStream.Client.Utils (calculateShardId, - mkClientNormalRequest', +import HStream.Client.Utils (mkClientNormalRequest', printResult) -import HStream.Common.Types (getHStreamVersion, - hashShardKey) +import HStream.Common.Types (getHStreamVersion) import HStream.Server.HStreamApi (DescribeClusterResponse (..), HStreamApi (..), ServerNode (..), @@ -82,7 +79,6 @@ import HStream.Utils (ResourceType (..), fillWithJsonString', formatResult, getServerResp, handleGRPCIOError, - jsonObjectToStruct, newRandomText, pattern EnumPB) import qualified HStream.Utils.Aeson as AesonComp @@ -208,22 +204,19 @@ hstreamStream connOpts@RefinedCliConnOpts{..} cmd = do } ctx <- initCliContext connOpts executeWithLookupResource_ ctx (Resource ResShardReader (API.readStreamRequestReaderId req)) (readStream req) - StreamCmdAppend AppendArgs{..} -> do + StreamCmdAppend AppendOpts{..} -> do ctx <- initCliContext connOpts - shards <- fmap API.listShardsResponseShards . getServerResp =<< simpleExecute clientConfig (listShards appendStream) - let shardKey = hashShardKey appendRecordKey - case calculateShardId shardKey (V.toList shards) of - Just sid -> do - let payload = if isHRecord then map toHRecord appendRecord else appendRecord - executeWithLookupResource_ ctx (Resource ResShard (T.pack $ show sid)) - (insertIntoStream' appendStream sid isHRecord (V.fromList payload) appendCompressionType appendRecordKey) - Nothing -> errorWithoutStackTrace $ "Failed to calculate shardId with stream: " - <> show appendStream <> ", parition key: " <> show appendRecordKey - where - toHRecord payload = case Aeson.eitherDecode . BS.fromStrict $ payload of - Left e -> errorWithoutStackTrace $ "invalied HRecord: " <> show e - Right p -> BSL.toStrict . PB.toLazyByteString . jsonObjectToStruct $ p - + shards <- fmap API.listShardsResponseShards . getServerResp =<< simpleExecute clientConfig (listShards _appStream) + let shardMap = mkShardMap shards + let appendCtx = AppendContext + { cliCtx = ctx + , appStream = _appStream + , appKeySeparator = _appKeySeparator + , appRetryLimit = _appRetryLimit + , appRetryInterval = _appRetryInterval + , appShardMap = shardMap + } + interactiveAppend appendCtx hstreamSubscription :: RefinedCliConnOpts -> SubscriptionCommand -> IO () hstreamSubscription connOpts@RefinedCliConnOpts{..} = \case diff --git a/hstream/src/HStream/Client/Internal.hs b/hstream/src/HStream/Client/Internal.hs index ba8b25dd3..cd40d2ba4 100644 --- a/hstream/src/HStream/Client/Internal.hs +++ b/hstream/src/HStream/Client/Internal.hs @@ -7,32 +7,44 @@ module HStream.Client.Internal ( streamingFetch , cliFetch , cliFetch' + , interactiveAppend ) where import Control.Concurrent (threadDelay) import Control.Monad (void, when) +import Control.Monad.IO.Class +import Data.Aeson as Aeson +import qualified Data.ByteString as BS +import qualified Data.ByteString.Char8 as BC +import qualified Data.ByteString.Lazy as BSL import Data.IORef (IORef, newIORef, readIORef, writeIORef) import qualified Data.Text as T +import qualified Data.Text.Encoding as TE import qualified Data.Vector as V import Network.GRPC.HighLevel.Generated (ClientRequest (..)) import qualified Proto3.Suite as PB +import qualified System.Console.Haskeline as RL import Text.StringRandom (stringRandomIO) import HStream.Client.Action import HStream.Client.Execute -import HStream.Client.Types (HStreamCliContext, - Resource (..)) +import HStream.Client.Types (AppendContext (..), + HStreamCliContext (..), + Resource (..), + getShardIdByKey) import HStream.Client.Utils +import HStream.Common.Types (hashShardKey) import qualified HStream.Server.HStreamApi as API import HStream.SQL (DropObject (..)) import qualified HStream.ThirdParty.Protobuf as PB import HStream.Utils (ResourceType (..), + clientDefaultKey, decompressBatchedRecord, formatResult, getServerResp, + jsonObjectToStruct, newRandomText) - streamingFetch :: HStreamCliContext -> T.Text -> API.HStreamApi ClientRequest response -> IO () streamingFetch = streamingFetch' (putStr . formatResult @PB.Struct) False @@ -102,3 +114,41 @@ genRandomSinkStreamSQL sql = do randomName <- stringRandomIO "[a-zA-Z]{20}" let streamName = "cli_generated_stream_" <> randomName return (streamName, "CREATE STREAM " <> streamName <> " AS " <> sql) + +interactiveAppend :: AppendContext -> IO () +interactiveAppend AppendContext{..} = do + RL.runInputT settings loop + where + settings = RL.Settings RL.noCompletion Nothing False + + loop = RL.withInterrupt . RL.handleInterrupt loop $ do + RL.getInputLine "> " >>= \case + Nothing -> pure () + Just str -> do + let items = splitOn appKeySeparator (BC.pack str) + when (null items || length items > 2) $ + errorWithoutStackTrace "invalid input: specific multiple keys" + + let partitionKey = if length items == 1 then clientDefaultKey else TE.decodeUtf8 . head $ items + let record = if length items == 1 then head items else last items + let shardKey = hashShardKey partitionKey + case getShardIdByKey shardKey appShardMap of + Just sid -> do + let (isHRecord, payload) = toHRecord record + liftIO $ executeWithLookupResource_ cliCtx (Resource ResShard (T.pack $ show sid)) + (retry appRetryLimit appRetryInterval $ insertIntoStream' appStream sid isHRecord (V.fromList [payload]) API.CompressionTypeNone partitionKey) + loop + Nothing -> errorWithoutStackTrace $ "Failed to calculate shardId with stream: " + <> show appStream <> ", parition key: " <> show (head items) + + toHRecord payload = case Aeson.eitherDecode . BS.fromStrict $ payload of + Left _ -> (False, payload) + Right p -> (True, BSL.toStrict . PB.toLazyByteString . jsonObjectToStruct $ p) + + -- Break a ByteString into pieces separated by the first ByteString argument, consuming the delimiter + splitOn :: BS.ByteString -> BS.ByteString -> [BS.ByteString] + splitOn "" = error "delimiter shouldn't be empty." + splitOn delimiter = go + where + go s = let (pre, post) = BS.breakSubstring delimiter s + in pre : if BS.null post then [] else go (BS.drop (BS.length delimiter) post) diff --git a/hstream/src/HStream/Client/Types.hs b/hstream/src/HStream/Client/Types.hs index 262b49eeb..57a098956 100644 --- a/hstream/src/HStream/Client/Types.hs +++ b/hstream/src/HStream/Client/Types.hs @@ -7,20 +7,25 @@ import Control.Concurrent (MVar) import qualified Data.Attoparsec.Text as AP import Data.ByteString (ByteString) import qualified Data.ByteString as BS +import Data.Foldable (foldl') import Data.Functor (($>)) import Data.Int (Int64) +import qualified Data.Map as M import Data.Maybe (isNothing) import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T +import Data.Vector (Vector) import Data.Word (Word32, Word64) import HStream.Common.CliParsers (streamParser, subscriptionParser) +import HStream.Common.Types import qualified HStream.Server.HStreamApi as API import HStream.Server.Types (ServerID) +import qualified HStream.Store as S import HStream.Utils (ResourceType, SocketAddr (..), - clientDefaultKey, - mkGRPCClientConfWithSSL) + mkGRPCClientConfWithSSL, + textToCBytes) import Network.GRPC.HighLevel.Client (ClientConfig (..), ClientSSLConfig (..), ClientSSLKeyCertPair (..)) @@ -72,7 +77,7 @@ data StreamCommand | StreamCmdListShard Text | StreamCmdReadShard ReadShardArgs | StreamCmdReadStream ReadStreamArgs - | StreamCmdAppend AppendArgs + | StreamCmdAppend AppendOpts deriving (Show) streamCmdParser :: O.Parser StreamCommand @@ -88,7 +93,7 @@ streamCmdParser = O.hsubparser <> O.short 'f' <> O.help "Whether to enable force deletion" )) (O.progDesc "Delete a stream")) - <> O.command "append" (O.info (StreamCmdAppend <$> appendArgsParser) + <> O.command "append" (O.info (StreamCmdAppend <$> appendOptsParser) (O.progDesc "Append record into stream")) <> O.command "list-shard" (O.info (StreamCmdListShard <$> O.strArgument ( O.metavar "STREAM_NAME" @@ -100,14 +105,42 @@ streamCmdParser = O.hsubparser (O.progDesc "Read records from specific stream")) ) -data AppendArgs = AppendArgs - { appendStream :: T.Text - , appendRecordKey :: T.Text - , appendRecord :: [ByteString] - , appendCompressionType :: API.CompressionType - , isHRecord :: Bool +data AppendOpts = AppendOpts + { _appStream :: T.Text + , _appKeySeparator :: BS.ByteString + , _appRetryInterval :: Word32 + , _appRetryLimit :: Word32 } deriving (Show) +appendOptsParser :: O.Parser AppendOpts +appendOptsParser = AppendOpts + <$> O.strArgument ( O.metavar "STREAM_NAME" <> O.help "The stream you want to write to") + <*> O.option O.str (O.long "separator" <> O.metavar "String" <> O.showDefault <> O.value "@" <> O.help "Separator of key. e.g. key1@value") + <*> O.option O.auto (O.long "retry-interval" <> O.metavar "INT" <> O.showDefault <> O.value 5 <> O.help "Interval to retry request to server") + <*> O.option O.auto (O.long "retry-limit" <> O.metavar "INT" <> O.showDefault <> O.value 3 <> O.help "Maximum number of retries allowed") + +type ShardMap = M.Map ShardKey S.C_LogID + +mkShardMap :: Vector API.Shard -> ShardMap +mkShardMap = + foldl' + ( + \acc API.Shard{shardShardId=sId, shardStartHashRangeKey=startKey} -> + M.insert (cBytesToKey . textToCBytes $ startKey) sId acc + ) M.empty + +getShardIdByKey :: ShardKey -> ShardMap ->Maybe S.C_LogID +getShardIdByKey key mp = snd <$> M.lookupLE key mp + +data AppendContext = AppendContext + { cliCtx :: HStreamCliContext + , appStream :: T.Text + , appKeySeparator :: BS.ByteString + , appRetryInterval :: Word32 + , appRetryLimit :: Word32 + , appShardMap :: M.Map ShardKey S.C_LogID + } + instance Read API.CompressionType where readPrec = do l <- Read.lexP @@ -117,26 +150,6 @@ instance Read API.CompressionType where Read.Ident "zstd" -> return API.CompressionTypeZstd x -> errorWithoutStackTrace $ "cannot parse compression type: " <> show x -appendArgsParser :: O.Parser AppendArgs -appendArgsParser = AppendArgs - <$> O.strArgument ( O.metavar "STREAM_NAME" <> O.help "The stream you want to write to") - <*> O.strOption ( O.long "partition-key" - <> O.short 'k' - <> O.metavar "TEXT" - <> O.value clientDefaultKey - <> O.showDefault - <> O.help "Partition key of append record" - ) - <*> O.many (O.strOption ( O.long "payload" <> O.short 'p' <> O.help "Records you want to append")) - <*> O.option O.auto ( O.long "compression" - <> O.short 'o' - <> O.metavar "[none|gzip|zstd]" - <> O.value API.CompressionTypeNone - <> O.showDefault - <> O.help "Compresstion type" - ) - <*> O.switch ( O.long "json" <> O.short 'j' <> O.help "Is json record") - data ReadStreamArgs = ReadStreamArgs { readStreamStreamNameArgs :: T.Text , readStreamStartOffset :: Maybe API.StreamOffset