Skip to content

Commit

Permalink
switch Bucket to simplified/optimized version of IntPSQ
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellwrosen committed Oct 3, 2023
1 parent fbafcc1 commit 06d1e2b
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 422 deletions.
16 changes: 8 additions & 8 deletions src/TimerWheel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ register TimerWheel {buckets, numTimers, resolution, timerIdSupply} delay action
let index = timestampToIndex buckets resolution timestamp
timerId <- incrCounter timerIdSupply
mask_ do
atomicModifyArray buckets index (Bucket.insert timerId (coerce @Timestamp @Word64 timestamp) (OneShot1 action))
atomicModifyArray buckets index (Bucket.insert timerId timestamp (OneShot1 action))
incrCounter_ numTimers
coerce @(IO (IO Bool)) @(IO (Timer Bool)) do
pure do
mask_ do
deleted <- atomicMaybeModifyArray buckets index (Bucket.delete timerId)
deleted <- atomicMaybeModifyArray buckets index (Bucket.deleteExpectingHit timerId)
when deleted (decrCounter_ numTimers)
pure deleted

Expand Down Expand Up @@ -214,7 +214,7 @@ recurring TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanosecond
timerId <- incrCounter timerIdSupply
canceledRef <- newIORef False
mask_ do
atomicModifyArray buckets index (Bucket.insert timerId (coerce @Timestamp @Word64 timestamp) (Recurring1 action delay canceledRef))
atomicModifyArray buckets index (Bucket.insert timerId timestamp (Recurring1 action delay canceledRef))
incrCounter_ numTimers
coerce @(IO (IO ())) @(IO (Timer ())) do
pure do
Expand All @@ -236,7 +236,7 @@ recurring_ TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanosecon
let index = timestampToIndex buckets resolution timestamp
timerId <- incrCounter timerIdSupply
mask_ do
atomicModifyArray buckets index (Bucket.insert timerId (coerce @Timestamp @Word64 timestamp) (Recurring1_ action delay))
atomicModifyArray buckets index (Bucket.insert timerId timestamp (Recurring1_ action delay))
incrCounter_ numTimers

-- | A registered timer, parameterized by the result of attempting to cancel it:
Expand Down Expand Up @@ -321,13 +321,13 @@ atomicMaybeModifyArray buckets index doDelete = do
if success then pure True else loop ticket1

atomicExtractExpiredTimersFromBucket :: MutableArray RealWorld (Bucket Timer0) -> Int -> Timestamp -> IO (Bucket Timer0)
atomicExtractExpiredTimersFromBucket buckets index (coerce @Timestamp @Word64 -> now) = do
atomicExtractExpiredTimersFromBucket buckets index now = do
ticket0 <- Atomics.readArrayElem buckets index
loop ticket0
where
loop :: Atomics.Ticket (Bucket Timer0) -> IO (Bucket Timer0)
loop ticket = do
let (expired, bucket1) = Bucket.partition now (Atomics.peekTicket ticket)
let Bucket.Pair expired bucket1 = Bucket.partition now (Atomics.peekTicket ticket)
if Bucket.isEmpty expired
then pure Bucket.empty
else do
Expand Down Expand Up @@ -467,7 +467,7 @@ runTimerReaperThread buckets numTimers resolution = do
case Bucket.pop bucket0 of
Bucket.PopNada -> pure ()
Bucket.PopAlgo timerId timestamp timer bucket1 -> do
expired2 <- fireTimer bucket1 timerId (coerce @Word64 @Timestamp timestamp) timer
expired2 <- fireTimer bucket1 timerId timestamp timer
fireTimerBucket expired2

fireTimer :: Bucket Timer0 -> TimerId -> Timestamp -> Timer0 -> IO (Bucket Timer0)
Expand Down Expand Up @@ -500,4 +500,4 @@ runTimerReaperThread buckets numTimers resolution = do
where
insertNextOccurrence :: Bucket Timer0 -> Bucket Timer0
insertNextOccurrence =
Bucket.insert timerId (coerce @Timestamp @Word64 nextOccurrence) timer
Bucket.insert timerId nextOccurrence timer
243 changes: 220 additions & 23 deletions src/TimerWheel/Internal/Bucket.hs
Original file line number Diff line number Diff line change
@@ -1,54 +1,251 @@
module TimerWheel.Internal.Bucket
( Bucket,
empty,

-- * Queries
isEmpty,
partition,

-- * Modifications
insert,
Pop (..),
pop,
delete,
deleteExpectingHit,

-- * Strict pair
Pair (..),
)
where

