Skip to content

Commit

Permalink
refactoring and a minor bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellwrosen committed Sep 28, 2023
1 parent e83bc41 commit af6372d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
replaced with sensible defaults
- `recurring` / `recurring_` no longer throw an exception if given a negative delay
- Replace `array` with `primitive`
- Make calling `cancel` more than once on a recurring timer not enter an infinite loop
- Slightly improve timer insert performance

## [0.4.0.1] - 2022-11-05
Expand Down
43 changes: 31 additions & 12 deletions src/TimerWheel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ with config action =
-- seconds.
--
-- Returns an action that, when called, attempts to cancel the timer, and returns whether or not it was successful
-- (@False@ means the timer has already fired, or was already cancelled).
-- (@False@ means the timer has already fired, or has already been cancelled).
register ::
-- | The timer wheel
TimerWheel ->
Expand Down Expand Up @@ -183,7 +183,8 @@ registerImpl TimerWheel {buckets, numTimers, resolution, timerIdSupply, totalMic
incrCounter_ numTimers
timerId <- incrCounter timerIdSupply
let index = timestampToIndex buckets resolution (now `Timestamp.plus` delay)
atomicInsertIntoBucket buckets totalMicros index timerId delay action
let c = unMicros (delay `Micros.div` totalMicros)
atomicInsertIntoBucket buckets index (Entries.insert timerId c action)
pure (atomicDeleteFromBucket buckets index timerId)

-- | @recurring wheel action delay@ registers an action __@action@__ in timer wheel __@wheel@__ to fire every
Expand Down Expand Up @@ -232,7 +233,9 @@ recurring wheel (Micros.fromSeconds -> delay) action = do
tryCancel <- readIORef tryCancelRef
tryCancel >>= \case
False -> cancel
True -> pure ()
-- Writing `pure False` here just allows bad user-code that calls `cancel` after it's returned `True` to
-- return `False` immediately (like a normal one-shot timer does), rather than busy loop to death.
True -> writeIORef tryCancelRef (pure False)
pure cancel

-- | Like 'recurring', but for when you don't intend to cancel the timer.
Expand Down Expand Up @@ -279,6 +282,24 @@ count :: TimerWheel -> IO Int
count TimerWheel {numTimers} =
readCounter numTimers

-- `timestampToIndex buckets resolution timestamp` figures out which index `timestamp` corresponds to in `buckets`,
-- where each bucket corresponds to `resolution` microseconds.
--
-- For example, consider a three-element `buckets` with resolution `10000us`.
--
-- +-----------------------------+
-- | 10000us | 10000us | 10000us |
-- +-----------------------------+
--
-- Some timestamp like `1053298012387` gets binned to one of the three indices 0, 1, or 2, with quick and easy maffs:
--
-- 1. Figure out which index the timestamp corresponds to, if there were infinitely many:
--
-- 1053298012387 `div` 10000 = 105329801
--
-- 2. Wrap around per the actual length of the array:
--
-- 105329801 `rem` 3 = 2
timestampToIndex :: MutableArray RealWorld Entries -> Micros -> Timestamp -> Int
timestampToIndex buckets resolution timestamp =
-- This downcast is safe because there are at most `maxBound :: Int` buckets (not that anyone would ever have that
Expand All @@ -289,19 +310,15 @@ timestampToIndex buckets resolution timestamp =
------------------------------------------------------------------------------------------------------------------------
-- Atomic operations on buckets

atomicInsertIntoBucket :: MutableArray RealWorld Entries -> Micros -> Int -> TimerId -> Micros -> IO () -> IO ()
atomicInsertIntoBucket buckets totalMicros index timerId delay action = do
atomicInsertIntoBucket :: MutableArray RealWorld Entries -> Int -> (Entries -> Entries) -> IO ()
atomicInsertIntoBucket buckets index doInsert = do
ticket0 <- Atomics.readArrayElem buckets index
loop ticket0
where
loop ticket = do
(success, ticket1) <- Atomics.casArrayElem buckets index ticket (doInsert (Atomics.peekTicket ticket))
if success then pure () else loop ticket1

doInsert :: Entries -> Entries
doInsert =
Entries.insert timerId (unMicros (delay `Micros.div` totalMicros)) action

atomicDeleteFromBucket :: MutableArray RealWorld Entries -> Int -> TimerId -> IO Bool
atomicDeleteFromBucket buckets index timerId = do
ticket0 <- Atomics.readArrayElem buckets index
Expand Down Expand Up @@ -333,19 +350,21 @@ atomicExtractExpiredTimersFromBucket buckets index = do

runTimerReaperThread :: MutableArray RealWorld Entries -> Counter -> Micros -> IO void
runTimerReaperThread buckets numTimers resolution = do
-- Sleep until the very first bucket of timers expires
now <- Timestamp.now
let remainingBucketMicros = resolution `Micros.minus` (now `Timestamp.rem` resolution)
Micros.sleep remainingBucketMicros

loop
(now `Timestamp.plus` remainingBucketMicros `Timestamp.plus` resolution)
(timestampToIndex buckets resolution now)
where
loop :: Timestamp -> Int -> IO a
loop :: Timestamp -> Int -> IO void
loop !nextTime !index = do
expired <- atomicExtractExpiredTimersFromBucket buckets index
for_ expired \action -> do
action
decrCounter_ numTimers
afterTime <- Timestamp.now
when (afterTime < nextTime) (Micros.sleep (nextTime `Timestamp.minus` afterTime))
now <- Timestamp.now
when (now < nextTime) (Micros.sleep (nextTime `Timestamp.minus` now))
loop (nextTime `Timestamp.plus` resolution) ((index + 1) `rem` Array.sizeofMutableArray buckets)
11 changes: 3 additions & 8 deletions src/TimerWheel/Internal/Entries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ partition :: Entries -> ([IO ()], Entries)
partition (Entries entries) =
case IntPSQ.atMostView 0 entries of
(expired, alive) ->
(map f expired, Entries (IntPSQ.unsafeMapMonotonic g alive))
where
f :: (Int, Word64, IO ()) -> IO ()
f (_, _, m) =
m
g :: Int -> Word64 -> IO () -> (Word64, IO ())
g _ n m =
(n - 1, m)
( map (\(_, _, m) -> m) expired,
Entries (IntPSQ.unsafeMapMonotonic (\_ n m -> (n - 1, m)) alive)
)
{-# INLINEABLE partition #-}
1 change: 1 addition & 0 deletions src/TimerWheel/Internal/Timestamp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import GHC.Clock (getMonotonicTimeNSec)
import TimerWheel.Internal.Micros (Micros (..))
import TimerWheel.Internal.Prelude

-- Monotonic time, in microseconds
newtype Timestamp
= Timestamp Word64
deriving stock (Eq, Ord)
Expand Down

0 comments on commit af6372d

Please sign in to comment.