Skip to content

Commit

Permalink
Fix: address HeadOverflow caused by ShardReader (#1715)
Browse files Browse the repository at this point in the history
Reason: pass maxBound (instead of maxRecords) to the reader in readInternal function
  • Loading branch information
4eUeP committed Dec 18, 2023
1 parent a44d747 commit 5084bf8
Showing 1 changed file with 45 additions and 19 deletions.
64 changes: 45 additions & 19 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Expand Up @@ -143,7 +143,7 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
readRecords r@ShardReader{..} = do
let cStreamName = textToCBytes targetStream
!read_start <- getPOSIXTime
records <- readInternal r (fromIntegral readShardRequestMaxRecords)
records <- readProcessGap r (fromIntegral readShardRequestMaxRecords)
Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start
Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records)
Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records)
Expand All @@ -154,24 +154,6 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
<> " read " <> Log.build (V.length res) <> " batchRecords"
return res

readInternal r@ShardReader{..} maxRecords = do
S.readerReadAllowGap @ByteString shardReader maxRecords >>= \case
Left gap@S.GapRecord{..}
| gapType == S.GapTypeAccess -> do
Log.info $ "shardReader read stream " <> Log.build targetStream <> ", shard " <> Log.build targetShard <> " meet gap " <> Log.build (show gap)
throwIO $ HE.AccessGapError $ "shardReader read stream " <> show targetStream <> ", shard " <> show targetShard <> " meet gap"
| gapType == S.GapTypeNotInConfig -> do
Log.info $ "shardReader read stream " <> Log.build targetStream <> ", shard " <> Log.build targetShard <> " meet gap " <> Log.build (show gap)
throwIO $ HE.NotInConfigGapError $ "shardReader read stream " <> show targetStream <> ", shard " <> show targetShard <> " meet gap"
| gapType == S.GapTypeUnknown -> do
Log.warning $ "shardReader read stream " <> Log.build targetStream <> ", shard " <> Log.build targetShard <> " meet gap " <> Log.build (show gap)
throwIO $ HE.UnknownGapError $ "shardReader read stream " <> show targetStream <> ", shard " <> show targetShard <> " meet gap"
| gapType == S.GapTypeDataloss -> do
Log.fatal $ "shardReader read stream " <> Log.build targetStream <> ", shard " <> Log.build targetShard <> " meet gap " <> Log.build (show gap)
throwIO $ HE.DataLossGapError $ "shardReader read stream " <> show targetStream <> ", shard " <> show targetShard <> " meet gap"
| otherwise -> readInternal r maxBound
Right dataRecords -> return dataRecords

-----------------------------------------------------------------------------------------------------
--

Expand Down Expand Up @@ -718,3 +700,47 @@ getShardId scLDClient streamId key =
<> ", shard endKey=" <> Log.build (show endKey)
return shard
else lookupShard shards

readProcessGap
:: (HasCallStack, S.DataRecordFormat a)
=> ShardReader -> Int
-> IO [S.DataRecord a]
readProcessGap ShardReader{..} maxRecords = go
where
go = do
records <- S.readerReadAllowGap shardReader maxRecords
case records of
Left gap@S.GapRecord{..}
| gapType == S.GapTypeAccess -> do
Log.info $ "shardReader read stream "
<> Log.build targetStream <> ", shard "
<> Log.build targetShard <> " meet gap "
<> Log.build (show gap)
throwIO $ HE.AccessGapError $ "shardReader read stream "
<> show targetStream
<> ", shard " <> show targetShard <> " meet gap"
| gapType == S.GapTypeNotInConfig -> do
Log.info $ "shardReader read stream "
<> Log.build targetStream <> ", shard "
<> Log.build targetShard <> " meet gap "
<> Log.build (show gap)
throwIO $ HE.NotInConfigGapError $ "shardReader read stream "
<> show targetStream <> ", shard " <> show targetShard
<> " meet gap"
| gapType == S.GapTypeUnknown -> do
Log.warning $ "shardReader read stream "
<> Log.build targetStream <> ", shard "
<> Log.build targetShard <> " meet gap "
<> Log.build (show gap)
throwIO $ HE.UnknownGapError $ "shardReader read stream "
<> show targetStream <> ", shard " <> show targetShard
<> " meet gap"
| gapType == S.GapTypeDataloss -> do
Log.fatal $ "shardReader read stream " <> Log.build targetStream
<> ", shard " <> Log.build targetShard <> " meet gap "
<> Log.build (show gap)
throwIO $ HE.DataLossGapError $ "shardReader read stream "
<> show targetStream <> ", shard " <> show targetShard
<> " meet gap"
| otherwise -> go
Right dataRecords -> return dataRecords

0 comments on commit 5084bf8

Please sign in to comment.