Skip to content

Commit

Permalink
hserver: fix infinite block when unacked records > maxUnackedRecords (
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Feb 10, 2023
1 parent 783c413 commit d18d3fa
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions hstream/src/HStream/Server/Core/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
checkAvailable subShardContexts
checkAvailable subConsumerContexts
atomically $ do
assignShards subAssignment
assignWaitingConsumers subAssignment

addRead subLdCkpReader subAssignment subStartOffsets
atomically checkUnackedRecords
recordBatches <- readRecordBatches
Expand All @@ -457,14 +457,19 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
if isFirstSend
then do
writeIORef isFirstSendRef False
-- Note: automically kill child thread when the parent thread is killed
tid <- forkIO $ forever $ do
tid1 <- forkIO . forever $ do
threadDelay (100 * 1000)
updateClockAndDoResend

tid2 <- forkIO . forever $ do
threadDelay (100 * 1000)
atomically $ assignShards subAssignment

-- FIXME: the same code
successSendRecords <- sendReceivedRecordsVecs receivedRecordsVecs
atomically $ addUnackedRecords subCtx successSendRecords
loop isFirstSendRef `onException` killThread tid
-- Note: automically kill child threads when the parent thread is killed
loop isFirstSendRef `onException` (killThread tid1 >> killThread tid2)
else do
-- FIXME: the same code
successSendRecords <- sendReceivedRecordsVecs receivedRecordsVecs
Expand Down

0 comments on commit d18d3fa

Please sign in to comment.