From baa708dba22cfae966af38c5290aab1bde5a1ef0 Mon Sep 17 00:00:00 2001 From: CharLemAznable <545541819@qq.com> Date: Tue, 19 Dec 2023 19:56:44 +0800 Subject: [PATCH] Upgrade event listeners, some refactor. --- bulkhead/bulkhead.go | 9 +- bulkhead/bulkhead_test.go | 4 +- bulkhead/decorate.go | 16 +-- bulkhead/event_listener.go | 96 +++++++++---- cache/cache.go | 2 - cache/cache_test.go | 4 +- cache/event_listener.go | 60 +++++--- circuitbreaker/circuitbreaker.go | 14 +- circuitbreaker/circuitbreaker_test.go | 24 ++-- circuitbreaker/decorate.go | 8 +- circuitbreaker/event_listener.go | 192 +++++++++++++++++++------- go.mod | 2 +- promhelper/circuitbreaker.go | 4 +- ratelimiter/decorate.go | 8 +- ratelimiter/event_listener.go | 64 ++++++--- ratelimiter/ratelimiter.go | 5 +- ratelimiter/ratelimiter_test.go | 4 +- retry/decorate.go | 8 +- retry/event_listener.go | 96 +++++++++---- retry/retry.go | 5 +- retry/retry_test.go | 8 +- timelimiter/decorate.go | 8 +- timelimiter/event_listener.go | 96 +++++++++---- timelimiter/timelimiter.go | 5 +- timelimiter/timelimiter_test.go | 4 +- 25 files changed, 498 insertions(+), 248 deletions(-) diff --git a/bulkhead/bulkhead.go b/bulkhead/bulkhead.go index f22c33e..e0dd277 100644 --- a/bulkhead/bulkhead.go +++ b/bulkhead/bulkhead.go @@ -10,9 +10,8 @@ type Bulkhead interface { Name() string Metrics() Metrics EventListener() EventListener - - acquire() error - release() + Acquire() error + Release() } func NewBulkhead(name string, configs ...ConfigBuilder) Bulkhead { @@ -51,7 +50,7 @@ func (bulkhead *semaphoreBulkhead) EventListener() EventListener { return bulkhead.eventListener } -func (bulkhead *semaphoreBulkhead) acquire() error { +func (bulkhead *semaphoreBulkhead) Acquire() error { permitted := func() bool { timeout, cancelFn := context.WithTimeout( bulkhead.rootContext, @@ -75,7 +74,7 @@ func (bulkhead *semaphoreBulkhead) acquire() error { return &FullError{name: bulkhead.name} } -func (bulkhead *semaphoreBulkhead) release() { +func (bulkhead *semaphoreBulkhead) Release() { bulkhead.semaphore.Release(1) bulkhead.metrics.release(1) bulkhead.eventListener.consumeEvent(newFinishedEvent(bulkhead.name)) diff --git a/bulkhead/bulkhead_test.go b/bulkhead/bulkhead_test.go index 5bf8b10..306e730 100644 --- a/bulkhead/bulkhead_test.go +++ b/bulkhead/bulkhead_test.go @@ -51,7 +51,7 @@ func TestBulkheadPublishEvents(t *testing.T) { } finished.Add(1) } - eventListener.OnPermitted(onPermitted).OnRejected(onRejected).OnFinished(onFinished) + eventListener.OnPermittedFunc(onPermitted).OnRejectedFunc(onRejected).OnFinishedFunc(onFinished) // 创建一个可运行的函数 fn := func() error { @@ -86,5 +86,5 @@ func TestBulkheadPublishEvents(t *testing.T) { if finished.Load() != 1 { t.Errorf("Expected 1 finished call, but got '%d'", finished.Load()) } - eventListener.Dismiss(onPermitted).Dismiss(onRejected).Dismiss(onFinished) + eventListener.DismissPermittedFunc(onPermitted).DismissRejectedFunc(onRejected).DismissFinishedFunc(onFinished) } diff --git a/bulkhead/decorate.go b/bulkhead/decorate.go index 2116c62..35086b7 100644 --- a/bulkhead/decorate.go +++ b/bulkhead/decorate.go @@ -4,40 +4,40 @@ import "github.com/CharLemAznable/ge" func DecorateRunnable(bulkhead Bulkhead, fn func() error) func() error { return func() error { - if err := bulkhead.acquire(); err != nil { + if err := bulkhead.Acquire(); err != nil { return err } - defer bulkhead.release() + defer bulkhead.Release() return fn() } } func DecorateSupplier[T any](bulkhead Bulkhead, fn func() (T, error)) func() (T, error) { return func() (T, error) { - if err := bulkhead.acquire(); err != nil { + if err := bulkhead.Acquire(); err != nil { return ge.Zero[T](), err } - defer bulkhead.release() + defer bulkhead.Release() return fn() } } func DecorateConsumer[T any](bulkhead Bulkhead, fn func(T) error) func(T) error { return func(t T) error { - if err := bulkhead.acquire(); err != nil { + if err := bulkhead.Acquire(); err != nil { return err } - defer bulkhead.release() + defer bulkhead.Release() return fn(t) } } func DecorateFunction[T any, R any](bulkhead Bulkhead, fn func(T) (R, error)) func(T) (R, error) { return func(t T) (R, error) { - if err := bulkhead.acquire(); err != nil { + if err := bulkhead.Acquire(); err != nil { return ge.Zero[R](), err } - defer bulkhead.release() + defer bulkhead.Release() return fn(t) } } diff --git a/bulkhead/event_listener.go b/bulkhead/event_listener.go index aba12ac..9ac8906 100644 --- a/bulkhead/event_listener.go +++ b/bulkhead/event_listener.go @@ -6,59 +6,99 @@ import ( ) type EventListener interface { - OnPermitted(func(PermittedEvent)) EventListener - OnRejected(func(RejectedEvent)) EventListener - OnFinished(func(FinishedEvent)) EventListener - Dismiss(any) EventListener + OnPermittedFunc(func(PermittedEvent)) EventListener + OnRejectedFunc(func(RejectedEvent)) EventListener + OnFinishedFunc(func(FinishedEvent)) EventListener + DismissPermittedFunc(func(PermittedEvent)) EventListener + DismissRejectedFunc(func(RejectedEvent)) EventListener + DismissFinishedFunc(func(FinishedEvent)) EventListener + + OnPermitted(ge.Action[PermittedEvent]) EventListener + OnRejected(ge.Action[RejectedEvent]) EventListener + OnFinished(ge.Action[FinishedEvent]) EventListener + DismissPermitted(ge.Action[PermittedEvent]) EventListener + DismissRejected(ge.Action[RejectedEvent]) EventListener + DismissFinished(ge.Action[FinishedEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onPermitted: make([]func(PermittedEvent), 0), - onRejected: make([]func(RejectedEvent), 0), - onFinished: make([]func(FinishedEvent), 0), + onPermitted: make([]ge.Action[PermittedEvent], 0), + onRejected: make([]ge.Action[RejectedEvent], 0), + onFinished: make([]ge.Action[FinishedEvent], 0), } } type eventListener struct { sync.RWMutex - onPermitted []func(PermittedEvent) - onRejected []func(RejectedEvent) - onFinished []func(FinishedEvent) + onPermitted []ge.Action[PermittedEvent] + onRejected []ge.Action[RejectedEvent] + onFinished []ge.Action[FinishedEvent] +} + +func (listener *eventListener) OnPermittedFunc(consumer func(PermittedEvent)) EventListener { + return listener.OnPermitted(ge.ActionFunc[PermittedEvent](consumer)) +} + +func (listener *eventListener) OnRejectedFunc(consumer func(RejectedEvent)) EventListener { + return listener.OnRejected(ge.ActionFunc[RejectedEvent](consumer)) +} + +func (listener *eventListener) OnFinishedFunc(consumer func(FinishedEvent)) EventListener { + return listener.OnFinished(ge.ActionFunc[FinishedEvent](consumer)) +} + +func (listener *eventListener) DismissPermittedFunc(consumer func(PermittedEvent)) EventListener { + return listener.DismissPermitted(ge.ActionFunc[PermittedEvent](consumer)) +} + +func (listener *eventListener) DismissRejectedFunc(consumer func(RejectedEvent)) EventListener { + return listener.DismissRejected(ge.ActionFunc[RejectedEvent](consumer)) +} + +func (listener *eventListener) DismissFinishedFunc(consumer func(FinishedEvent)) EventListener { + return listener.DismissFinished(ge.ActionFunc[FinishedEvent](consumer)) } -func (listener *eventListener) OnPermitted(consumer func(PermittedEvent)) EventListener { +func (listener *eventListener) OnPermitted(action ge.Action[PermittedEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onPermitted = ge.AppendElementUnique(listener.onPermitted, consumer) + listener.onPermitted = ge.AppendElementUnique(listener.onPermitted, action) return listener } -func (listener *eventListener) OnRejected(consumer func(RejectedEvent)) EventListener { +func (listener *eventListener) OnRejected(action ge.Action[RejectedEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onRejected = ge.AppendElementUnique(listener.onRejected, consumer) + listener.onRejected = ge.AppendElementUnique(listener.onRejected, action) return listener } -func (listener *eventListener) OnFinished(consumer func(FinishedEvent)) EventListener { +func (listener *eventListener) OnFinished(action ge.Action[FinishedEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onFinished = ge.AppendElementUnique(listener.onFinished, consumer) + listener.onFinished = ge.AppendElementUnique(listener.onFinished, action) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissPermitted(action ge.Action[PermittedEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(PermittedEvent): - listener.onPermitted = ge.RemoveElementByValue(listener.onPermitted, c) - case func(RejectedEvent): - listener.onRejected = ge.RemoveElementByValue(listener.onRejected, c) - case func(FinishedEvent): - listener.onFinished = ge.RemoveElementByValue(listener.onFinished, c) - } + listener.onPermitted = ge.RemoveElementByValue(listener.onPermitted, action) + return listener +} + +func (listener *eventListener) DismissRejected(action ge.Action[RejectedEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onRejected = ge.RemoveElementByValue(listener.onRejected, action) + return listener +} + +func (listener *eventListener) DismissFinished(action ge.Action[FinishedEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onFinished = ge.RemoveElementByValue(listener.onFinished, action) return listener } @@ -68,11 +108,11 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *permittedEvent: - ge.ConsumeEach(listener.onPermitted, PermittedEvent(e)) + ge.ForEach(listener.onPermitted, PermittedEvent(e)) case *rejectedEvent: - ge.ConsumeEach(listener.onRejected, RejectedEvent(e)) + ge.ForEach(listener.onRejected, RejectedEvent(e)) case *finishedEvent: - ge.ConsumeEach(listener.onFinished, FinishedEvent(e)) + ge.ForEach(listener.onFinished, FinishedEvent(e)) } }() } diff --git a/cache/cache.go b/cache/cache.go index 66dac6f..167551e 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -10,9 +10,7 @@ type Cache[K any, V any] interface { Name() string Metrics() Metrics EventListener() EventListener - WithMarshalFn(func(V) any, func(any) V) Cache[K, V] - GetOrLoad(key K, loader func(K) (V, error)) (V, error) } diff --git a/cache/cache_test.go b/cache/cache_test.go index 940856d..0515aab 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -46,7 +46,7 @@ func TestCache(t *testing.T) { } misses.Add(1) } - eventListener.OnCacheHit(onCacheHit).OnCacheMiss(onCacheMiss) + eventListener.OnCacheHitFunc(onCacheHit).OnCacheMissFunc(onCacheMiss) // fail with no error, max retries exceeded fn := func(key string) (string, error) { @@ -92,5 +92,5 @@ func TestCache(t *testing.T) { if misses.Load() != 2 { t.Errorf("Expected 2 miss calls, but got '%d'", misses.Load()) } - eventListener.Dismiss(onCacheHit).Dismiss(onCacheMiss) + eventListener.DismissCacheHitFunc(onCacheHit).DismissCacheMissFunc(onCacheMiss) } diff --git a/cache/event_listener.go b/cache/event_listener.go index a1babaf..53de9af 100644 --- a/cache/event_listener.go +++ b/cache/event_listener.go @@ -6,47 +6,71 @@ import ( ) type EventListener interface { - OnCacheHit(func(HitEvent)) EventListener - OnCacheMiss(func(MissEvent)) EventListener - Dismiss(any) EventListener + OnCacheHitFunc(func(HitEvent)) EventListener + OnCacheMissFunc(func(MissEvent)) EventListener + DismissCacheHitFunc(func(HitEvent)) EventListener + DismissCacheMissFunc(func(MissEvent)) EventListener + + OnCacheHit(ge.Action[HitEvent]) EventListener + OnCacheMiss(ge.Action[MissEvent]) EventListener + DismissCacheHit(ge.Action[HitEvent]) EventListener + DismissCacheMiss(ge.Action[MissEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onCacheHit: make([]func(HitEvent), 0), - onCacheMiss: make([]func(MissEvent), 0), + onCacheHit: make([]ge.Action[HitEvent], 0), + onCacheMiss: make([]ge.Action[MissEvent], 0), } } type eventListener struct { sync.RWMutex - onCacheHit []func(HitEvent) - onCacheMiss []func(MissEvent) + onCacheHit []ge.Action[HitEvent] + onCacheMiss []ge.Action[MissEvent] +} + +func (listener *eventListener) OnCacheHitFunc(consumer func(HitEvent)) EventListener { + return listener.OnCacheHit(ge.ActionFunc[HitEvent](consumer)) +} + +func (listener *eventListener) OnCacheMissFunc(consumer func(MissEvent)) EventListener { + return listener.OnCacheMiss(ge.ActionFunc[MissEvent](consumer)) +} + +func (listener *eventListener) DismissCacheHitFunc(consumer func(HitEvent)) EventListener { + return listener.DismissCacheHit(ge.ActionFunc[HitEvent](consumer)) } -func (listener *eventListener) OnCacheHit(consumer func(HitEvent)) EventListener { +func (listener *eventListener) DismissCacheMissFunc(consumer func(MissEvent)) EventListener { + return listener.DismissCacheMiss(ge.ActionFunc[MissEvent](consumer)) +} + +func (listener *eventListener) OnCacheHit(consumer ge.Action[HitEvent]) EventListener { listener.Lock() defer listener.Unlock() listener.onCacheHit = ge.AppendElementUnique(listener.onCacheHit, consumer) return listener } -func (listener *eventListener) OnCacheMiss(consumer func(MissEvent)) EventListener { +func (listener *eventListener) OnCacheMiss(consumer ge.Action[MissEvent]) EventListener { listener.Lock() defer listener.Unlock() listener.onCacheMiss = ge.AppendElementUnique(listener.onCacheMiss, consumer) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissCacheHit(consumer ge.Action[HitEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(HitEvent): - listener.onCacheHit = ge.RemoveElementByValue(listener.onCacheHit, c) - case func(MissEvent): - listener.onCacheMiss = ge.RemoveElementByValue(listener.onCacheMiss, c) - } + listener.onCacheHit = ge.RemoveElementByValue(listener.onCacheHit, consumer) + return listener +} + +func (listener *eventListener) DismissCacheMiss(consumer ge.Action[MissEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onCacheMiss = ge.RemoveElementByValue(listener.onCacheMiss, consumer) return listener } @@ -56,9 +80,9 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *hitEvent: - ge.ConsumeEach(listener.onCacheHit, HitEvent(e)) + ge.ForEach(listener.onCacheHit, HitEvent(e)) case *missEvent: - ge.ConsumeEach(listener.onCacheMiss, MissEvent(e)) + ge.ForEach(listener.onCacheMiss, MissEvent(e)) } }() } diff --git a/circuitbreaker/circuitbreaker.go b/circuitbreaker/circuitbreaker.go index 662e8d9..8c2ff8a 100644 --- a/circuitbreaker/circuitbreaker.go +++ b/circuitbreaker/circuitbreaker.go @@ -17,11 +17,11 @@ type CircuitBreaker interface { TransitionToClosedState() error TransitionToOpenState() error TransitionToHalfOpenState() error + Execute(func() (any, error)) (any, error) - config() *Config - execute(func() (any, error)) (any, error) acquirePermission() error publishThresholdsExceededEvent(metricsResult, Metrics) + config() *Config } func NewCircuitBreaker(name string, configs ...ConfigBuilder) CircuitBreaker { @@ -108,11 +108,7 @@ func (machine *stateMachine) stateTransition(newStateName State, generator func( return err } -func (machine *stateMachine) config() *Config { - return machine.conf -} - -func (machine *stateMachine) execute(fn func() (any, error)) (any, error) { +func (machine *stateMachine) Execute(fn func() (any, error)) (any, error) { if err := machine.acquirePermission(); err != nil { machine.publishEvent(newNotPermittedEvent(machine.name)) return nil, err @@ -166,6 +162,10 @@ func (machine *stateMachine) publishThresholdsExceededEvent(result metricsResult } } +func (machine *stateMachine) config() *Config { + return machine.conf +} + func (machine *stateMachine) loadState() *state { return machine.state.Load() } diff --git a/circuitbreaker/circuitbreaker_test.go b/circuitbreaker/circuitbreaker_test.go index 938c79c..ce253c9 100644 --- a/circuitbreaker/circuitbreaker_test.go +++ b/circuitbreaker/circuitbreaker_test.go @@ -77,9 +77,9 @@ func TestCircuitBreaker(t *testing.T) { onSlowCallRateExceeded := func(event circuitbreaker.SlowCallRateExceededEvent) { t.Error("should not listen slow call rate exceeded event") } - listener.OnSuccess(onSuccess).OnError(onError). - OnNotPermitted(onNotPermitted).OnStateTransition(onStateTransition). - OnFailureRateExceeded(onFailureRateExceeded).OnSlowCallRateExceeded(onSlowCallRateExceeded) + listener.OnSuccessFunc(onSuccess).OnErrorFunc(onError). + OnNotPermittedFunc(onNotPermitted).OnStateTransitionFunc(onStateTransition). + OnFailureRateExceededFunc(onFailureRateExceeded).OnSlowCallRateExceededFunc(onSlowCallRateExceeded) // 创建一个可运行的函数 var count atomic.Int64 @@ -207,9 +207,9 @@ func TestCircuitBreaker(t *testing.T) { } time.Sleep(time.Second) - listener.Dismiss(onSuccess).Dismiss(onError). - Dismiss(onNotPermitted).Dismiss(onStateTransition). - Dismiss(onFailureRateExceeded).Dismiss(onSlowCallRateExceeded) + listener.DismissSuccessFunc(onSuccess).DismissErrorFunc(onError). + DismissNotPermittedFunc(onNotPermitted).DismissStateTransitionFunc(onStateTransition). + DismissFailureRateExceededFunc(onFailureRateExceeded).DismissSlowCallRateExceededFunc(onSlowCallRateExceeded) } func TestCircuitBreakerSlow(t *testing.T) { @@ -266,9 +266,9 @@ func TestCircuitBreakerSlow(t *testing.T) { t.Errorf("Expected event message '%s', but got '%s'", expectedMsg, event) } } - listener.OnSuccess(onSuccess).OnError(onError). - OnNotPermitted(onNotPermitted).OnStateTransition(onStateTransition). - OnFailureRateExceeded(onFailureRateExceeded).OnSlowCallRateExceeded(onSlowCallRateExceeded) + listener.OnSuccessFunc(onSuccess).OnErrorFunc(onError). + OnNotPermittedFunc(onNotPermitted).OnStateTransitionFunc(onStateTransition). + OnFailureRateExceededFunc(onFailureRateExceeded).OnSlowCallRateExceededFunc(onSlowCallRateExceeded) // 创建一个可运行的函数 fn := func(str string) (string, error) { @@ -371,9 +371,9 @@ func TestCircuitBreakerSlow(t *testing.T) { } time.Sleep(time.Second) - listener.Dismiss(onSuccess).Dismiss(onError). - Dismiss(onNotPermitted).Dismiss(onStateTransition). - Dismiss(onFailureRateExceeded).Dismiss(onSlowCallRateExceeded) + listener.DismissSuccessFunc(onSuccess).DismissErrorFunc(onError). + DismissNotPermittedFunc(onNotPermitted).DismissStateTransitionFunc(onStateTransition). + DismissFailureRateExceededFunc(onFailureRateExceeded).DismissSlowCallRateExceededFunc(onSlowCallRateExceeded) } func TestCircuitBreakerHalfOpenError(t *testing.T) { diff --git a/circuitbreaker/decorate.go b/circuitbreaker/decorate.go index 58aa904..8ef1f73 100644 --- a/circuitbreaker/decorate.go +++ b/circuitbreaker/decorate.go @@ -4,7 +4,7 @@ import "github.com/CharLemAznable/ge" func DecorateRunnable(breaker CircuitBreaker, fn func() error) func() error { return func() error { - _, err := breaker.execute(func() (any, error) { + _, err := breaker.Execute(func() (any, error) { return nil, fn() }) return err @@ -13,7 +13,7 @@ func DecorateRunnable(breaker CircuitBreaker, fn func() error) func() error { func DecorateSupplier[T any](breaker CircuitBreaker, fn func() (T, error)) func() (T, error) { return func() (T, error) { - ret, err := breaker.execute(func() (any, error) { + ret, err := breaker.Execute(func() (any, error) { return fn() }) return ge.CastQuietly[T](ret), err @@ -22,7 +22,7 @@ func DecorateSupplier[T any](breaker CircuitBreaker, fn func() (T, error)) func( func DecorateConsumer[T any](breaker CircuitBreaker, fn func(T) error) func(T) error { return func(t T) error { - _, err := breaker.execute(func() (any, error) { + _, err := breaker.Execute(func() (any, error) { return nil, fn(t) }) return err @@ -31,7 +31,7 @@ func DecorateConsumer[T any](breaker CircuitBreaker, fn func(T) error) func(T) e func DecorateFunction[T any, R any](breaker CircuitBreaker, fn func(T) (R, error)) func(T) (R, error) { return func(t T) (R, error) { - ret, err := breaker.execute(func() (any, error) { + ret, err := breaker.Execute(func() (any, error) { return fn(t) }) return ge.CastQuietly[R](ret), err diff --git a/circuitbreaker/event_listener.go b/circuitbreaker/event_listener.go index f5c2777..57583d2 100644 --- a/circuitbreaker/event_listener.go +++ b/circuitbreaker/event_listener.go @@ -6,95 +6,183 @@ import ( ) type EventListener interface { - OnSuccess(func(SuccessEvent)) EventListener - OnError(func(ErrorEvent)) EventListener - OnNotPermitted(func(NotPermittedEvent)) EventListener - OnStateTransition(func(StateTransitionEvent)) EventListener - OnFailureRateExceeded(func(FailureRateExceededEvent)) EventListener - OnSlowCallRateExceeded(func(SlowCallRateExceededEvent)) EventListener - Dismiss(any) EventListener + OnSuccessFunc(func(SuccessEvent)) EventListener + OnErrorFunc(func(ErrorEvent)) EventListener + OnNotPermittedFunc(func(NotPermittedEvent)) EventListener + OnStateTransitionFunc(func(StateTransitionEvent)) EventListener + OnFailureRateExceededFunc(func(FailureRateExceededEvent)) EventListener + OnSlowCallRateExceededFunc(func(SlowCallRateExceededEvent)) EventListener + DismissSuccessFunc(func(SuccessEvent)) EventListener + DismissErrorFunc(func(ErrorEvent)) EventListener + DismissNotPermittedFunc(func(NotPermittedEvent)) EventListener + DismissStateTransitionFunc(func(StateTransitionEvent)) EventListener + DismissFailureRateExceededFunc(func(FailureRateExceededEvent)) EventListener + DismissSlowCallRateExceededFunc(func(SlowCallRateExceededEvent)) EventListener + + OnSuccess(ge.Action[SuccessEvent]) EventListener + OnError(ge.Action[ErrorEvent]) EventListener + OnNotPermitted(ge.Action[NotPermittedEvent]) EventListener + OnStateTransition(ge.Action[StateTransitionEvent]) EventListener + OnFailureRateExceeded(ge.Action[FailureRateExceededEvent]) EventListener + OnSlowCallRateExceeded(ge.Action[SlowCallRateExceededEvent]) EventListener + DismissSuccess(ge.Action[SuccessEvent]) EventListener + DismissError(ge.Action[ErrorEvent]) EventListener + DismissNotPermitted(ge.Action[NotPermittedEvent]) EventListener + DismissStateTransition(ge.Action[StateTransitionEvent]) EventListener + DismissFailureRateExceeded(ge.Action[FailureRateExceededEvent]) EventListener + DismissSlowCallRateExceeded(ge.Action[SlowCallRateExceededEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onSuccess: make([]func(SuccessEvent), 0), - onError: make([]func(ErrorEvent), 0), - onNotPermitted: make([]func(NotPermittedEvent), 0), - onStateTransition: make([]func(StateTransitionEvent), 0), - onFailureRateExceeded: make([]func(FailureRateExceededEvent), 0), - onSlowCallRateExceeded: make([]func(SlowCallRateExceededEvent), 0), + onSuccess: make([]ge.Action[SuccessEvent], 0), + onError: make([]ge.Action[ErrorEvent], 0), + onNotPermitted: make([]ge.Action[NotPermittedEvent], 0), + onStateTransition: make([]ge.Action[StateTransitionEvent], 0), + onFailureRateExceeded: make([]ge.Action[FailureRateExceededEvent], 0), + onSlowCallRateExceeded: make([]ge.Action[SlowCallRateExceededEvent], 0), } } type eventListener struct { sync.RWMutex - onSuccess []func(SuccessEvent) - onError []func(ErrorEvent) - onNotPermitted []func(NotPermittedEvent) - onStateTransition []func(StateTransitionEvent) - onFailureRateExceeded []func(FailureRateExceededEvent) - onSlowCallRateExceeded []func(SlowCallRateExceededEvent) + onSuccess []ge.Action[SuccessEvent] + onError []ge.Action[ErrorEvent] + onNotPermitted []ge.Action[NotPermittedEvent] + onStateTransition []ge.Action[StateTransitionEvent] + onFailureRateExceeded []ge.Action[FailureRateExceededEvent] + onSlowCallRateExceeded []ge.Action[SlowCallRateExceededEvent] +} + +func (listener *eventListener) OnSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.OnSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) OnErrorFunc(consumer func(ErrorEvent)) EventListener { + return listener.OnError(ge.ActionFunc[ErrorEvent](consumer)) +} + +func (listener *eventListener) OnNotPermittedFunc(consumer func(NotPermittedEvent)) EventListener { + return listener.OnNotPermitted(ge.ActionFunc[NotPermittedEvent](consumer)) +} + +func (listener *eventListener) OnStateTransitionFunc(consumer func(StateTransitionEvent)) EventListener { + return listener.OnStateTransition(ge.ActionFunc[StateTransitionEvent](consumer)) +} + +func (listener *eventListener) OnFailureRateExceededFunc(consumer func(FailureRateExceededEvent)) EventListener { + return listener.OnFailureRateExceeded(ge.ActionFunc[FailureRateExceededEvent](consumer)) +} + +func (listener *eventListener) OnSlowCallRateExceededFunc(consumer func(SlowCallRateExceededEvent)) EventListener { + return listener.OnSlowCallRateExceeded(ge.ActionFunc[SlowCallRateExceededEvent](consumer)) } -func (listener *eventListener) OnSuccess(consumer func(SuccessEvent)) EventListener { +func (listener *eventListener) DismissSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.DismissSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) DismissErrorFunc(consumer func(ErrorEvent)) EventListener { + return listener.DismissError(ge.ActionFunc[ErrorEvent](consumer)) +} + +func (listener *eventListener) DismissNotPermittedFunc(consumer func(NotPermittedEvent)) EventListener { + return listener.DismissNotPermitted(ge.ActionFunc[NotPermittedEvent](consumer)) +} + +func (listener *eventListener) DismissStateTransitionFunc(consumer func(StateTransitionEvent)) EventListener { + return listener.DismissStateTransition(ge.ActionFunc[StateTransitionEvent](consumer)) +} + +func (listener *eventListener) DismissFailureRateExceededFunc(consumer func(FailureRateExceededEvent)) EventListener { + return listener.DismissFailureRateExceeded(ge.ActionFunc[FailureRateExceededEvent](consumer)) +} + +func (listener *eventListener) DismissSlowCallRateExceededFunc(consumer func(SlowCallRateExceededEvent)) EventListener { + return listener.DismissSlowCallRateExceeded(ge.ActionFunc[SlowCallRateExceededEvent](consumer)) +} + +func (listener *eventListener) OnSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, consumer) + listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, action) return listener } -func (listener *eventListener) OnError(consumer func(ErrorEvent)) EventListener { +func (listener *eventListener) OnError(action ge.Action[ErrorEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onError = ge.AppendElementUnique(listener.onError, consumer) + listener.onError = ge.AppendElementUnique(listener.onError, action) return listener } -func (listener *eventListener) OnNotPermitted(consumer func(NotPermittedEvent)) EventListener { +func (listener *eventListener) OnNotPermitted(action ge.Action[NotPermittedEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onNotPermitted = ge.AppendElementUnique(listener.onNotPermitted, consumer) + listener.onNotPermitted = ge.AppendElementUnique(listener.onNotPermitted, action) return listener } -func (listener *eventListener) OnStateTransition(consumer func(StateTransitionEvent)) EventListener { +func (listener *eventListener) OnStateTransition(action ge.Action[StateTransitionEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onStateTransition = ge.AppendElementUnique(listener.onStateTransition, consumer) + listener.onStateTransition = ge.AppendElementUnique(listener.onStateTransition, action) return listener } -func (listener *eventListener) OnFailureRateExceeded(consumer func(FailureRateExceededEvent)) EventListener { +func (listener *eventListener) OnFailureRateExceeded(action ge.Action[FailureRateExceededEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onFailureRateExceeded = ge.AppendElementUnique(listener.onFailureRateExceeded, consumer) + listener.onFailureRateExceeded = ge.AppendElementUnique(listener.onFailureRateExceeded, action) return listener } -func (listener *eventListener) OnSlowCallRateExceeded(consumer func(SlowCallRateExceededEvent)) EventListener { +func (listener *eventListener) OnSlowCallRateExceeded(action ge.Action[SlowCallRateExceededEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onSlowCallRateExceeded = ge.AppendElementUnique(listener.onSlowCallRateExceeded, consumer) + listener.onSlowCallRateExceeded = ge.AppendElementUnique(listener.onSlowCallRateExceeded, action) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(SuccessEvent): - listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, c) - case func(ErrorEvent): - listener.onError = ge.RemoveElementByValue(listener.onError, c) - case func(NotPermittedEvent): - listener.onNotPermitted = ge.RemoveElementByValue(listener.onNotPermitted, c) - case func(StateTransitionEvent): - listener.onStateTransition = ge.RemoveElementByValue(listener.onStateTransition, c) - case func(FailureRateExceededEvent): - listener.onFailureRateExceeded = ge.RemoveElementByValue(listener.onFailureRateExceeded, c) - case func(SlowCallRateExceededEvent): - listener.onSlowCallRateExceeded = ge.RemoveElementByValue(listener.onSlowCallRateExceeded, c) - } + listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, action) + return listener +} + +func (listener *eventListener) DismissError(action ge.Action[ErrorEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onError = ge.RemoveElementByValue(listener.onError, action) + return listener +} + +func (listener *eventListener) DismissNotPermitted(action ge.Action[NotPermittedEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onNotPermitted = ge.RemoveElementByValue(listener.onNotPermitted, action) + return listener +} + +func (listener *eventListener) DismissStateTransition(action ge.Action[StateTransitionEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onStateTransition = ge.RemoveElementByValue(listener.onStateTransition, action) + return listener +} + +func (listener *eventListener) DismissFailureRateExceeded(action ge.Action[FailureRateExceededEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onFailureRateExceeded = ge.RemoveElementByValue(listener.onFailureRateExceeded, action) + return listener +} + +func (listener *eventListener) DismissSlowCallRateExceeded(action ge.Action[SlowCallRateExceededEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onSlowCallRateExceeded = ge.RemoveElementByValue(listener.onSlowCallRateExceeded, action) return listener } @@ -104,17 +192,17 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *successEvent: - ge.ConsumeEach(listener.onSuccess, SuccessEvent(e)) + ge.ForEach(listener.onSuccess, SuccessEvent(e)) case *errorEvent: - ge.ConsumeEach(listener.onError, ErrorEvent(e)) + ge.ForEach(listener.onError, ErrorEvent(e)) case *notPermittedEvent: - ge.ConsumeEach(listener.onNotPermitted, NotPermittedEvent(e)) + ge.ForEach(listener.onNotPermitted, NotPermittedEvent(e)) case *stateTransitionEvent: - ge.ConsumeEach(listener.onStateTransition, StateTransitionEvent(e)) + ge.ForEach(listener.onStateTransition, StateTransitionEvent(e)) case *failureRateExceededEvent: - ge.ConsumeEach(listener.onFailureRateExceeded, FailureRateExceededEvent(e)) + ge.ForEach(listener.onFailureRateExceeded, FailureRateExceededEvent(e)) case *slowCallRateExceededEvent: - ge.ConsumeEach(listener.onSlowCallRateExceeded, SlowCallRateExceededEvent(e)) + ge.ForEach(listener.onSlowCallRateExceeded, SlowCallRateExceededEvent(e)) } }() } diff --git a/go.mod b/go.mod index 4275151..9953ab7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/CharLemAznable/resilience4go go 1.20 require ( - github.com/CharLemAznable/ge v0.1.0 + github.com/CharLemAznable/ge v0.2.0 github.com/dgraph-io/ristretto v0.1.1 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 diff --git a/promhelper/circuitbreaker.go b/promhelper/circuitbreaker.go index 54c2304..49ce17d 100644 --- a/promhelper/circuitbreaker.go +++ b/promhelper/circuitbreaker.go @@ -21,10 +21,10 @@ func CircuitBreakerRegistry(entry circuitbreaker.CircuitBreaker, histogramBucket collectors = append(collectors, notPermittedCallsCounter(entry)) registerFn, unregisterFn := buildRegisterFn(collectors...), buildUnregisterFn(collectors...) return func(registerer prometheus.Registerer) error { - entry.EventListener().OnSuccess(onSuccess).OnError(onError) + entry.EventListener().OnSuccessFunc(onSuccess).OnErrorFunc(onError) return registerFn(registerer) }, func(registerer prometheus.Registerer) bool { - entry.EventListener().Dismiss(onSuccess).Dismiss(onError) + entry.EventListener().DismissSuccessFunc(onSuccess).DismissErrorFunc(onError) return unregisterFn(registerer) } } diff --git a/ratelimiter/decorate.go b/ratelimiter/decorate.go index eaa75e2..398ca0a 100644 --- a/ratelimiter/decorate.go +++ b/ratelimiter/decorate.go @@ -4,7 +4,7 @@ import "github.com/CharLemAznable/ge" func DecorateRunnable(limiter RateLimiter, fn func() error) func() error { return func() error { - if err := limiter.acquirePermission(); err != nil { + if err := limiter.AcquirePermission(); err != nil { return err } return fn() @@ -13,7 +13,7 @@ func DecorateRunnable(limiter RateLimiter, fn func() error) func() error { func DecorateSupplier[T any](limiter RateLimiter, fn func() (T, error)) func() (T, error) { return func() (T, error) { - if err := limiter.acquirePermission(); err != nil { + if err := limiter.AcquirePermission(); err != nil { return ge.Zero[T](), err } return fn() @@ -22,7 +22,7 @@ func DecorateSupplier[T any](limiter RateLimiter, fn func() (T, error)) func() ( func DecorateConsumer[T any](limiter RateLimiter, fn func(T) error) func(T) error { return func(t T) error { - if err := limiter.acquirePermission(); err != nil { + if err := limiter.AcquirePermission(); err != nil { return err } return fn(t) @@ -31,7 +31,7 @@ func DecorateConsumer[T any](limiter RateLimiter, fn func(T) error) func(T) erro func DecorateFunction[T any, R any](limiter RateLimiter, fn func(T) (R, error)) func(T) (R, error) { return func(t T) (R, error) { - if err := limiter.acquirePermission(); err != nil { + if err := limiter.AcquirePermission(); err != nil { return ge.Zero[R](), err } return fn(t) diff --git a/ratelimiter/event_listener.go b/ratelimiter/event_listener.go index 38bb53d..95af91f 100644 --- a/ratelimiter/event_listener.go +++ b/ratelimiter/event_listener.go @@ -6,47 +6,71 @@ import ( ) type EventListener interface { - OnSuccess(func(SuccessEvent)) EventListener - OnFailure(func(FailureEvent)) EventListener - Dismiss(any) EventListener + OnSuccessFunc(func(SuccessEvent)) EventListener + OnFailureFunc(func(FailureEvent)) EventListener + DismissSuccessFunc(func(SuccessEvent)) EventListener + DismissFailureFunc(func(FailureEvent)) EventListener + + OnSuccess(ge.Action[SuccessEvent]) EventListener + OnFailure(ge.Action[FailureEvent]) EventListener + DismissSuccess(ge.Action[SuccessEvent]) EventListener + DismissFailure(ge.Action[FailureEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onSuccess: make([]func(SuccessEvent), 0), - onFailure: make([]func(FailureEvent), 0), + onSuccess: make([]ge.Action[SuccessEvent], 0), + onFailure: make([]ge.Action[FailureEvent], 0), } } type eventListener struct { sync.RWMutex - onSuccess []func(SuccessEvent) - onFailure []func(FailureEvent) + onSuccess []ge.Action[SuccessEvent] + onFailure []ge.Action[FailureEvent] +} + +func (listener *eventListener) OnSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.OnSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) OnFailureFunc(consumer func(FailureEvent)) EventListener { + return listener.OnFailure(ge.ActionFunc[FailureEvent](consumer)) +} + +func (listener *eventListener) DismissSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.DismissSuccess(ge.ActionFunc[SuccessEvent](consumer)) } -func (listener *eventListener) OnSuccess(consumer func(SuccessEvent)) EventListener { +func (listener *eventListener) DismissFailureFunc(consumer func(FailureEvent)) EventListener { + return listener.DismissFailure(ge.ActionFunc[FailureEvent](consumer)) +} + +func (listener *eventListener) OnSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, consumer) + listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, action) return listener } -func (listener *eventListener) OnFailure(consumer func(FailureEvent)) EventListener { +func (listener *eventListener) OnFailure(action ge.Action[FailureEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onFailure = ge.AppendElementUnique(listener.onFailure, consumer) + listener.onFailure = ge.AppendElementUnique(listener.onFailure, action) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(SuccessEvent): - listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, c) - case func(FailureEvent): - listener.onFailure = ge.RemoveElementByValue(listener.onFailure, c) - } + listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, action) + return listener +} + +func (listener *eventListener) DismissFailure(action ge.Action[FailureEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onFailure = ge.RemoveElementByValue(listener.onFailure, action) return listener } @@ -56,9 +80,9 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *successEvent: - ge.ConsumeEach(listener.onSuccess, SuccessEvent(e)) + ge.ForEach(listener.onSuccess, SuccessEvent(e)) case *failureEvent: - ge.ConsumeEach(listener.onFailure, FailureEvent(e)) + ge.ForEach(listener.onFailure, FailureEvent(e)) } }() } diff --git a/ratelimiter/ratelimiter.go b/ratelimiter/ratelimiter.go index b127ee2..d1ed154 100644 --- a/ratelimiter/ratelimiter.go +++ b/ratelimiter/ratelimiter.go @@ -11,8 +11,7 @@ type RateLimiter interface { Name() string Metrics() Metrics EventListener() EventListener - - acquirePermission() error + AcquirePermission() error } func NewRateLimiter(name string, configs ...ConfigBuilder) RateLimiter { @@ -65,7 +64,7 @@ func (limiter *atomicRateLimiter) EventListener() EventListener { return limiter.eventListener } -func (limiter *atomicRateLimiter) acquirePermission() error { +func (limiter *atomicRateLimiter) AcquirePermission() error { timeoutInNanos := limiter.config.timeoutDuration.Nanoseconds() modifiedState := limiter.updateStateWithBackOff(timeoutInNanos) if limiter.waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait) { diff --git a/ratelimiter/ratelimiter_test.go b/ratelimiter/ratelimiter_test.go index 8088dc7..e210a3b 100644 --- a/ratelimiter/ratelimiter_test.go +++ b/ratelimiter/ratelimiter_test.go @@ -47,7 +47,7 @@ func TestRateLimiterPublishEvents(t *testing.T) { } failure.Add(1) } - eventListener.OnSuccess(onSuccess).OnFailure(onFailure) + eventListener.OnSuccessFunc(onSuccess).OnFailureFunc(onFailure) // 创建一个可运行的函数 fn := func() error { @@ -70,5 +70,5 @@ func TestRateLimiterPublishEvents(t *testing.T) { if failure.Load() != 1 { t.Errorf("Expected 1 failure call, but got '%d'", failure.Load()) } - eventListener.Dismiss(onSuccess).Dismiss(onFailure) + eventListener.DismissSuccessFunc(onSuccess).DismissFailureFunc(onFailure) } diff --git a/retry/decorate.go b/retry/decorate.go index 981e584..d0c007f 100644 --- a/retry/decorate.go +++ b/retry/decorate.go @@ -4,7 +4,7 @@ import "github.com/CharLemAznable/ge" func DecorateRunnable(retry Retry, fn func() error) func() error { return func() error { - _, err := retry.execute(func() (any, error) { + _, err := retry.Execute(func() (any, error) { return nil, fn() }) return err @@ -13,7 +13,7 @@ func DecorateRunnable(retry Retry, fn func() error) func() error { func DecorateSupplier[T any](retry Retry, fn func() (T, error)) func() (T, error) { return func() (T, error) { - ret, err := retry.execute(func() (any, error) { + ret, err := retry.Execute(func() (any, error) { return fn() }) return ge.CastQuietly[T](ret), err @@ -22,7 +22,7 @@ func DecorateSupplier[T any](retry Retry, fn func() (T, error)) func() (T, error func DecorateConsumer[T any](retry Retry, fn func(T) error) func(T) error { return func(t T) error { - _, err := retry.execute(func() (any, error) { + _, err := retry.Execute(func() (any, error) { return nil, fn(t) }) return err @@ -31,7 +31,7 @@ func DecorateConsumer[T any](retry Retry, fn func(T) error) func(T) error { func DecorateFunction[T any, R any](retry Retry, fn func(T) (R, error)) func(T) (R, error) { return func(t T) (R, error) { - ret, err := retry.execute(func() (any, error) { + ret, err := retry.Execute(func() (any, error) { return fn(t) }) return ge.CastQuietly[R](ret), err diff --git a/retry/event_listener.go b/retry/event_listener.go index 98b9a29..b912c1a 100644 --- a/retry/event_listener.go +++ b/retry/event_listener.go @@ -6,59 +6,99 @@ import ( ) type EventListener interface { - OnSuccess(func(SuccessEvent)) EventListener - OnRetry(func(RetryEvent)) EventListener - OnError(func(ErrorEvent)) EventListener - Dismiss(any) EventListener + OnSuccessFunc(func(SuccessEvent)) EventListener + OnRetryFunc(func(RetryEvent)) EventListener + OnErrorFunc(func(ErrorEvent)) EventListener + DismissSuccessFunc(func(SuccessEvent)) EventListener + DismissRetryFunc(func(RetryEvent)) EventListener + DismissErrorFunc(func(ErrorEvent)) EventListener + + OnSuccess(ge.Action[SuccessEvent]) EventListener + OnRetry(ge.Action[RetryEvent]) EventListener + OnError(ge.Action[ErrorEvent]) EventListener + DismissSuccess(ge.Action[SuccessEvent]) EventListener + DismissRetry(ge.Action[RetryEvent]) EventListener + DismissError(ge.Action[ErrorEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onSuccess: make([]func(SuccessEvent), 0), - onRetry: make([]func(RetryEvent), 0), - onError: make([]func(ErrorEvent), 0), + onSuccess: make([]ge.Action[SuccessEvent], 0), + onRetry: make([]ge.Action[RetryEvent], 0), + onError: make([]ge.Action[ErrorEvent], 0), } } type eventListener struct { sync.RWMutex - onSuccess []func(SuccessEvent) - onRetry []func(RetryEvent) - onError []func(ErrorEvent) + onSuccess []ge.Action[SuccessEvent] + onRetry []ge.Action[RetryEvent] + onError []ge.Action[ErrorEvent] +} + +func (listener *eventListener) OnSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.OnSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) OnRetryFunc(consumer func(RetryEvent)) EventListener { + return listener.OnRetry(ge.ActionFunc[RetryEvent](consumer)) +} + +func (listener *eventListener) OnErrorFunc(consumer func(ErrorEvent)) EventListener { + return listener.OnError(ge.ActionFunc[ErrorEvent](consumer)) +} + +func (listener *eventListener) DismissSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.DismissSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) DismissRetryFunc(consumer func(RetryEvent)) EventListener { + return listener.DismissRetry(ge.ActionFunc[RetryEvent](consumer)) +} + +func (listener *eventListener) DismissErrorFunc(consumer func(ErrorEvent)) EventListener { + return listener.DismissError(ge.ActionFunc[ErrorEvent](consumer)) } -func (listener *eventListener) OnSuccess(consumer func(SuccessEvent)) EventListener { +func (listener *eventListener) OnSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, consumer) + listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, action) return listener } -func (listener *eventListener) OnRetry(consumer func(RetryEvent)) EventListener { +func (listener *eventListener) OnRetry(action ge.Action[RetryEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onRetry = ge.AppendElementUnique(listener.onRetry, consumer) + listener.onRetry = ge.AppendElementUnique(listener.onRetry, action) return listener } -func (listener *eventListener) OnError(consumer func(ErrorEvent)) EventListener { +func (listener *eventListener) OnError(action ge.Action[ErrorEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onError = ge.AppendElementUnique(listener.onError, consumer) + listener.onError = ge.AppendElementUnique(listener.onError, action) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(SuccessEvent): - listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, c) - case func(RetryEvent): - listener.onRetry = ge.RemoveElementByValue(listener.onRetry, c) - case func(ErrorEvent): - listener.onError = ge.RemoveElementByValue(listener.onError, c) - } + listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, action) + return listener +} + +func (listener *eventListener) DismissRetry(action ge.Action[RetryEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onRetry = ge.RemoveElementByValue(listener.onRetry, action) + return listener +} + +func (listener *eventListener) DismissError(action ge.Action[ErrorEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onError = ge.RemoveElementByValue(listener.onError, action) return listener } @@ -68,11 +108,11 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *successEvent: - ge.ConsumeEach(listener.onSuccess, SuccessEvent(e)) + ge.ForEach(listener.onSuccess, SuccessEvent(e)) case *retryEvent: - ge.ConsumeEach(listener.onRetry, RetryEvent(e)) + ge.ForEach(listener.onRetry, RetryEvent(e)) case *errorEvent: - ge.ConsumeEach(listener.onError, ErrorEvent(e)) + ge.ForEach(listener.onError, ErrorEvent(e)) } }() } diff --git a/retry/retry.go b/retry/retry.go index ff767fb..9592426 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -10,8 +10,7 @@ type Retry interface { Name() string Metrics() Metrics EventListener() EventListener - - execute(func() (any, error)) (any, error) + Execute(func() (any, error)) (any, error) } func NewRetry(name string, configs ...ConfigBuilder) Retry { @@ -46,7 +45,7 @@ func (r *retry) EventListener() EventListener { return r.eventListener } -func (r *retry) execute(fn func() (any, error)) (any, error) { +func (r *retry) Execute(fn func() (any, error)) (any, error) { context := r.executeOnce(fn) if r.testResult(context) { r.metrics.successfulCallsWithoutRetryAttemptIncrement() diff --git a/retry/retry_test.go b/retry/retry_test.go index 4b2dcac..97e6bc0 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -46,7 +46,7 @@ func TestSuccess(t *testing.T) { onError := func(event retry.ErrorEvent) { t.Error("Should not listen error event") } - listener.OnSuccess(onSuccess).OnRetry(onRetry).OnError(onError) + listener.OnSuccessFunc(onSuccess).OnRetryFunc(onRetry).OnErrorFunc(onError) var count atomic.Int64 fn := func() error { @@ -80,7 +80,7 @@ func TestSuccess(t *testing.T) { t.Errorf("Expected failed calls with retry attempt '0', but got '%d'", metrics.NumberOfFailedCallsWithRetryAttempt()) } - listener.Dismiss(onSuccess).Dismiss(onRetry).Dismiss(onError) + listener.DismissSuccessFunc(onSuccess).DismissRetryFunc(onRetry).DismissErrorFunc(onError) } func TestError(t *testing.T) { @@ -120,7 +120,7 @@ func TestError(t *testing.T) { t.Errorf("Expected event string '%s', but got '%s'", expected, event) } } - listener.OnSuccess(onSuccess).OnRetry(onRetry).OnError(onError) + listener.OnSuccessFunc(onSuccess).OnRetryFunc(onRetry).OnErrorFunc(onError) var count atomic.Int64 fn := func() error { @@ -154,5 +154,5 @@ func TestError(t *testing.T) { t.Errorf("Expected failed calls with retry attempt '1', but got '%d'", metrics.NumberOfFailedCallsWithRetryAttempt()) } - listener.Dismiss(onSuccess).Dismiss(onRetry).Dismiss(onError) + listener.DismissSuccessFunc(onSuccess).DismissRetryFunc(onRetry).DismissErrorFunc(onError) } diff --git a/timelimiter/decorate.go b/timelimiter/decorate.go index b1406f5..067a2f9 100644 --- a/timelimiter/decorate.go +++ b/timelimiter/decorate.go @@ -4,7 +4,7 @@ import "github.com/CharLemAznable/ge" func DecorateRunnable(limiter TimeLimiter, fn func() error) func() error { return func() error { - _, err := limiter.execute(func() (any, error) { + _, err := limiter.Execute(func() (any, error) { return nil, fn() }) return err @@ -13,7 +13,7 @@ func DecorateRunnable(limiter TimeLimiter, fn func() error) func() error { func DecorateSupplier[T any](limiter TimeLimiter, fn func() (T, error)) func() (T, error) { return func() (T, error) { - ret, err := limiter.execute(func() (any, error) { + ret, err := limiter.Execute(func() (any, error) { return fn() }) return ge.CastQuietly[T](ret), err @@ -22,7 +22,7 @@ func DecorateSupplier[T any](limiter TimeLimiter, fn func() (T, error)) func() ( func DecorateConsumer[T any](limiter TimeLimiter, fn func(T) error) func(T) error { return func(t T) error { - _, err := limiter.execute(func() (any, error) { + _, err := limiter.Execute(func() (any, error) { return nil, fn(t) }) return err @@ -31,7 +31,7 @@ func DecorateConsumer[T any](limiter TimeLimiter, fn func(T) error) func(T) erro func DecorateFunction[T any, R any](limiter TimeLimiter, fn func(T) (R, error)) func(T) (R, error) { return func(t T) (R, error) { - ret, err := limiter.execute(func() (any, error) { + ret, err := limiter.Execute(func() (any, error) { return fn(t) }) return ge.CastQuietly[R](ret), err diff --git a/timelimiter/event_listener.go b/timelimiter/event_listener.go index b662530..b473ab3 100644 --- a/timelimiter/event_listener.go +++ b/timelimiter/event_listener.go @@ -6,59 +6,99 @@ import ( ) type EventListener interface { - OnSuccess(func(SuccessEvent)) EventListener - OnTimeout(func(TimeoutEvent)) EventListener - OnPanic(func(PanicEvent)) EventListener - Dismiss(any) EventListener + OnSuccessFunc(func(SuccessEvent)) EventListener + OnTimeoutFunc(func(TimeoutEvent)) EventListener + OnPanicFunc(func(PanicEvent)) EventListener + DismissSuccessFunc(func(SuccessEvent)) EventListener + DismissTimeoutFunc(func(TimeoutEvent)) EventListener + DismissPanicFunc(func(PanicEvent)) EventListener + + OnSuccess(ge.Action[SuccessEvent]) EventListener + OnTimeout(ge.Action[TimeoutEvent]) EventListener + OnPanic(ge.Action[PanicEvent]) EventListener + DismissSuccess(ge.Action[SuccessEvent]) EventListener + DismissTimeout(ge.Action[TimeoutEvent]) EventListener + DismissPanic(ge.Action[PanicEvent]) EventListener } func newEventListener() *eventListener { return &eventListener{ - onSuccess: make([]func(SuccessEvent), 0), - onTimeout: make([]func(TimeoutEvent), 0), - onPanic: make([]func(PanicEvent), 0), + onSuccess: make([]ge.Action[SuccessEvent], 0), + onTimeout: make([]ge.Action[TimeoutEvent], 0), + onPanic: make([]ge.Action[PanicEvent], 0), } } type eventListener struct { sync.RWMutex - onSuccess []func(SuccessEvent) - onTimeout []func(TimeoutEvent) - onPanic []func(PanicEvent) + onSuccess []ge.Action[SuccessEvent] + onTimeout []ge.Action[TimeoutEvent] + onPanic []ge.Action[PanicEvent] +} + +func (listener *eventListener) OnSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.OnSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) OnTimeoutFunc(consumer func(TimeoutEvent)) EventListener { + return listener.OnTimeout(ge.ActionFunc[TimeoutEvent](consumer)) +} + +func (listener *eventListener) OnPanicFunc(consumer func(PanicEvent)) EventListener { + return listener.OnPanic(ge.ActionFunc[PanicEvent](consumer)) +} + +func (listener *eventListener) DismissSuccessFunc(consumer func(SuccessEvent)) EventListener { + return listener.DismissSuccess(ge.ActionFunc[SuccessEvent](consumer)) +} + +func (listener *eventListener) DismissTimeoutFunc(consumer func(TimeoutEvent)) EventListener { + return listener.DismissTimeout(ge.ActionFunc[TimeoutEvent](consumer)) +} + +func (listener *eventListener) DismissPanicFunc(consumer func(PanicEvent)) EventListener { + return listener.DismissPanic(ge.ActionFunc[PanicEvent](consumer)) } -func (listener *eventListener) OnSuccess(consumer func(SuccessEvent)) EventListener { +func (listener *eventListener) OnSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, consumer) + listener.onSuccess = ge.AppendElementUnique(listener.onSuccess, action) return listener } -func (listener *eventListener) OnTimeout(consumer func(TimeoutEvent)) EventListener { +func (listener *eventListener) OnTimeout(action ge.Action[TimeoutEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onTimeout = ge.AppendElementUnique(listener.onTimeout, consumer) + listener.onTimeout = ge.AppendElementUnique(listener.onTimeout, action) return listener } -func (listener *eventListener) OnPanic(consumer func(PanicEvent)) EventListener { +func (listener *eventListener) OnPanic(action ge.Action[PanicEvent]) EventListener { listener.Lock() defer listener.Unlock() - listener.onPanic = ge.AppendElementUnique(listener.onPanic, consumer) + listener.onPanic = ge.AppendElementUnique(listener.onPanic, action) return listener } -func (listener *eventListener) Dismiss(consumer any) EventListener { +func (listener *eventListener) DismissSuccess(action ge.Action[SuccessEvent]) EventListener { listener.Lock() defer listener.Unlock() - switch c := consumer.(type) { - case func(SuccessEvent): - listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, c) - case func(TimeoutEvent): - listener.onTimeout = ge.RemoveElementByValue(listener.onTimeout, c) - case func(PanicEvent): - listener.onPanic = ge.RemoveElementByValue(listener.onPanic, c) - } + listener.onSuccess = ge.RemoveElementByValue(listener.onSuccess, action) + return listener +} + +func (listener *eventListener) DismissTimeout(action ge.Action[TimeoutEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onTimeout = ge.RemoveElementByValue(listener.onTimeout, action) + return listener +} + +func (listener *eventListener) DismissPanic(action ge.Action[PanicEvent]) EventListener { + listener.Lock() + defer listener.Unlock() + listener.onPanic = ge.RemoveElementByValue(listener.onPanic, action) return listener } @@ -68,11 +108,11 @@ func (listener *eventListener) consumeEvent(event Event) { defer listener.RUnlock() switch e := event.(type) { case *successEvent: - ge.ConsumeEach(listener.onSuccess, SuccessEvent(e)) + ge.ForEach(listener.onSuccess, SuccessEvent(e)) case *timeoutEvent: - ge.ConsumeEach(listener.onTimeout, TimeoutEvent(e)) + ge.ForEach(listener.onTimeout, TimeoutEvent(e)) case *panicEvent: - ge.ConsumeEach(listener.onPanic, PanicEvent(e)) + ge.ForEach(listener.onPanic, PanicEvent(e)) } }() } diff --git a/timelimiter/timelimiter.go b/timelimiter/timelimiter.go index 8d7e17a..97aaf2d 100644 --- a/timelimiter/timelimiter.go +++ b/timelimiter/timelimiter.go @@ -10,8 +10,7 @@ type TimeLimiter interface { Name() string Metrics() Metrics EventListener() EventListener - - execute(func() (any, error)) (any, error) + Execute(func() (any, error)) (any, error) } func NewTimeLimiter(name string, configs ...ConfigBuilder) TimeLimiter { @@ -48,7 +47,7 @@ func (limiter *timeLimiter) EventListener() EventListener { return limiter.eventListener } -func (limiter *timeLimiter) execute(fn func() (any, error)) (any, error) { +func (limiter *timeLimiter) Execute(fn func() (any, error)) (any, error) { timeout, cancelFunc := context.WithTimeout(limiter.rootContext, limiter.config.timeoutDuration) defer cancelFunc() finished := make(chan *channelValue) diff --git a/timelimiter/timelimiter_test.go b/timelimiter/timelimiter_test.go index edbac07..674a431 100644 --- a/timelimiter/timelimiter_test.go +++ b/timelimiter/timelimiter_test.go @@ -43,7 +43,7 @@ func TestTimeLimiterPublishEvents(t *testing.T) { t.Errorf("Expected event message '%s', but got '%s'", expectedMsg, event) } } - eventListener.OnSuccess(onSuccess).OnTimeout(onTimeout).OnPanic(onPanic) + eventListener.OnSuccessFunc(onSuccess).OnTimeoutFunc(onTimeout).OnPanicFunc(onPanic) // 创建一个可运行的函数 fn := func() error { @@ -100,5 +100,5 @@ func TestTimeLimiterPublishEvents(t *testing.T) { if metrics.PanicCount() != 1 { t.Errorf("Expected 1 panic call, but got '%d'", metrics.PanicCount()) } - eventListener.Dismiss(onSuccess).Dismiss(onTimeout).Dismiss(onPanic) + eventListener.DismissSuccessFunc(onSuccess).DismissTimeoutFunc(onTimeout).DismissPanicFunc(onPanic) }