Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scdEnabled, localScdEnabled and stickyCopySets log attributes #1818

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ kafka:
# fetch-mode: 1 # TODO: Currently, only mode 1 is supported
# fetch-reader-timeout: 50 # 50ms, default timeout of each read, 0 means nonblocking
# fetch-maxlen: 1000 # default max size of each read
# scd-enabled: false # enable Single Copy Delivery mode, default is false
# local-scd-enabled: false
# sticky-copysets: false # enable sticky copyset, default is false

# Configuration for HStream Store
# The configuration for hstore is **Optional**. When the values are not provided,
Expand Down
3 changes: 3 additions & 0 deletions hstream-admin/store/HStream/Admin/Store/Command/Logs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ printLogAttributes level LogAttributes{..} = do
emit $ _SHOW_ATTR(logSyncedCopies)
emit $ _SHOW_ATTR(logBacklogDuration)
emit $ _SHOW_ATTR(logReplicateAcross)
emit $ _SHOW_ATTR(logScdEnabled)
emit $ _SHOW_ATTR(logLocalScdEnabled)
emit $ _SHOW_ATTR(logStickyCopySets)
forM_ (Map.toList logAttrsExtras) $ \(k, v) ->
emit $ Just $ unpack k <> ": " <> unpack v
#undef _SHOW_ATTR
Expand Down
3 changes: 3 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ parseJSONToOptions CliOptions{..} obj = do
storageCfg <- nodeCfgObj .:? "storage" .!= mempty
fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50
fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000
scdEnabled <- storageCfg .:? "scd-enabled" .!= False
localScdEnabled <- storageCfg .:? "local-scd-enabled" .!= False
stickyCopysets <- storageCfg .:? "sticky-copysets" .!= False
let _storage = StorageOptions{..}

-- SASL config
Expand Down
3 changes: 3 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ parseMetaStoreAddr t =
data StorageOptions = StorageOptions
{ fetchReaderTimeout :: Int
, fetchMaxLen :: Int
, scdEnabled :: Bool
, localScdEnabled :: Bool
, stickyCopysets :: Bool
} deriving (Show, Eq)

data ExperimentalFeature
Expand Down
5 changes: 5 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import GHC.Stack (HasCallStack)

