From 6a3f6ad05c72d1409db487579dcf84fb38eca3f8 Mon Sep 17 00:00:00 2001 From: YangKian <45479280+YangKian@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:04:00 +0800 Subject: [PATCH] fix: an error while executing trimShards should not interrupt the execution (#1707) --- hstream/src/HStream/Server/Core/Stream.hs | 42 ++++++++++++++------ hstream/src/HStream/Server/Exception.hs | 20 +++++++--- hstream/src/HStream/Server/Handler/Stream.hs | 3 +- 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index ea56b5454..95d97c851 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -1,8 +1,10 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternGuards #-} module HStream.Server.Core.Stream ( createStream @@ -24,7 +26,9 @@ module HStream.Server.Core.Stream ) where import Control.Concurrent (getNumCapabilities) -import Control.Exception (catch, throwIO) +import Control.Exception (Exception (displayException, fromException, toException), + SomeException, catch, + throwIO, try) import Control.Monad (forM, forM_, unless, void, when) import qualified Data.Attoparsec.Text as AP @@ -34,7 +38,7 @@ import Data.Either (partitionEithers) import Data.Functor ((<&>)) import qualified Data.List as L import qualified Data.Map.Strict as M -import Data.Maybe (fromMaybe) +import Data.Maybe (fromJust, fromMaybe) import qualified Data.Text as T import Data.Vector (Vector) import qualified Data.Vector as V @@ -270,17 +274,31 @@ trimShards ServerContext{..} streamName recordIds = do let streamId = S.transToStreamName streamName shards <- M.elems <$> S.listStreamPartitions scLDClient streamId concurrentCap <- getNumCapabilities - res <- limitedMapConcurrently (min 8 concurrentCap) (trim shards) points - return $ M.fromList res + (errors, res) <- partitionEithers <$> limitedMapConcurrently (min 8 concurrentCap) (trim shards) points + if null errors + then return $ M.fromList res + else throwIO @HE.SomeHStreamException . fromJust . fromException $ head errors where - trim shards r@Rid{..} = do - unless (rShardId `elem` shards) $ - throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName - S.trim scLDClient rShardId (rBatchId - 1) - Log.info $ "trim to " <> Log.build (show $ rBatchId - 1) - <> " for shard " <> Log.build (show rShardId) - <> ", stream " <> Log.build streamName - return (rShardId, T.pack . show $ r) + trim shards r@Rid{..} + | rShardId `elem` shards = do + try (S.trim scLDClient rShardId (rBatchId - 1)) >>= \case + Left (e:: SomeException) + | Just e' <- fromException @S.TOOBIG e -> do + Log.warning $ "trim shard " <> Log.build rShardId <> " with stream " <> Log.build streamName + <> " return error: " <> Log.build (displayException e') + return . Left . toException . HE.InvalidRecordId $ "recordId " <> show r <> " is beyond the tail of log: " <> displayException e' + | otherwise -> do + Log.warning $ "trim shard " <> Log.build rShardId <> " with stream " <> Log.build streamName + <> " return error: " <> Log.build (displayException e) + return . Left . toException . HE.SomeStoreInternal $ "trim shard with recordId " <> show r <> " error: " <> displayException e + Right _ -> do + Log.info $ "trim to " <> Log.build (show $ rBatchId - 1) + <> " for shard " <> Log.build (show rShardId) + <> ", stream " <> Log.build streamName + return . Right $ (rShardId, T.pack . show $ r) + | otherwise = do + Log.warning $ "trim shards error, shard " <> Log.build rShardId <> " doesn't belong to stream " <> Log.build streamName + return . Left . toException . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream getStreamInfo ServerContext{..} stream = do diff --git a/hstream/src/HStream/Server/Exception.hs b/hstream/src/HStream/Server/Exception.hs index 6d4a86681..b09854a78 100644 --- a/hstream/src/HStream/Server/Exception.hs +++ b/hstream/src/HStream/Server/Exception.hs @@ -67,12 +67,16 @@ finalExceptionHandlers = [ ] storeExceptionHandlers :: [Handler (StatusCode, StatusDetails)] -storeExceptionHandlers = [ - Handler $ \(err :: Store.EXISTS) -> do - Log.warning $ Log.buildString' err - return (StatusAlreadyExists, HE.mkStatusDetails $ HE.SomeStoreInternal "Stream or view with same name already exists in store") - , - Handler $ \(err :: Store.SomeHStoreException) -> do +storeExceptionHandlers = + [ Handler $ \(err :: Store.EXISTS) -> do + Log.warning $ Log.buildString' err + return (StatusAlreadyExists, HE.mkStatusDetails $ HE.SomeStoreInternal "Stream or view with same name already exists in store") + + , Handler $ \(err :: Store.TOOBIG) -> do + Log.warning $ Log.buildString' err + HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInvalidArgument + + , Handler $ \(err :: Store.SomeHStoreException) -> do Log.warning $ Log.buildString' err return (StatusInternal, HE.mkStatusDetails (HE.SomeStoreInternal (displayException err))) ] @@ -109,6 +113,10 @@ hStoreExHandlers = let x = Just "Stream or view with same name already exists in store" HsGrpc.throwGrpcError $ HsGrpc.GrpcStatus HsGrpc.StatusAlreadyExists x Nothing + , Handler $ \(err :: Store.TOOBIG) -> do + Log.warning $ Log.buildString' err + HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInvalidArgument + , Handler $ \(err :: Store.SomeHStoreException) -> do Log.fatal $ Log.buildString' err HsGrpc.throwGrpcError $ HE.mkGrpcStatus err HsGrpc.StatusInternal diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index 1cec6984d..158c0c590 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -170,7 +170,7 @@ trimStreamHandler sc (ServerNormalRequest _metadata request@TrimStreamRequest{.. handleTrimStream :: ServerContext -> G.UnaryHandler TrimStreamRequest Empty handleTrimStream sc _ request@TrimStreamRequest{..} = catchDefaultEx $ do - Log.info $ "Receive trim stram Request: " <> Log.buildString' request + Log.info $ "Receive trim stream Request: " <> Log.buildString' request validateNameAndThrow ResStream trimStreamRequestStreamName when (isNothing trimStreamRequestTrimPoint) $ throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint @@ -188,6 +188,7 @@ trimShardsHandler sc (ServerNormalRequest _metadata request@TrimShardsRequest{.. C.trimShards sc trimShardsRequestStreamName trimShardsRequestRecordIds >>= returnResp . TrimShardsResponse +-- FIXME: update TrimShardsResponse to return successed and failed result handleTrimShards :: ServerContext -> G.UnaryHandler TrimShardsRequest TrimShardsResponse handleTrimShards sc _ request@TrimShardsRequest{..} = catchDefaultEx $ do Log.info $ "Receive trim Shards Request: " <> Log.buildString' request