Skip to content

Commit

Permalink
fix: an error while executing trimShards should not interrupt the exe…
Browse files Browse the repository at this point in the history
…cution (#1707)
  • Loading branch information
YangKian committed Dec 7, 2023
1 parent fa52319 commit 6a3f6ad
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
42 changes: 30 additions & 12 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}

module HStream.Server.Core.Stream
( createStream
Expand All @@ -24,7 +26,9 @@ module HStream.Server.Core.Stream
) where

import Control.Concurrent (getNumCapabilities)
import Control.Exception (catch, throwIO)
import Control.Exception (Exception (displayException, fromException, toException),
SomeException, catch,
throwIO, try)
import Control.Monad (forM, forM_, unless, void,
when)
import qualified Data.Attoparsec.Text as AP
Expand All @@ -34,7 +38,7 @@ import Data.Either (partitionEithers)
import Data.Functor ((<&>))
import qualified Data.List as L
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe)
import Data.Maybe (fromJust, fromMaybe)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
Expand Down Expand Up @@ -270,17 +274,31 @@ trimShards ServerContext{..} streamName recordIds = do
let streamId = S.transToStreamName streamName
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
concurrentCap <- getNumCapabilities
res <- limitedMapConcurrently (min 8 concurrentCap) (trim shards) points
return $ M.fromList res
(errors, res) <- partitionEithers <$> limitedMapConcurrently (min 8 concurrentCap) (trim shards) points
if null errors
then return $ M.fromList res
else throwIO @HE.SomeHStreamException . fromJust . fromException $ head errors
where
trim shards r@Rid{..} = do
unless (rShardId `elem` shards) $
throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName
S.trim scLDClient rShardId (rBatchId - 1)
Log.info $ "trim to " <> Log.build (show $ rBatchId - 1)
<> " for shard " <> Log.build (show rShardId)
<> ", stream " <> Log.build streamName
return (rShardId, T.pack . show $ r)
trim shards r@Rid{..}
| rShardId `elem` shards = do
try (S.trim scLDClient rShardId (rBatchId - 1)) >>= \case
Left (e:: SomeException)
| Just e' <- fromException @S.TOOBIG e -> do
Log.warning $ "trim shard " <> Log.build rShardId <> " with stream " <> Log.build streamName
<> " return error: " <> Log.build (displayException e')
return . Left . toException . HE.InvalidRecordId $ "recordId " <> show r <> " is beyond the tail of log: " <> displayException e'
| otherwise -> do
Log.warning $ "trim shard " <> Log.build rShardId <> " with stream " <> Log.build streamName
<> " return error: " <> Log.build (displayException e)
return . Left . toException . HE.SomeStoreInternal $ "trim shard with recordId " <> show r <> " error: " <> displayException e
Right _ -> do
Log.info $ "trim to " <> Log.build (show $ rBatchId - 1)
<> " for shard " <> Log.build (show rShardId)
<> ", stream " <> Log.build streamName
return . Right $ (rShardId, T.pack . show $ r)
| otherwise = do
Log.warning $ "trim shards error, shard " <> Log.build rShardId <> " doesn't belong to stream " <> Log.build streamName
return . Left . toException . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName

getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream
getStreamInfo ServerContext{..} stream = do
Expand Down
20 changes: 14 additions & 6 deletions hstream/src/HStream/Server/Exception.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ finalExceptionHandlers = [
]

storeExceptionHandlers :: [Handler (StatusCode, StatusDetails)]
storeExceptionHandlers = [
Handler $ \(err :: Store.EXISTS) -> do
Log.warning $ Log.buildString' err
return (StatusAlreadyExists, HE.mkStatusDetails $ HE.SomeStoreInternal "Stream or view with same name already exists in store")
,
Handler $ \(err :: Store.SomeHStoreException) -> do
storeExceptionHandlers =
[ Handler $ \(err :: Store.EXISTS) -> do
Log.warning $ Log.buildString' err
return (StatusAlreadyExists, HE.mkStatusDetails $ HE.SomeStoreInternal "Stream or view with same name already exists in store")

, Handler $ \(err :: Store.TOOBIG) -> do
Log.warning $ Log.buildString' err
HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInvalidArgument

, Handler $ \(err :: Store.SomeHStoreException) -> do
Log.warning $ Log.buildString' err
return (StatusInternal, HE.mkStatusDetails (HE.SomeStoreInternal (displayException err)))
]
Expand Down Expand Up @@ -109,6 +113,10 @@ hStoreExHandlers =
let x = Just "Stream or view with same name already exists in store"
HsGrpc.throwGrpcError $ HsGrpc.GrpcStatus HsGrpc.StatusAlreadyExists x Nothing

, Handler $ \(err :: Store.TOOBIG) -> do
Log.warning $ Log.buildString' err
HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInvalidArgument

, Handler $ \(err :: Store.SomeHStoreException) -> do
Log.fatal $ Log.buildString' err
HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInternal
Expand Down
3 changes: 2 additions & 1 deletion hstream/src/HStream/Server/Handler/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ trimStreamHandler sc (ServerNormalRequest _metadata request@TrimStreamRequest{..

handleTrimStream :: ServerContext -> G.UnaryHandler TrimStreamRequest Empty
handleTrimStream sc _ request@TrimStreamRequest{..} = catchDefaultEx $ do
Log.info $ "Receive trim stram Request: " <> Log.buildString' request
Log.info $ "Receive trim stream Request: " <> Log.buildString' request
validateNameAndThrow ResStream trimStreamRequestStreamName
when (isNothing trimStreamRequestTrimPoint) $
throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint
Expand All @@ -188,6 +188,7 @@ trimShardsHandler sc (ServerNormalRequest _metadata request@TrimShardsRequest{..
C.trimShards sc trimShardsRequestStreamName trimShardsRequestRecordIds
>>= returnResp . TrimShardsResponse

-- FIXME: update TrimShardsResponse to return successed and failed result
handleTrimShards :: ServerContext -> G.UnaryHandler TrimShardsRequest TrimShardsResponse
handleTrimShards sc _ request@TrimShardsRequest{..} = catchDefaultEx $ do
Log.info $ "Receive trim Shards Request: " <> Log.buildString' request
Expand Down

0 comments on commit 6a3f6ad

Please sign in to comment.