import qualified HStream.Base.Time as BaseTime
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Config.Types (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand Down Expand Up @@ -53,6 +55,9 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
attrs = S.def { S.logReplicationFactor = S.defAttr1 replica
, S.logAttrsExtras = extraAttr
, S.logBacklogDuration = S.defAttr1 (getBacklogDuration topicConfigs)
, S.logScdEnabled = S.defAttr1 serverOpts._storage.scdEnabled
, S.logLocalScdEnabled = S.defAttr1 serverOpts._storage.localScdEnabled
, S.logStickyCopySets = S.defAttr1 serverOpts._storage.stickyCopysets
}
try (S.createStream scLDClient streamId attrs) >>= \case
Left (e :: SomeException)
Expand Down
47 changes: 37 additions & 10 deletions hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ data LogAttributes = LogAttributes
-- -- ^ Maximum amount of time to artificially delay delivery of newly written
-- -- records (increases delivery latency but improves server and client
-- -- performance), in milliseconds.
-- , logScdEnabled :: Attribute Bool
-- -- ^ Indicate whether or not the Single Copy Delivery optimization should be
-- -- used.
-- , logLocalScdEnabled :: Attribute Bool
-- -- ^ Indicate whether or not to use Local Single Copy Delivery. This is
-- -- ignored if scdEnabled_ is false.
-- , logStickyCopySets :: Attribute Bool
-- -- ^ True if copysets on this log should be "sticky". See docblock in
-- -- StickyCopySetManager.h
, logScdEnabled :: Attribute Bool
-- ^ Indicate whether or not the Single Copy Delivery optimization should be
-- used.
, logLocalScdEnabled :: Attribute Bool
-- ^ Indicate whether or not to use Local Single Copy Delivery. This is
-- ignored if scdEnabled_ is false.
, logStickyCopySets :: Attribute Bool
-- ^ True if copysets on this log should be "sticky". See docblock in
-- StickyCopySetManager.h
-- , logMutablePerEpochLogMetadataEnabled ::Attribute Bool
-- -- ^ If true, write mutable per-epoch metadata along with every data record.
-- , logSequencerAffinity :: Attribute (Maybe CBytes)
Expand Down Expand Up @@ -149,6 +149,9 @@ pokeLogAttributes LogAttributes{..} =
withAllocMaybePrim id _ARG(logSyncReplicationScope)
withPrimListPairUnsafe _MAYBE_LIST_PAIR_ARG(logReplicateAcross)
withAllocMaybePrim2 fromIntegral _MAYBE_ARG(logBacklogDuration)
withAllocMaybePrim bool2cbool _ARG(logScdEnabled)
withAllocMaybePrim bool2cbool _ARG(logLocalScdEnabled)
withAllocMaybePrim bool2cbool _ARG(logStickyCopySets)
withHsCBytesMapUnsafe logAttrsExtras $ \l ks vs -> do
#define _ARG_TO(name) name##' (attrInherited name)
#define _MAYBE_ARG_TO(name) name##_flag name##' (attrInherited name)
Expand All @@ -160,6 +163,9 @@ pokeLogAttributes LogAttributes{..} =
_ARG_TO(logSyncReplicationScope)
_MAYBE_LIST_PAIR_TO(logReplicateAcross)
_MAYBE_ARG_TO(logBacklogDuration)
_ARG_TO(logScdEnabled)
_ARG_TO(logLocalScdEnabled)
_ARG_TO(logStickyCopySets)
l ks vs
newForeignPtr free_log_attributes_fun i
#undef _ARG
Expand All @@ -183,14 +189,20 @@ peekLogAttributes ptr = do
, ( logSyncReplicationScope
, ( logReplicateAcross
, ( logBacklogDuration
, _))))))) <-
, ( logScdEnabled
, ( logLocalScdEnabled
, ( logStickyCopySets
, _)))))))))) <-
runPeek id $ \_ARG(replicationFactor) ->
runPeek id $ \_ARG(syncedCopies) ->
runPeek id $ \_ARG(maxWritesInFlight) ->
runPeek cbool2bool $ \_ARG(singleWriter) ->
runPeek NodeLocationScope $ \_ARG(syncReplicationScope) ->
runPeekMaybeListPair replicateAcross_size $ \_MAYBE_LIST_PAIR(replicateAcross) ->
runPeekMaybe id $ \_MAYBE_ARG(backlogDuration) ->
runPeek cbool2bool $ \_ARG(scdEnabled) ->
runPeek cbool2bool $ \_ARG(localScdEnabled) ->
runPeek cbool2bool $ \_ARG(stickyCopySets) ->
peek_log_attributes
ptr
_ARG(replicationFactor)
Expand All @@ -200,6 +212,9 @@ peekLogAttributes ptr = do
_ARG(syncReplicationScope)
_MAYBE_LIST_PAIR(replicateAcross)
_MAYBE_ARG(backlogDuration)
_ARG(scdEnabled)
_ARG(localScdEnabled)
_ARG(stickyCopySets)
logAttrsExtras <- peekLogAttributesExtras ptr
return LogAttributes{..}
#undef _ARG
Expand Down Expand Up @@ -244,6 +259,12 @@ foreign import ccall unsafe "hs_logdevice.h poke_log_attributes"
-- ^ logReplicateAcross
-> Bool -> Ptr CInt -> Bool
-- ^ logBacklogDuration
-> Ptr CBool -> Bool
-- ^ logScdEnabled
-> Ptr CBool -> Bool
-- ^ logLocalScdEnabled
-> Ptr CBool -> Bool
-- ^ logStickyCopySets
-> Int -> BAArray# Word8 -> BAArray# Word8
-- ^ extras
-> IO (Ptr LogDeviceLogAttributes)
Expand All @@ -265,6 +286,12 @@ foreign import ccall unsafe "hs_logdevice.h peek_log_attributes"
-- ^ logReplicateAcross
-> MBA# CBool -> MBA# CBool -> MBA# Int -> MBA# CBool
-- ^ logBacklogDuration
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logScdEnabled
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logLocalScdEnabled
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logStickyCopySets
-> IO ()

foreign import ccall unsafe "hs_logdevice.h free_log_attributes"
Expand Down
14 changes: 11 additions & 3 deletions hstream-store/cbits/logdevice/hs_log_attributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies),
syncReplicationScope),
_LIST_PAIR(replicateAcross,
facebook::logdevice::NodeLocationScope, HsInt),
_MAYBE_ARG(int, backlogDuration),
_MAYBE_ARG(int, backlogDuration), _ARG(bool, scdEnabled),
_ARG(bool, localScdEnabled), _ARG(bool, stickyCopySets),
//
HsInt extras_len, StgArrBytes** keys, StgArrBytes** vals) {
#undef _ARG
Expand Down Expand Up @@ -52,6 +53,9 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies),
attrs = attrs.with_replicateAcross(rs);
}
ADD_MAYBE_ATTR(backlogDuration, std::chrono::seconds, std::chrono::seconds)
ADD_ATTR(scdEnabled)
ADD_ATTR(localScdEnabled)
ADD_ATTR(stickyCopySets)
#undef ADD_ATTR
#undef ADD_MAYBE_ATTR

