Skip to content

Commit

Permalink
feat(kafka): store and recover groups (#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f committed Nov 16, 2023
1 parent d641377 commit b7e3cfb
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 192 deletions.
2 changes: 2 additions & 0 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module HStream.Common.Server.Lookup
-- * Internals
, lookupNode
, lookupNodePersist

, kafkaResourceMetaId
) where

import Control.Concurrent.STM
Expand Down
52 changes: 52 additions & 0 deletions common/server/HStream/Common/Server/MetaData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ module HStream.Common.Server.MetaData
, clusterStartTimeId
, TaskAllocation (..), renderTaskAllocationsToTable

, GroupMetadataValue(..)
, MemberMetadataValue(..)

, initKafkaZkPaths
, initKafkaRqTables
, initKafkaFileTables
Expand All @@ -27,6 +30,9 @@ import System.FileLock (SharedExclusive (Exclusi
import Z.Data.CBytes (CBytes)
import ZooKeeper.Types (ZHandle)

import Data.Int (Int32)
import qualified Data.Text as T
import qualified Data.Vector as V
import HStream.Common.Server.MetaData.Values
import HStream.Exception (RQLiteTableAlreadyExists)
import HStream.MetaStore.FileUtils (Contents, createTables)
Expand Down Expand Up @@ -63,6 +69,49 @@ renderTaskAllocationsToTable relations =
rows = map (\TaskAllocation{..} -> [taskAllocationServerId]) relations
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

-------------------- Group Metadata Value -------------------------
data GroupMetadataValue
= GroupMetadataValue
{ groupId :: T.Text
, generationId :: Int32

-- protocol
, protocolType :: T.Text
, prototcolName :: Maybe T.Text

, leader :: Maybe T.Text
, members :: V.Vector MemberMetadataValue
} deriving (Show, Eq, Generic)

instance FromJSON GroupMetadataValue
instance ToJSON GroupMetadataValue

instance HasPath GroupMetadataValue ZHandle where
myRootPath = rootPath <> "/groups"

instance HasPath GroupMetadataValue RHandle where
myRootPath = "groups"

instance HasPath GroupMetadataValue FHandle where
myRootPath = "groups"


data MemberMetadataValue
= MemberMetadataValue
{ memberId :: T.Text
, clientId :: T.Text
, clientHost :: T.Text
, sessionTimeout :: Int32
, rebalanceTimeout :: Int32

-- base64
, subscription :: T.Text
, assignment :: T.Text
} deriving (Show, Eq, Generic)

instance FromJSON MemberMetadataValue
instance ToJSON MemberMetadataValue

-------------------------------------------------------------------------------

initializeZkPaths :: HasCallStack => ZHandle -> [CBytes] -> IO ()
Expand All @@ -89,16 +138,19 @@ kafkaZkPaths =
, textToCBytes kafkaRootPath
, textToCBytes $ myRootPath @Proto.Timestamp @ZHandle
, textToCBytes $ myRootPath @TaskAllocation @ZHandle
, textToCBytes $ myRootPath @GroupMetadataValue @ZHandle
]

kafkaRqTables :: [Text]
kafkaRqTables =
[ myRootPath @TaskAllocation @RHandle
, myRootPath @GroupMetadataValue @RHandle
]

kafkaFileTables :: [Text]
kafkaFileTables =
[ myRootPath @TaskAllocation @FHandle
, myRootPath @GroupMetadataValue @FHandle
]

initKafkaZkPaths :: HasCallStack => ZHandle -> IO ()
Expand Down
97 changes: 97 additions & 0 deletions common/server/HStream/Common/Server/TaskManager.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{-# LANGUAGE OverloadedRecordDot #-}

module HStream.Common.Server.TaskManager where

import Control.Concurrent (forkIO)
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as C
import qualified Control.Exception as E
import qualified Control.Monad as M
import Data.Int
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word32)
import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Server.Lookup (lookupNodePersist)
import qualified HStream.Exception as HE
import HStream.Gossip.Types (Epoch, GossipContext)
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle)
import HStream.Server.HStreamApi (ServerNode (serverNodeId))

-------------------------------------------------------------------------------