import Data.IntPSQ (IntPSQ)
import qualified Data.IntPSQ as IntPSQ
import Data.Bits
import TimerWheel.Internal.Prelude
import TimerWheel.Internal.Timestamp (Timestamp)

type Bucket a =
IntPSQ Word64 a
data Bucket a
= -- Invariants on `Bin k p v m l r`:
-- 1. `l` and `r` can't both be Nil
-- 2. `p` is <= all `p` in `l` and `r`
-- 3. `k` is not an element of `l` nor `r`
-- 4. `m` has one 1-bit, which is the highest bit position at which any two keys in `l` and `r` differ
-- 5. No key in `l` has the `m` bit set
-- 6. All keys in `r` have the `m` bit set
Bin {-# UNPACK #-} !TimerId {-# UNPACK #-} !Timestamp !a {-# UNPACK #-} !Mask !(Bucket a) !(Bucket a)
| Tip {-# UNPACK #-} !TimerId {-# UNPACK #-} !Timestamp !a
| Nil

type TimerId = Int
type Mask = Word64

type Timestamp = Word64
type TimerId = Int

-- | An empty bucket.
empty :: Bucket a
empty =
IntPSQ.empty
Nil

isEmpty :: Bucket a -> Bool
isEmpty =
IntPSQ.null
isEmpty = \case
Nil -> True
_ -> False

partition :: Timestamp -> Bucket a -> (Bucket a, Bucket a)
partition timestamp bucket =
let (expired0, alive) = IntPSQ.atMostView timestamp bucket
!expired1 = IntPSQ.fromList expired0
in (expired1, alive)
-- | Partition a bucket by timestamp (less-than-or-equal-to, greater-than).
partition :: forall a. Timestamp -> Bucket a -> Pair (Bucket a) (Bucket a)
partition q =
go empty
where
go :: Bucket a -> Bucket a -> Pair (Bucket a) (Bucket a)
go acc t =
case t of
Nil -> Pair acc t
Tip i p x
| p > q -> Pair acc t
| otherwise -> Pair (insert i p x acc) Nil
Bin i p x m l r
| p > q -> Pair acc t
| otherwise ->
case go acc l of
Pair acc1 l1 ->
case go acc1 r of
Pair acc2 r1 -> Pair (insert i p x acc2) (merge m l1 r1)

-- | Insert a new timer into a bucket.
insert :: TimerId -> Timestamp -> a -> Bucket a -> Bucket a
insert =
IntPSQ.unsafeInsertNew
--
-- If a timer with the given id is already in the bucket, behavior is undefined.
insert :: forall a. TimerId -> Timestamp -> a -> Bucket a -> Bucket a
insert i p x t =
case t of
Nil -> Tip i p x
Tip j q y
| (p, i) < (q, j) -> link i p x j t Nil
| otherwise -> link j q y i (Tip i p x) Nil
Bin j q y m l r
| prefixNotEqual m i j ->
if (p, i) < (q, j)
then link i p x j t Nil
else link j q y i (Tip i p x) (merge m l r)
| (p, i) < (q, j) ->
if goleft j m
then Bin i p x m (insert j q y l) r
else Bin i p x m l (insert j q y r)
| otherwise ->
if goleft i m
then Bin j q y m (insert i p x l) r
else Bin j q y m l (insert i p x r)

data Pop a
= PopAlgo !TimerId !Timestamp !a !(Bucket a)
| PopNada

pop :: Bucket a -> Pop a
pop =
maybe PopNada (\(timerId, timestamp, value, bucket) -> PopAlgo timerId timestamp value bucket) . IntPSQ.minView
pop = \case
Nil -> PopNada
Tip k p x -> PopAlgo k p x Nil
Bin k p x m l r -> PopAlgo k p x (merge m l r)
{-# INLINE pop #-}

-- | Delete a timer from a bucket, expecting it to be there.
deleteExpectingHit :: TimerId -> Bucket v -> Maybe (Bucket v)
deleteExpectingHit i =
go
where
go :: Bucket v -> Maybe (Bucket v)
go = \case
Nil -> Nothing
Tip j _ _
| i == j -> Just Nil
| otherwise -> Nothing
Bin j p x m l r
-- This commented out short-circuit is what makes this delete variant "expecting a hit"
-- | nomatch m i j -> Nothing
| i == j -> Just $! merge m l r
| goleft i m -> (\l1 -> bin j p x m l1 r) <$> go l
| otherwise -> bin j p x m l <$> go r

i2w :: TimerId -> Word64
i2w = fromIntegral
{-# INLINE i2w #-}

goleft :: TimerId -> Mask -> Bool
goleft i m =
i2w i .&. m == 0
{-# INLINE goleft #-}

-- m = 00001000000000000000000
-- i = IIII???????????????????
-- j = JJJJ???????????????????
--
-- prefixNotEqual m i j answers, is IIII not equal to JJJJ?
prefixNotEqual :: Mask -> TimerId -> TimerId -> Bool
prefixNotEqual (prefixMask -> e) i j =
i2w i .&. e /= i2w j .&. e
{-# INLINE prefixNotEqual #-}

-- m = 0000000000100000
-- prefixMask m = 1111111111000000
prefixMask :: Word64 -> Word64
prefixMask m = -m `xor` m
{-# INLINE prefixMask #-}

onlyHighestBit :: Word64 -> Mask
onlyHighestBit w = unsafeShiftL 1 (63 - countLeadingZeros w)
{-# INLINE onlyHighestBit #-}

link :: TimerId -> Timestamp -> v -> TimerId -> Bucket v -> Bucket v -> Bucket v
link i p x j l r
| goleft j m = Bin i p x m l r
| otherwise = Bin i p x m r l
where
m = onlyHighestBit (i2w i `xor` i2w j)

-- | 'Bin' smart constructor, respecting the invariant that both children can't be 'Nil'.
bin :: TimerId -> Timestamp -> v -> Mask -> Bucket v -> Bucket v -> Bucket v
bin i p x _ Nil Nil = Tip i p x
bin i p x m l r = Bin i p x m l r
{-# INLINE bin #-}

-- Merge two disjoint buckets that have the same mask.
merge :: Mask -> Bucket v -> Bucket v -> Bucket v
merge m l r =
case (l, r) of
(Nil, _) -> r
(_, Nil) -> l
--
-- ip jq
--
(Tip i p x, Tip j q y)
--
-- ip
-- / \
-- nil jq
--
| (p, i) < (q, j) -> Bin i p x m Nil r
--
-- jq
-- / \
-- ip nil
--
| otherwise -> Bin j q y m l Nil
--
-- ip jq
-- / \
-- rl rr
--
(Tip i p x, Bin j q y n rl rr)
--
-- ip
-- / \
-- nil jq
-- / \
-- rl rr
--
| (p, i) < (q, j) -> Bin i p x m Nil r
--
-- jq
-- / \
-- ip rl+rr
--
| otherwise -> Bin j q y m l (merge n rl rr)
--
-- ip jq
-- / \
-- ll lr
--
(Bin i p x n ll lr, Tip j q y)
--
-- ip
-- / \
-- ll+lr jq
--
| (p, i) < (q, j) -> Bin i p x m (merge n ll lr) r
--
-- jq
-- / \
-- ip nil
-- / \
-- ll lr
--
| otherwise -> Bin j q y m l Nil
--
-- ip jq
-- / \ / \
-- ll lr rl rr
--
(Bin i p x n ll lr, Bin j q y o rl rr)
--
-- ip
-- / \
-- ll+lr jq
-- / \
-- rl rr
--
| (p, i) < (q, j) -> Bin i p x m (merge n ll lr) r
--
-- jq
-- / \
-- ip rl+rr
-- / \
-- ll lr
--
| otherwise -> Bin j q y m l (merge o rl rr)

delete :: TimerId -> Bucket a -> Maybe (Bucket a)
delete timerId =
fmap (\(_, _, bucket) -> bucket) . IntPSQ.deleteView timerId
data Pair a b
= Pair !a !b
Loading

0 comments on commit 06d1e2b

Please sign in to comment.