Skip to content

Commit

Permalink
Fix cli calculateShardId (#1553)
Browse files Browse the repository at this point in the history
* refactor: move ShardKey type to Common.Types

* fix calculateShardId
  • Loading branch information
YangKian committed Aug 11, 2023
1 parent 9bd3923 commit f1b1f93
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 83 deletions.
40 changes: 40 additions & 0 deletions common/hstream/HStream/Common/Types.hs
@@ -1,20 +1,34 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module HStream.Common.Types
( fromInternalServerNode
, fromInternalServerNodeWithKey
, getHStreamVersion
, ShardKey(..)
, hashShardKey
, keyToCBytes
, cBytesToKey
, devideKeySpace
) where

import Data.Foldable (foldl')
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import qualified Data.Vector as V

import qualified Crypto.Hash as CH
import Data.Bits (Bits (..))
import qualified Data.ByteArray as BA
import Data.Hashable (Hashable)
import Data.List (iterate')
import Data.Maybe (fromMaybe)
import Data.Text.Encoding (encodeUtf8)
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Server.HStreamInternal as I
import HStream.Version (hstreamCommit, hstreamVersion)
import qualified Z.Data.CBytes as CB

-- | Simple convert internal ServerNode to client known ServerNode
fromInternalServerNode :: I.ServerNode -> A.ServerNode
Expand Down Expand Up @@ -53,3 +67,29 @@ getHStreamVersion = do
return ver
where
prefix = Text.singleton 'v'

-- | A position in the shard hash-key space, wrapping an 'Integer'.
--
-- The numeric classes are derived from the underlying 'Integer', so keys can
-- be added, divided and compared directly; the valid range is given by the
-- 'Bounded' instance.
newtype ShardKey = ShardKey Integer
  deriving (Eq, Ord, Show, Enum, Num, Real, Integral, Hashable)

-- | The key space is the range of unsigned 128-bit integers:
-- @0 .. 2^128 - 1@ inclusive.
instance Bounded ShardKey where
  minBound = ShardKey 0
  maxBound = ShardKey (2 ^ (128 :: Int) - 1)

-- | Hash a textual key into the shard key space.
--
-- The key is UTF-8 encoded and hashed with MD5; the digest bytes are then
-- folded into a single non-negative 'Integer', first byte most significant.
hashShardKey :: Text -> ShardKey
hashShardKey key = ShardKey (foldl' appendByte 0 digestBytes)
  where
    digest :: CH.Digest CH.MD5
    digest = CH.hash (encodeUtf8 key)

    digestBytes = BA.unpack digest

    -- Make room for one more byte at the low end, then drop the byte in.
    appendByte acc byte = (acc `shiftL` 8) .|. fromIntegral byte

-- | Render a 'ShardKey' as the 'show' output of its underlying 'Integer',
-- packed into 'CB.CBytes'.
keyToCBytes :: ShardKey -> CB.CBytes
keyToCBytes (ShardKey n) = CB.pack (show n)

-- | Parse the textual integer held in a 'CB.CBytes' into a 'ShardKey'
-- (the reverse direction of 'keyToCBytes').
--
-- The parse mirrors the Prelude definition of 'read': exactly one complete
-- parse is required, and only whitespace may follow the number. The function
-- stays pure so existing callers are unaffected; malformed input therefore
-- still raises an 'ErrorCall', but the message now names this function and
-- shows the offending input instead of the opaque @Prelude.read: no parse@.
cBytesToKey :: CB.CBytes -> ShardKey
cBytesToKey bytes =
  case [n | (n, rest) <- reads str, ("", "") <- lex rest] of
    [n] -> ShardKey n
    _   -> error ("cBytesToKey: invalid shard key " ++ show str)
  where
    str = CB.unpack bytes

-- | Divide the key space into @num@ consecutive parts and return them in
-- ascending order as inclusive @(startKey, endKey)@ pairs.
--
-- Every part but the last spans @maxBound `div` num@ keys; the last part is
-- stretched to end at 'maxBound' so the remainder of the division is not
-- lost. A non-positive @num@ yields an empty list.
devideKeySpace :: Int -> [(ShardKey, ShardKey)]
devideKeySpace num =
  [ (start, end)
  | (idx, start) <- zip [1 ..] starts
  , let end | idx == num = maxBound
            | otherwise  = start + step - 1
  ]
  where
    -- Width of every part except (possibly) the last one.
    step = maxBound @ShardKey `div` fromIntegral num
    starts = take num (iterate' (+ step) minBound)

2 changes: 2 additions & 0 deletions common/hstream/hstream-common.cabal
Expand Up @@ -132,6 +132,8 @@ library
, zlib
, zoovisitor
, zstd
, memory
, cryptonite

cxx-options: -std=c++17
cpp-options: -std=c++17
Expand Down
6 changes: 4 additions & 2 deletions hstream/app/client.hs
Expand Up @@ -70,7 +70,8 @@ import HStream.Client.Types (AppendArgs (..), CliCmd (..),
import HStream.Client.Utils (calculateShardId,
mkClientNormalRequest',
printResult)
import HStream.Common.Types (getHStreamVersion)
import HStream.Common.Types (getHStreamVersion,
hashShardKey)
import HStream.Server.HStreamApi (DescribeClusterResponse (..),
HStreamApi (..),
ServerNode (..),
Expand Down Expand Up @@ -210,7 +211,8 @@ hstreamStream connOpts@RefinedCliConnOpts{..} cmd = do
StreamCmdAppend AppendArgs{..} -> do
ctx <- initCliContext connOpts
shards <- fmap API.listShardsResponseShards . getServerResp =<< simpleExecute clientConfig (listShards appendStream)
case calculateShardId appendRecordKey (V.toList shards) of
let shardKey = hashShardKey appendRecordKey
case calculateShardId shardKey (V.toList shards) of
Just sid -> do
let payload = if isHRecord then map toHRecord appendRecord else appendRecord
executeWithLookupResource_ ctx (Resource ResShard (T.pack $ show sid))
Expand Down
1 change: 0 additions & 1 deletion hstream/hstream.cabal
Expand Up @@ -138,7 +138,6 @@ library
, bytestring
, containers
, cryptohash-md5
, cryptonite
, data-default
, deepseq
, diff-flow
Expand Down
20 changes: 8 additions & 12 deletions hstream/src/HStream/Client/SQL.hs
Expand Up @@ -19,15 +19,13 @@ import Control.Concurrent (forkFinally, myThreadId,
import Control.Exception (SomeException, handle, try)
import Control.Monad (forM_, forever, void, (>=>))
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy as BL
import Data.Char (toUpper)
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Vector as V
import Network.GRPC.HighLevel.Client (ClientRequest (..),
ClientResult (..))
import Network.GRPC.HighLevel.Generated (withGRPCClient)
import qualified Proto3.Suite as PB
import qualified System.Console.Haskeline as RL
import qualified System.Console.Haskeline.History as RL
import Text.RawString.QQ (r)
Expand All @@ -36,17 +34,13 @@ import qualified Data.Aeson.Text as J
import qualified Data.Text.Lazy as TL
import HStream.Client.Action (createConnector,
createStream,
createStreamBySelect,
createStreamBySelectWithCustomQueryName,
dropAction, executeViewQuery,
insertIntoStream,
insertIntoStream',
listShards, pauseConnector,
pauseQuery, resumeConnector,
resumeQuery, retry,
terminateQuery)
insertIntoStream, listShards,
pauseConnector, pauseQuery,
resumeConnector, resumeQuery,
retry, terminateQuery)
import HStream.Client.Execute (execute, executeShowPlan,
executeWithLookupResource,
executeWithLookupResource_,
execute_, updateClusterInfo)
import HStream.Client.Internal (cliFetch)
Expand All @@ -55,6 +49,7 @@ import HStream.Client.Types (HStreamCliContext (..),
Resource (..))
import HStream.Client.Utils (calculateShardId,
dropPlanToResType)
import HStream.Common.Types (hashShardKey)
import HStream.Server.HStreamApi (CommandQuery (..),
CommandQueryResponse (..),
HStreamApi (..),
Expand All @@ -76,7 +71,7 @@ import HStream.SQL.Exception (SomeSQLException,
import HStream.Utils (HStreamClientApi,
ResourceType (..),
formatCommandQueryResponse,
formatResult, getServerResp,
formatResult,
mkGRPCClientConfWithSSL,
newRandomText)

Expand Down Expand Up @@ -149,7 +144,8 @@ commandExec HStreamSqlContext{hstreamCliContext = cliCtx@HStreamCliContext{..},.
result <- execute cliCtx $ listShards sName
case result of
Just (API.ListShardsResponse shards) -> do
case calculateShardId "" (V.toList shards) of
let shardKey = hashShardKey ""
case calculateShardId shardKey (V.toList shards) of
Nothing -> putStrLn "Failed to calculate shard id"
Just sid -> executeWithLookupResource_ cliCtx (Resource ResShard (T.pack $ show sid)) (retry retryLimit retryInterval $ insertIntoStream sName sid (insertType == JsonFormat) payload)
Nothing -> putStrLn "No shards found"
Expand Down
26 changes: 9 additions & 17 deletions hstream/src/HStream/Client/Utils.hs
Expand Up @@ -21,14 +21,11 @@ module HStream.Client.Utils

import Control.Concurrent (threadDelay)
import Control.Exception (finally)
import Crypto.Hash.MD5 (hash)
import qualified Data.ByteString as BS
import Data.Char (toUpper)
import Data.Functor ((<&>))
import Data.Int (Int32)
import Data.String (IsString)
import qualified Data.Text as T
import qualified Data.Text.Encoding as BS
import Data.Word (Word64)
import Network.GRPC.HighLevel.Client
import Network.GRPC.HighLevel.Generated (withGRPCClient)
Expand All @@ -41,13 +38,15 @@ import System.Posix (Handler (Catch),

import HStream.Base (genUnique)
import HStream.Client.Types (Resource (..))
import HStream.Common.Types (ShardKey, cBytesToKey)
import qualified HStream.Server.HStreamApi as API
import HStream.SQL (DropObject (..))
import HStream.Utils (Format (formatResult),
ResourceType (..),
SocketAddr (..),
mkClientNormalRequest,
mkGRPCClientConfWithSSL)
mkGRPCClientConfWithSSL,
textToCBytes)

terminateMsg :: IsString a => a
terminateMsg = "\x1b[32mTerminated\x1b[0m"
Expand Down Expand Up @@ -99,20 +98,13 @@ waitForServerToStart t addr clientSSLConfig = withGRPCClient (mkGRPCClientConfWi
if newTimeout <= 0 then return Nothing
else loop newTimeout api

-- | Return the id of the first shard in the list whose hash range contains
-- the given key, or 'Nothing' when the list is exhausted without a match.
--
-- NOTE(review): this span is a rendered diff with the +/- markers stripped, so
-- it interleaves two versions of the function: a 'T.Text'-keyed one that
-- compares the MD5 digest of the key against the UTF-8 bytes of the shard's
-- range keys (via @compareHash@, length first, then contents), and a
-- 'ShardKey'-keyed one that parses both range bounds with 'cBytesToKey' and
-- does an inclusive numeric range check. Presumably the former is the removed
-- code and the latter its replacement -- confirm against the repository.
calculateShardId :: T.Text -> [API.Shard] -> Maybe Word64
calculateShardId :: ShardKey -> [API.Shard] -> Maybe Word64
calculateShardId key (API.Shard{..}:ss) =
case (compareHash result start, compareHash result end) of
(GT, LT) -> Just shardShardId
(EQ, _) -> Just shardShardId
(_, EQ) -> Just shardShardId
_ -> calculateShardId key ss
where
compareHash x y = if BS.length x == BS.length y
then x `compare` y
else BS.length x `compare` BS.length y
start = BS.encodeUtf8 shardStartHashRangeKey
end = BS.encodeUtf8 shardEndHashRangeKey
result = hash (BS.encodeUtf8 key)
let startKey = cBytesToKey . textToCBytes $ shardStartHashRangeKey
endKey = cBytesToKey . textToCBytes $ shardEndHashRangeKey
in if key >= startKey && key <= endKey
then Just shardShardId
else calculateShardId key ss
calculateShardId _ _ = Nothing

dropPlanToResType :: DropObject -> Resource
Expand Down
4 changes: 2 additions & 2 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Expand Up @@ -37,15 +37,15 @@ import qualified Data.Vector as V
import Data.Word (Word64)
import GHC.Stack (HasCallStack)
import HsGrpc.Server (whileM)
import HStream.Common.Types
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
import HStream.Server.Core.Common (decodeRecordBatch)
import HStream.Server.HStreamApi (CreateShardReaderRequest (..))
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Shard (cBytesToKey, hashShardKey,
shardEndKey, shardStartKey)
import HStream.Server.Shard (shardEndKey, shardStartKey)
import HStream.Server.Types (BiStreamReader (..),
BiStreamReaderReceiver,
BiStreamReaderSender,
Expand Down
8 changes: 3 additions & 5 deletions hstream/src/HStream/Server/Core/Stream.hs
Expand Up @@ -42,16 +42,14 @@ import qualified Z.Data.CBytes as CB
import qualified ZooKeeper.Exception as ZK

import HStream.Base.Time (getSystemNsTimestamp)
import HStream.Common.Types
import qualified HStream.Common.ZookeeperSlotAlloc as Slot
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Shard (Shard (..), createShard,
devideKeySpace,
mkShardAttrs,
mkShardWithDefaultId,
mkSharedShardMapWithShards)
import HStream.Server.Shard (createShard, mkShardAttrs,
mkShardWithDefaultId)
import HStream.Server.Types (ServerContext (..),
ServerInternalOffset (..),
ToOffset (..),
Expand Down
4 changes: 2 additions & 2 deletions hstream/src/HStream/Server/HStore.hs
Expand Up @@ -35,14 +35,14 @@ import Proto3.Suite (Enumerated (..))
import qualified Proto3.Suite as PB
import Z.Data.Vector (Bytes)

import HStream.Common.Types
import HStream.Exception (StreamNotFound (..),
WrongOffset (..))
import qualified HStream.Logger as Log
import qualified HStream.Server.Core.Stream as Core
import qualified HStream.Server.Core.Subscription as Core
import qualified HStream.Server.HStreamApi as API
import HStream.Server.Shard (cBytesToKey, hashShardKey,
shardStartKey)
import HStream.Server.Shard (shardStartKey)
import HStream.Server.Types
import qualified HStream.Store as S
import HStream.Utils
Expand Down
39 changes: 2 additions & 37 deletions hstream/src/HStream/Server/Shard.hs
Expand Up @@ -4,13 +4,11 @@

module HStream.Server.Shard
( Shard (..)
, ShardKey (..)
, mkShard
, mkShardWithDefaultId
, splitShardByKey
, halfSplit
, mergeShard
, devideKeySpace
, createShard
, mkShardAttrs

Expand All @@ -32,9 +30,6 @@ module HStream.Server.Shard
, splitHalf
, mergeTwoShard

, hashShardKey
, keyToCBytes
, cBytesToKey
, shardStartKey
, shardEndKey
, shardEpoch
Expand All @@ -43,54 +38,24 @@ module HStream.Server.Shard
import Control.Concurrent.STM (STM, TMVar, atomically, newTMVarIO,
putTMVar, readTMVar, swapTMVar,
takeTMVar)
import Control.Exception (Exception (fromException, toException),
SomeException, bracket, throwIO)
import qualified Crypto.Hash as CH
import Control.Exception (bracket, throwIO)
import Data.Bits (shiftL, shiftR, (.|.))
import qualified Data.ByteArray as BA
import Data.Foldable (foldl', forM_)
import Data.Hashable (Hashable (hash))
import Data.List (iterate')
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Typeable (cast)
import Data.Vector (Vector)
import qualified Data.Vector as V
import Data.Word (Word32, Word64)
import qualified Z.Data.CBytes as CB

import HStream.Common.Types
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Store as S

-- | A position in the shard hash-key space, wrapping an 'Integer'.
--
-- The numeric classes are derived from the underlying 'Integer', so keys can
-- be added, divided and compared directly; the valid range is given by the
-- 'Bounded' instance.
newtype ShardKey = ShardKey Integer
  deriving (Eq, Ord, Show, Enum, Num, Real, Integral, Hashable)

-- | The key space is the range of unsigned 128-bit integers:
-- @0 .. 2^128 - 1@ inclusive.
instance Bounded ShardKey where
  minBound = ShardKey 0
  maxBound = ShardKey (2 ^ (128 :: Int) - 1)

-- | Hash a textual key into the shard key space.
--
-- The key is UTF-8 encoded and hashed with MD5; the digest bytes are then
-- folded into a single non-negative 'Integer', first byte most significant.
hashShardKey :: T.Text -> ShardKey
hashShardKey key = ShardKey (foldl' appendByte 0 digestBytes)
  where
    digest :: CH.Digest CH.MD5
    digest = CH.hash (encodeUtf8 key)

    digestBytes = BA.unpack digest

    -- Make room for one more byte at the low end, then drop the byte in.
    appendByte acc byte = (acc `shiftL` 8) .|. fromIntegral byte

-- | Render a 'ShardKey' as the 'show' output of its underlying 'Integer',
-- packed into 'CB.CBytes'.
keyToCBytes :: ShardKey -> CB.CBytes
keyToCBytes (ShardKey n) = CB.pack (show n)

-- | Parse the textual integer held in a 'CB.CBytes' into a 'ShardKey'
-- (the reverse direction of 'keyToCBytes').
--
-- The parse mirrors the Prelude definition of 'read': exactly one complete
-- parse is required, and only whitespace may follow the number. The function
-- stays pure so existing callers are unaffected; malformed input therefore
-- still raises an 'ErrorCall', but the message now names this function and
-- shows the offending input instead of the opaque @Prelude.read: no parse@.
cBytesToKey :: CB.CBytes -> ShardKey
cBytesToKey bytes =
  case [n | (n, rest) <- reads str, ("", "") <- lex rest] of
    [n] -> ShardKey n
    _   -> error ("cBytesToKey: invalid shard key " ++ show str)
  where
    str = CB.unpack bytes

-- | Divide the key space into @num@ consecutive parts and return them in
-- ascending order as inclusive @(startKey, endKey)@ pairs.
--
-- Every part but the last spans @maxBound `div` num@ keys; the last part is
-- stretched to end at 'maxBound' so the remainder of the division is not
-- lost. A non-positive @num@ yields an empty list.
devideKeySpace :: Int -> [(ShardKey, ShardKey)]
devideKeySpace num =
  [ (start, end)
  | (idx, start) <- zip [1 ..] starts
  , let end | idx == num = maxBound
            | otherwise  = start + step - 1
  ]
  where
    -- Width of every part except (possibly) the last one.
    step = maxBound @ShardKey `div` fromIntegral num
    starts = take num (iterate' (+ step) minBound)

---------------------------------------------------------------------------------------------------------------
---- Shard

Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Types.hs
Expand Up @@ -37,6 +37,7 @@ import Data.IORef (IORef)
import Data.Maybe (fromJust)
import HStream.Base.Timer (CompactedWorker)
import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Types (ShardKey)
import qualified HStream.Exception as HE
import HStream.Gossip.Types (Epoch, GossipContext)
import qualified HStream.IO.Types as IO
Expand All @@ -45,7 +46,6 @@ import HStream.MetaStore.Types (MetaHandle)
import HStream.Server.Config
import qualified HStream.Server.ConnectorTypes as HCT
import qualified HStream.Server.HStreamApi as API
import HStream.Server.Shard (ShardKey, SharedShardMap)
import qualified HStream.Stats as Stats
import qualified HStream.Store as HS
import qualified HStream.Store as S
Expand Down
9 changes: 5 additions & 4 deletions hstream/test/HStream/ShardSpec.hs
Expand Up @@ -10,11 +10,12 @@ import Data.Foldable (foldl')
import qualified Data.Map.Strict as M
import Data.Maybe (fromJust)
import Data.Word (Word64)
import HStream.Common.Types
import qualified HStream.Logger as Log
import HStream.Server.Shard (Shard (..), ShardKey (ShardKey),
ShardMap, deleteShard, getShard,
getShardMapIdx, insertShard, mergeShard,
mkShard, mkShardMap, splitShardByKey)
import HStream.Server.Shard (Shard (..), ShardMap, deleteShard,
getShard, getShardMapIdx, insertShard,
mergeShard, mkShard, mkShardMap,
splitShardByKey)
import qualified HStream.Store as S
import Test.Hspec
import Test.Hspec.QuickCheck
Expand Down

0 comments on commit f1b1f93

Please sign in to comment.