Skip to content

Commit

Permalink
kafka: auto trim CheckpointedOffsetStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Apr 19, 2024
1 parent 405cc6a commit ca45dfa
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
module HStream.Kafka.Group.OffsetsStore
( OffsetStorage(..)
, mkCkpOffsetStorage
)
where

import Control.Monad (unless)
import Data.Bifunctor (bimap)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Word (Word64)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)
, deleteCkpOffsetStorage
) where

import Control.Monad (unless)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Word (Word64)

import HStream.Base.Timer (CompactedWorker, startCompactedWorker,
stopCompactedWorker,
triggerCompactedWorker)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

type LogID = Word64
type LSN = Word64
Expand All @@ -24,10 +27,11 @@ class OffsetStorage s where
--------------------------------------------------------------------------------

data CkpOffsetStorage = CkpOffsetStorage
{ ckpStore :: S.LDCheckpointStore
, ckpStoreName :: T.Text
, ckpStoreId :: Word64
{ ckpStore :: !S.LDCheckpointStore
, ckpStoreName :: !T.Text
, ckpStoreId :: !Word64
-- ^ __consumer_offsets logID
, trimCkpWorker :: !CompactedWorker
}

mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
Expand All @@ -38,13 +42,24 @@ mkCkpOffsetStorage client ckpStoreName = do
S.initOffsetCheckpointDir client logAttrs
ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName
ckpStore <- S.newRSMBasedCheckpointStore client ckpStoreId 5000
Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName <> ", storeId: " <> Log.build ckpStoreId
Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName
<> ", storeId: " <> Log.build ckpStoreId
trimCkpWorker <- startCompactedWorker (60 * 1000000){- 60s -} $ do
Log.debug $ "Compacting checkpoint store of " <> Log.build ckpStoreName
S.trimLastBefore 1 client ckpStoreId
return CkpOffsetStorage{..}

-- FIXME: there may other resources(in memory or...) need to be released
deleteCkpOffsetStorage :: S.LDClient -> CkpOffsetStorage -> IO ()
deleteCkpOffsetStorage ldclient CkpOffsetStorage{..} = do
stopCompactedWorker trimCkpWorker
S.freeOffsetCheckpointId ldclient (textToCBytes ckpStoreName)

instance OffsetStorage CkpOffsetStorage where
commitOffsets CkpOffsetStorage{..} offsetsKey offsets = do
unless (Map.null offsets) $ do
S.ckpStoreUpdateMultiLSN ckpStore (textToCBytes offsetsKey) offsets
loadOffsets CkpOffsetStorage{..} offsetKey = do
checkpoints <- S.ckpStoreGetAllCheckpoints' ckpStore (textToCBytes offsetKey)
return . Map.fromList $ map (bimap fromIntegral fromIntegral) checkpoints
triggerCompactedWorker trimCkpWorker

loadOffsets CkpOffsetStorage{..} offsetKey =
Map.fromList <$> S.ckpStoreGetAllCheckpoints' ckpStore (textToCBytes offsetKey)

0 comments on commit ca45dfa

Please sign in to comment.