Skip to content

Commit

Permalink
use timewheel handle time afterFunc deal memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
DanPlayer committed Aug 5, 2022
1 parent 7b6b026 commit 5eb95e5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 134 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ module github.com/DanPlayer/timewatch

go 1.17

require github.com/go-redis/redis/v8 v8.11.5
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/rfyiamcool/go-timewheel v1.1.0
)

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/stretchr/testify v1.5.1 // indirect
)
106 changes: 0 additions & 106 deletions go.sum

This file was deleted.

67 changes: 40 additions & 27 deletions timewatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ package timewatch
import (
"encoding/json"
"errors"
"github.com/rfyiamcool/go-timewheel"
"time"
)

type TimeWatch struct {
key string // marked key
watch Watch // watched attributes
cache Cache // Redis and MemoryCache and more...
Timer map[string]*time.Timer // watch map key is Watch.Field
outTimeAct bool // out time to action
key string // marked key
watch Watch // watched attributes
cache Cache // Redis and MemoryCache and more...
Timer map[string]*timewheel.Timer // watch map key is Watch.Field
outTimeAct bool // out time to action
wheel *timewheel.TimeWheel // time wheel timer
}

type Options struct {
Key string // marked key
Cache Cache // Redis and MemoryCache and more...
OutTimeAct bool // out time to action
Key string // marked key
Cache Cache // Redis and MemoryCache and more...
OutTimeAct bool // out time to action
Tick time.Duration // time wheel scale, default 1 * time.Second
BucketsNum int // Time Roulette, default 360
}

type Watch struct {
Expand All @@ -31,10 +35,25 @@ func Service(options Options) *TimeWatch {
key: options.Key,
cache: options.Cache,
outTimeAct: options.OutTimeAct,
Timer: map[string]*time.Timer{},
Timer: map[string]*timewheel.Timer{},
wheel: newWheel(options.Tick, options.BucketsNum),
}
}

func newWheel(tick time.Duration, buckets int) *timewheel.TimeWheel {
if tick == 0 {
tick = 1 * time.Second
}
if buckets == 0 {
buckets = 360
}
tw, err := timewheel.NewTimeWheel(tick, buckets, timewheel.TickSafeMode())
if err != nil {
panic(err)
}
return tw
}

func (w *TimeWatch) Start() error {
locked, err := w.lock()
if err != nil {
Expand Down Expand Up @@ -88,7 +107,7 @@ func (w *TimeWatch) StartWithCheckRestart(fc func(c Watch)) error {

left := time.Duration(time.Now().Unix()-info.TouchOffUnix) * time.Second
if left > 0 {
time.AfterFunc(left, func() {
w.wheel.AfterFunc(left, func() {
fc(info)
})
} else {
Expand All @@ -100,7 +119,7 @@ func (w *TimeWatch) StartWithCheckRestart(fc func(c Watch)) error {
return nil
}

func (w *TimeWatch) AfterFunc(t time.Duration, c Watch, f func()) (r *time.Timer, err error) {
func (w *TimeWatch) AfterFunc(t time.Duration, c Watch, f func()) (r *timewheel.Timer, err error) {
if c.Field == "" {
return nil, errors.New("field is empty")
}
Expand All @@ -112,52 +131,46 @@ func (w *TimeWatch) AfterFunc(t time.Duration, c Watch, f func()) (r *time.Timer
if err != nil {
return
}
timer := time.AfterFunc(t, func() {
timer := w.wheel.AfterFunc(t, func() {
_ = w.cache.HDel(w.key, c.Field)
f()
})
w.Timer[c.Field] = timer
return timer, nil
}

func (w *TimeWatch) Stop(field string) bool {
func (w *TimeWatch) Stop(field string) {
timer, ok := w.Timer[field]
if !ok {
return false
return
}
_ = w.cache.HDel(w.key, field)
return timer.Stop()
timer.Stop()
}

func (w *TimeWatch) Reset(field string, d time.Duration) bool {
func (w *TimeWatch) Reset(field string, d time.Duration) {
timer, ok := w.Timer[field]
if !ok {
return false
return
}

get, err := w.cache.HGet(w.key, field)
if err != nil {
return false
return
}
var c Watch
err = json.Unmarshal([]byte(get), &c)
if err != nil {
return false
return
}
c.TouchOffUnix = time.Now().Unix() + int64(d.Seconds())
bytes, _ := json.Marshal(c)
err = w.cache.HSet(w.key, c.Field, string(bytes))
if err != nil {
return false
return
}

if !timer.Stop() {
select {
case <-timer.C: // try to drain the channel
default:
}
}
return timer.Reset(d)
timer.Reset(d)
}

const LockKey = "CheckLock"
Expand Down

0 comments on commit 5eb95e5

Please sign in to comment.