Expand Down Expand Up @@ -79,7 +83,8 @@ void peek_log_attributes(
ARG(facebook::logdevice::NodeLocationScope, syncReplicationScope),
ARG_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope,
HsInt),
ARG_MAYBE(HsInt, backlogDuration))
ARG_MAYBE(HsInt, backlogDuration), ARG(bool, scdEnabled),
ARG(bool, localScdEnabled), ARG(bool, stickyCopySets))
#undef ARG
#undef ARG_MAYBE
#undef ARG_LIST_PAIR
Expand All @@ -104,7 +109,7 @@ void peek_log_attributes(
*name##_inh = attrs->name().isInherited(); \
if (name##_len > 0 && attrs->name().hasValue()) { \
auto& val = attrs->name().value(); \
for (int i = 0; i < name##_len; i++) { \
for (int i = 0; i < name##_len; i++) { \
name##_keys[i] = val[i].first; \
name##_vals[i] = val[i].second; \
} \
Expand All @@ -117,6 +122,9 @@ void peek_log_attributes(
PEEK_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope,
HsInt);
PEEK_MAYBE(backlogDuration, .count());
PEEK(scdEnabled);
PEEK(localScdEnabled);
PEEK(stickyCopySets);
#undef PEEK
#undef PEEK_MAYBE
#undef PEEK_LIST_PAIR
Expand Down
14 changes: 13 additions & 1 deletion hstream-store/test/HStream/Store/LogDeviceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ logdirSpec :: Spec
logdirSpec = describe "LogDirectory" $ do
let attrs = S.def{ I.logReplicationFactor = I.defAttr1 1
, I.logBacklogDuration = I.defAttr1 (Just 60)
, I.logScdEnabled = I.defAttr1 False
, I.logLocalScdEnabled = I.defAttr1 True
, I.logStickyCopySets = I.defAttr1 False
, I.logAttrsExtras = Map.fromList [("A", "B")]
}

Expand Down Expand Up @@ -84,6 +87,9 @@ logdirSpec = describe "LogDirectory" $ do
I.logReplicationFactor attrs' `shouldBe` I.Attribute (Just 1) True
I.logBacklogDuration attrs' `shouldBe` I.Attribute (Just (Just 60)) True
Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B"
I.logScdEnabled attrs' `shouldBe` I.Attribute (Just False) True
I.logLocalScdEnabled attrs' `shouldBe` I.Attribute (Just True) True
I.logStickyCopySets attrs' `shouldBe` I.Attribute (Just False) True
I.syncLogsConfigVersion client =<< I.removeLogDirectory client dirname True

loggroupAround' :: SpecWith (CBytes, S.C_LogID) -> Spec
Expand All @@ -92,9 +98,12 @@ loggroupAround' =
, I.logBacklogDuration = I.defAttr1 (Just 60)
, I.logSingleWriter = I.defAttr1 True
, I.logSyncReplicationScope = I.defAttr1 S.NodeLocationScope_DATA_CENTER
, I.logScdEnabled = I.defAttr1 False
, I.logLocalScdEnabled = I.defAttr1 True
, I.logStickyCopySets = I.defAttr1 False
, I.logAttrsExtras = Map.fromList [("A", "B")]
}
logid = 104
logid = 105
logname = "LogDeviceSpec_LogGroupSpec"
in loggroupAround logid logname attrs

Expand All @@ -106,6 +115,9 @@ loggroupSpec = describe "LogGroup" $ loggroupAround' $ parallel $ do
I.logReplicationFactor attrs' `shouldBe` I.defAttr1 1
I.logBacklogDuration attrs' `shouldBe` I.defAttr1 (Just 60)
I.logSingleWriter attrs' `shouldBe` I.defAttr1 True
I.logScdEnabled attrs' `shouldBe` I.defAttr1 False
I.logLocalScdEnabled attrs' `shouldBe` I.defAttr1 True
I.logStickyCopySets attrs' `shouldBe` I.defAttr1 False
I.logSyncReplicationScope attrs' `shouldBe` I.defAttr1 S.NodeLocationScope_DATA_CENTER
Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B"

Expand Down
Loading