Skip to content

Commit

Permalink
processing: fix a bug that may cause task recovering failure (#1392)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Apr 28, 2023
1 parent 433fea1 commit f9c1f26
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 10 deletions.
1 change: 1 addition & 0 deletions hstream-processing/src/HStream/Processing/Connector.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ data SourceConnector = SourceConnector
data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp
{ subscribeToStreamWithoutCkp :: StreamName -> API.SpecialOffset -> IO (),
unSubscribeToStreamWithoutCkp :: StreamName -> IO (),
isSubscribedToStreamWithoutCkp :: StreamName -> IO Bool,
-- readRecordsWithoutCkp :: StreamName -> IO [SourceRecord]
withReadRecordsWithoutCkp ::
StreamName ->
Expand Down
15 changes: 10 additions & 5 deletions hstream-processing/src/HStream/Processing/Processor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,19 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas
logOptions <- logOptionsHandle stderr True
withLogFunc logOptions $ \lf -> do
ctx <- buildTaskContext task lf changeLogger snapshotter
let offset = API.SpecialOffsetLATEST
forM_ sourceStreamNames (flip subscribeToStreamWithoutCkp offset)
forM_ sourceStreamNames $ \stream -> do
isSubscribedToStreamWithoutCkp stream >>= \case
True -> return ()
False -> subscribeToStreamWithoutCkp stream API.SpecialOffsetLATEST

chan <- newTChanIO

withAsync (forConcurrently_ sourceStreamNames (f chan connectorClosed)) $ \a ->
withAsync (g task ctx chan) $ \b -> do
waitEither_ a b `finally` forM_ sourceStreamNames unSubscribeToStreamWithoutCkp
withAsync (g task ctx chan) $ \b ->
waitEither_ a b `finally` do
forM_ sourceStreamNames (\stream -> do
isSubscribedToStreamWithoutCkp stream >>= \case
True -> unSubscribeToStreamWithoutCkp stream
False -> return () )
where
qid = getTaskName taskBuilder
f :: TChan ([SourceRecord], MVar ()) -> TVar Bool -> T.Text -> IO ()
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/ConnectorTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ data SourceConnector = SourceConnector
data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp
{ subscribeToStreamWithoutCkp :: StreamName -> API.SpecialOffset -> IO (),
unSubscribeToStreamWithoutCkp :: StreamName -> IO (),
isSubscribedToStreamWithoutCkp :: StreamName -> IO Bool,
--readRecordsWithoutCkp :: StreamName -> IO [SourceRecord]
withReadRecordsWithoutCkp :: StreamName
-> (BL.ByteString -> Maybe BL.ByteString)
Expand Down
4 changes: 4 additions & 0 deletions hstream/src/HStream/Server/Core/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ import HStream.Utils (decompressBatchedRecord,

--------------------------------------------------------------------------------

checkSubscriptionExist :: ServerContext -> Text -> IO Bool
checkSubscriptionExist ServerContext{..} sid =
M.checkMetaExists @SubscriptionWrap sid metaHandle

listSubscriptions :: ServerContext -> IO (V.Vector Subscription)
listSubscriptions sc = CC.listSubscriptions sc Nothing

Expand Down
8 changes: 8 additions & 0 deletions hstream/src/HStream/Server/HStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ hstoreSourceConnectorWithoutCkp :: ServerContext -> T.Text -> TVar Bool -> Sourc
hstoreSourceConnectorWithoutCkp ctx consumerName consumerClosed = SourceConnectorWithoutCkp {
subscribeToStreamWithoutCkp = subscribeToHStoreStream' ctx consumerName,
unSubscribeToStreamWithoutCkp = unSubscribeToHStoreStream' ctx consumerName,
isSubscribedToStreamWithoutCkp = isSubscribedToHStoreStream' ctx consumerName,
withReadRecordsWithoutCkp = withReadRecordsFromHStore' ctx consumerName,
connectorClosed = consumerClosed
}
Expand Down Expand Up @@ -134,6 +135,13 @@ unSubscribeToHStoreStream' ctx consumerName streamName = do
}
Core.deleteSubscription ctx req

isSubscribedToHStoreStream' :: ServerContext
-> T.Text
-> HCT.StreamName
-> IO Bool
isSubscribedToHStoreStream' ctx consumerName streamName =
Core.checkSubscriptionExist ctx (hstoreSubscriptionPrefix <> streamName <> "_" <> consumerName)

dataRecordToSourceRecord :: S.LDClient -> Payload -> IO SourceRecord
dataRecordToSourceRecord ldclient Payload {..} = do
streamName <- S.streamName . fst <$> S.getStreamIdFromLogId ldclient pLogID
Expand Down
9 changes: 4 additions & 5 deletions hstream/src/HStream/Server/Handler/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,17 @@ checkSubscriptionExistHandler
:: ServerContext
-> ServerRequest 'Normal CheckSubscriptionExistRequest CheckSubscriptionExistResponse
-> IO (ServerResponse 'Normal CheckSubscriptionExistResponse)
checkSubscriptionExistHandler ServerContext {..} (ServerNormalRequest _metadata req@CheckSubscriptionExistRequest {..}) = defaultExceptionHandle $ do
checkSubscriptionExistHandler sc (ServerNormalRequest _metadata req@CheckSubscriptionExistRequest {..}) = defaultExceptionHandle $ do
Log.debug $ "Receive checkSubscriptionExistHandler request: " <> Log.buildString (show req)
let sid = checkSubscriptionExistRequestSubscriptionId
res <- M.checkMetaExists @SubscriptionWrap sid metaHandle
res <- Core.checkSubscriptionExist sc checkSubscriptionExistRequestSubscriptionId
returnResp . CheckSubscriptionExistResponse $ res

handleCheckSubscriptionExist
:: ServerContext
-> G.UnaryHandler CheckSubscriptionExistRequest CheckSubscriptionExistResponse
handleCheckSubscriptionExist ServerContext{..} _ req = catchDefaultEx $ do
handleCheckSubscriptionExist sc _ req = catchDefaultEx $ do
let sid = checkSubscriptionExistRequestSubscriptionId req
res <- M.checkMetaExists @SubscriptionWrap sid metaHandle
res <- Core.checkSubscriptionExist sc sid
pure $ CheckSubscriptionExistResponse res

-------------------------------------------------------------------------------
Expand Down

0 comments on commit f9c1f26

Please sign in to comment.