-- e.g. IO.Worker, QueryManager, GroupCoordinator
class TaskManager tm where
resourceName :: tm -> T.Text
mkMetaId :: tm -> T.Text -> T.Text
listLocalTasks :: tm -> IO (Set.Set T.Text)

listAllTasks :: tm -> IO (V.Vector T.Text)
-- ^ typically list from metastore,
-- Nothing means metastore is unavailable

loadTaskAsync :: tm -> T.Text -> IO ()
unloadTaskAsync :: tm -> T.Text -> IO ()

-- TODO: for Network Partition
-- unloadAllTasks :: tm -> IO ()

newtype TaskDetectorConfig
= TaskDetectorConfig
{ intervalMs :: Int32
}

-- Only run on a Single Thread
data TaskDetector
= forall tm. TaskManager tm => TaskDetector
{ managers :: V.Vector tm
, config :: TaskDetectorConfig

, metaHandle :: MetaHandle
, gossipContext :: GossipContext
, loadBalanceHashRing :: C.TVar (Epoch, HashRing)
, advertisedListenersKey :: Maybe T.Text
, serverID :: Word32
}

runTaskDetector :: TaskDetector -> IO ()
runTaskDetector td = do
tid <- forkIO loop
Log.info $ "running task detector on thread:" <> Log.buildString' tid
where
loop = do
detectTasks td
C.threadDelay $ (fromIntegral td.config.intervalMs) * 1000
loop

-- detect all tasks, and load/unload tasks in TaskManager
detectTasks :: TaskDetector -> IO ()
detectTasks TaskDetector{..} = do
V.forM_ managers $ \tm -> do
allTasks <- listAllTasks tm
localTasks <- listLocalTasks tm
-- TODO: lookup diff all tasks and instead of lookup a single task
V.forM_ allTasks $ \task -> do
(flip E.catches) (handleExceptions task) $ do
let metaId = mkMetaId tm task
loaded = Set.member task localTasks
lookupResult <- lookupNodePersist metaHandle gossipContext loadBalanceHashRing task metaId advertisedListenersKey
if (lookupResult.serverNodeId == serverID) then do
M.unless loaded $ do
Log.info $ "loading task, resourceName:" <> Log.build (resourceName tm)
<> ", taskId:" <> Log.build task
loadTaskAsync tm task
else do
M.when loaded $ do
Log.info $ "unloading task, resourceName:" <> Log.build (resourceName tm)
<> ", taskId:" <> Log.build task
unloadTaskAsync tm task
where handleExceptions task = [
E.Handler (\(_ :: HE.QueryAlreadyTerminated) -> return ())
, E.Handler (\(err :: E.SomeException) ->
Log.warning $ "detech and load/unload task failed, task:" <> Log.build task
<> ", reason:" <> Log.build (show err)
)
]
1 change: 1 addition & 0 deletions common/server/hstream-common-server.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ library
HStream.Common.Server.Lookup
HStream.Common.Server.MetaData
HStream.Common.Server.Shard
HStream.Common.Server.TaskManager
HStream.Common.Server.Types

other-modules: HStream.Common.Server.MetaData.Values
Expand Down
10 changes: 10 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ module HStream.Kafka.Common.Utils where
import Control.Exception (throw)
import qualified Control.Monad as M
import qualified Control.Monad.ST as ST
import qualified Data.ByteString as BS
import qualified Data.ByteString.Base64 as Base64
import qualified Data.HashTable.IO as H
import qualified Data.HashTable.ST.Basic as HB
import qualified Data.IORef as IO
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified Kafka.Protocol.Encoding as K
Expand Down Expand Up @@ -70,3 +74,9 @@ unlessIORefEq :: (Eq a) => IO.IORef a -> a -> (a -> IO ()) -> IO ()
unlessIORefEq ioRefVal expected action = do
IO.readIORef ioRefVal >>= \val -> do
M.unless (expected == val) $ action val

encodeBase64 :: BS.ByteString -> T.Text
encodeBase64 = Base64.encodeBase64

decodeBase64 :: T.Text -> BS.ByteString
decodeBase64 = Base64.decodeBase64Lenient . T.encodeUtf8
Loading

0 comments on commit b7e3cfb

Please sign in to comment.