From d18d3fa09a088420c3595f17940f226bf5696b76 Mon Sep 17 00:00:00 2001 From: Commelina Date: Fri, 10 Feb 2023 11:27:25 +0800 Subject: [PATCH] hserver: fix infinite block when unacked records > `maxUnackedRecords` (#1259) --- hstream/src/HStream/Server/Core/Subscription.hs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Subscription.hs b/hstream/src/HStream/Server/Core/Subscription.hs index b27868c94..2e9dcacd3 100644 --- a/hstream/src/HStream/Server/Core/Subscription.hs +++ b/hstream/src/HStream/Server/Core/Subscription.hs @@ -447,8 +447,8 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do checkAvailable subShardContexts checkAvailable subConsumerContexts atomically $ do - assignShards subAssignment assignWaitingConsumers subAssignment + addRead subLdCkpReader subAssignment subStartOffsets atomically checkUnackedRecords recordBatches <- readRecordBatches @@ -457,14 +457,19 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do if isFirstSend then do writeIORef isFirstSendRef False - -- Note: automically kill child thread when the parent thread is killed - tid <- forkIO $ forever $ do + tid1 <- forkIO . forever $ do threadDelay (100 * 1000) updateClockAndDoResend + + tid2 <- forkIO . forever $ do + threadDelay (100 * 1000) + atomically $ assignShards subAssignment + -- FIXME: the same code successSendRecords <- sendReceivedRecordsVecs receivedRecordsVecs atomically $ addUnackedRecords subCtx successSendRecords - loop isFirstSendRef `onException` killThread tid + -- Note: automically kill child threads when the parent thread is killed + loop isFirstSendRef `onException` (killThread tid1 >> killThread tid2) else do -- FIXME: the same code successSendRecords <- sendReceivedRecordsVecs receivedRecordsVecs