Skip to content

Commit

Permalink
Upgrade event listeners, some refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
CharLemAznable committed Dec 19, 2023
1 parent 4e8c58d commit baa708d
Show file tree
Hide file tree
Showing 25 changed files with 498 additions and 248 deletions.
9 changes: 4 additions & 5 deletions bulkhead/bulkhead.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ type Bulkhead interface {
Name() string
Metrics() Metrics
EventListener() EventListener

acquire() error
release()
Acquire() error
Release()
}

func NewBulkhead(name string, configs ...ConfigBuilder) Bulkhead {
Expand Down Expand Up @@ -51,7 +50,7 @@ func (bulkhead *semaphoreBulkhead) EventListener() EventListener {
return bulkhead.eventListener
}

func (bulkhead *semaphoreBulkhead) acquire() error {
func (bulkhead *semaphoreBulkhead) Acquire() error {
permitted := func() bool {
timeout, cancelFn := context.WithTimeout(
bulkhead.rootContext,
Expand All @@ -75,7 +74,7 @@ func (bulkhead *semaphoreBulkhead) acquire() error {
return &FullError{name: bulkhead.name}
}

func (bulkhead *semaphoreBulkhead) release() {
func (bulkhead *semaphoreBulkhead) Release() {
bulkhead.semaphore.Release(1)
bulkhead.metrics.release(1)
bulkhead.eventListener.consumeEvent(newFinishedEvent(bulkhead.name))
Expand Down
4 changes: 2 additions & 2 deletions bulkhead/bulkhead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestBulkheadPublishEvents(t *testing.T) {
}
finished.Add(1)
}
eventListener.OnPermitted(onPermitted).OnRejected(onRejected).OnFinished(onFinished)
eventListener.OnPermittedFunc(onPermitted).OnRejectedFunc(onRejected).OnFinishedFunc(onFinished)

// 创建一个可运行的函数
fn := func() error {
Expand Down Expand Up @@ -86,5 +86,5 @@ func TestBulkheadPublishEvents(t *testing.T) {
if finished.Load() != 1 {
t.Errorf("Expected 1 finished call, but got '%d'", finished.Load())
}
eventListener.Dismiss(onPermitted).Dismiss(onRejected).Dismiss(onFinished)
eventListener.DismissPermittedFunc(onPermitted).DismissRejectedFunc(onRejected).DismissFinishedFunc(onFinished)
}
16 changes: 8 additions & 8 deletions bulkhead/decorate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,40 @@ import "github.com/CharLemAznable/ge"

func DecorateRunnable(bulkhead Bulkhead, fn func() error) func() error {
return func() error {
if err := bulkhead.acquire(); err != nil {
if err := bulkhead.Acquire(); err != nil {
return err
}
defer bulkhead.release()
defer bulkhead.Release()
return fn()
}
}

func DecorateSupplier[T any](bulkhead Bulkhead, fn func() (T, error)) func() (T, error) {
return func() (T, error) {
if err := bulkhead.acquire(); err != nil {
if err := bulkhead.Acquire(); err != nil {
return ge.Zero[T](), err
}
defer bulkhead.release()
defer bulkhead.Release()
return fn()
}
}

func DecorateConsumer[T any](bulkhead Bulkhead, fn func(T) error) func(T) error {
return func(t T) error {
if err := bulkhead.acquire(); err != nil {
if err := bulkhead.Acquire(); err != nil {
return err
}
defer bulkhead.release()
defer bulkhead.Release()
return fn(t)
}
}

func DecorateFunction[T any, R any](bulkhead Bulkhead, fn func(T) (R, error)) func(T) (R, error) {
return func(t T) (R, error) {
if err := bulkhead.acquire(); err != nil {
if err := bulkhead.Acquire(); err != nil {
return ge.Zero[R](), err
}
defer bulkhead.release()
defer bulkhead.Release()
return fn(t)
}
}
96 changes: 68 additions & 28 deletions bulkhead/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,99 @@ import (
)

type EventListener interface {
OnPermitted(func(PermittedEvent)) EventListener
OnRejected(func(RejectedEvent)) EventListener
OnFinished(func(FinishedEvent)) EventListener
Dismiss(any) EventListener
OnPermittedFunc(func(PermittedEvent)) EventListener
OnRejectedFunc(func(RejectedEvent)) EventListener
OnFinishedFunc(func(FinishedEvent)) EventListener
DismissPermittedFunc(func(PermittedEvent)) EventListener
DismissRejectedFunc(func(RejectedEvent)) EventListener
DismissFinishedFunc(func(FinishedEvent)) EventListener

OnPermitted(ge.Action[PermittedEvent]) EventListener
OnRejected(ge.Action[RejectedEvent]) EventListener
OnFinished(ge.Action[FinishedEvent]) EventListener
DismissPermitted(ge.Action[PermittedEvent]) EventListener
DismissRejected(ge.Action[RejectedEvent]) EventListener
DismissFinished(ge.Action[FinishedEvent]) EventListener
}

func newEventListener() *eventListener {
return &eventListener{
onPermitted: make([]func(PermittedEvent), 0),
onRejected: make([]func(RejectedEvent), 0),
onFinished: make([]func(FinishedEvent), 0),
onPermitted: make([]ge.Action[PermittedEvent], 0),
onRejected: make([]ge.Action[RejectedEvent], 0),
onFinished: make([]ge.Action[FinishedEvent], 0),
}
}

type eventListener struct {
sync.RWMutex
onPermitted []func(PermittedEvent)
onRejected []func(RejectedEvent)
onFinished []func(FinishedEvent)
onPermitted []ge.Action[PermittedEvent]
onRejected []ge.Action[RejectedEvent]
onFinished []ge.Action[FinishedEvent]
}

func (listener *eventListener) OnPermittedFunc(consumer func(PermittedEvent)) EventListener {
return listener.OnPermitted(ge.ActionFunc[PermittedEvent](consumer))
}

func (listener *eventListener) OnRejectedFunc(consumer func(RejectedEvent)) EventListener {
return listener.OnRejected(ge.ActionFunc[RejectedEvent](consumer))
}

func (listener *eventListener) OnFinishedFunc(consumer func(FinishedEvent)) EventListener {
return listener.OnFinished(ge.ActionFunc[FinishedEvent](consumer))
}

func (listener *eventListener) DismissPermittedFunc(consumer func(PermittedEvent)) EventListener {
return listener.DismissPermitted(ge.ActionFunc[PermittedEvent](consumer))
}

func (listener *eventListener) DismissRejectedFunc(consumer func(RejectedEvent)) EventListener {
return listener.DismissRejected(ge.ActionFunc[RejectedEvent](consumer))
}

func (listener *eventListener) DismissFinishedFunc(consumer func(FinishedEvent)) EventListener {
return listener.DismissFinished(ge.ActionFunc[FinishedEvent](consumer))
}

func (listener *eventListener) OnPermitted(consumer func(PermittedEvent)) EventListener {
func (listener *eventListener) OnPermitted(action ge.Action[PermittedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onPermitted = ge.AppendElementUnique(listener.onPermitted, consumer)
listener.onPermitted = ge.AppendElementUnique(listener.onPermitted, action)
return listener
}

func (listener *eventListener) OnRejected(consumer func(RejectedEvent)) EventListener {
func (listener *eventListener) OnRejected(action ge.Action[RejectedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onRejected = ge.AppendElementUnique(listener.onRejected, consumer)
listener.onRejected = ge.AppendElementUnique(listener.onRejected, action)
return listener
}

func (listener *eventListener) OnFinished(consumer func(FinishedEvent)) EventListener {
func (listener *eventListener) OnFinished(action ge.Action[FinishedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onFinished = ge.AppendElementUnique(listener.onFinished, consumer)
listener.onFinished = ge.AppendElementUnique(listener.onFinished, action)
return listener
}

func (listener *eventListener) Dismiss(consumer any) EventListener {
func (listener *eventListener) DismissPermitted(action ge.Action[PermittedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
switch c := consumer.(type) {
case func(PermittedEvent):
listener.onPermitted = ge.RemoveElementByValue(listener.onPermitted, c)
case func(RejectedEvent):
listener.onRejected = ge.RemoveElementByValue(listener.onRejected, c)
case func(FinishedEvent):
listener.onFinished = ge.RemoveElementByValue(listener.onFinished, c)
}
listener.onPermitted = ge.RemoveElementByValue(listener.onPermitted, action)
return listener
}

func (listener *eventListener) DismissRejected(action ge.Action[RejectedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onRejected = ge.RemoveElementByValue(listener.onRejected, action)
return listener
}

func (listener *eventListener) DismissFinished(action ge.Action[FinishedEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onFinished = ge.RemoveElementByValue(listener.onFinished, action)
return listener
}

Expand All @@ -68,11 +108,11 @@ func (listener *eventListener) consumeEvent(event Event) {
defer listener.RUnlock()
switch e := event.(type) {
case *permittedEvent:
ge.ConsumeEach(listener.onPermitted, PermittedEvent(e))
ge.ForEach(listener.onPermitted, PermittedEvent(e))
case *rejectedEvent:
ge.ConsumeEach(listener.onRejected, RejectedEvent(e))
ge.ForEach(listener.onRejected, RejectedEvent(e))
case *finishedEvent:
ge.ConsumeEach(listener.onFinished, FinishedEvent(e))
ge.ForEach(listener.onFinished, FinishedEvent(e))
}
}()
}
2 changes: 0 additions & 2 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ type Cache[K any, V any] interface {
Name() string
Metrics() Metrics
EventListener() EventListener

WithMarshalFn(func(V) any, func(any) V) Cache[K, V]

GetOrLoad(key K, loader func(K) (V, error)) (V, error)
}

Expand Down
4 changes: 2 additions & 2 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCache(t *testing.T) {
}
misses.Add(1)
}
eventListener.OnCacheHit(onCacheHit).OnCacheMiss(onCacheMiss)
eventListener.OnCacheHitFunc(onCacheHit).OnCacheMissFunc(onCacheMiss)

// fail with no error, max retries exceeded
fn := func(key string) (string, error) {
Expand Down Expand Up @@ -92,5 +92,5 @@ func TestCache(t *testing.T) {
if misses.Load() != 2 {
t.Errorf("Expected 2 miss calls, but got '%d'", misses.Load())
}
eventListener.Dismiss(onCacheHit).Dismiss(onCacheMiss)
eventListener.DismissCacheHitFunc(onCacheHit).DismissCacheMissFunc(onCacheMiss)
}
60 changes: 42 additions & 18 deletions cache/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,71 @@ import (
)

type EventListener interface {
OnCacheHit(func(HitEvent)) EventListener
OnCacheMiss(func(MissEvent)) EventListener
Dismiss(any) EventListener
OnCacheHitFunc(func(HitEvent)) EventListener
OnCacheMissFunc(func(MissEvent)) EventListener
DismissCacheHitFunc(func(HitEvent)) EventListener
DismissCacheMissFunc(func(MissEvent)) EventListener

OnCacheHit(ge.Action[HitEvent]) EventListener
OnCacheMiss(ge.Action[MissEvent]) EventListener
DismissCacheHit(ge.Action[HitEvent]) EventListener
DismissCacheMiss(ge.Action[MissEvent]) EventListener
}

func newEventListener() *eventListener {
return &eventListener{
onCacheHit: make([]func(HitEvent), 0),
onCacheMiss: make([]func(MissEvent), 0),
onCacheHit: make([]ge.Action[HitEvent], 0),
onCacheMiss: make([]ge.Action[MissEvent], 0),
}
}

type eventListener struct {
sync.RWMutex
onCacheHit []func(HitEvent)
onCacheMiss []func(MissEvent)
onCacheHit []ge.Action[HitEvent]
onCacheMiss []ge.Action[MissEvent]
}

func (listener *eventListener) OnCacheHitFunc(consumer func(HitEvent)) EventListener {
return listener.OnCacheHit(ge.ActionFunc[HitEvent](consumer))
}

func (listener *eventListener) OnCacheMissFunc(consumer func(MissEvent)) EventListener {
return listener.OnCacheMiss(ge.ActionFunc[MissEvent](consumer))
}

func (listener *eventListener) DismissCacheHitFunc(consumer func(HitEvent)) EventListener {
return listener.DismissCacheHit(ge.ActionFunc[HitEvent](consumer))
}

func (listener *eventListener) OnCacheHit(consumer func(HitEvent)) EventListener {
func (listener *eventListener) DismissCacheMissFunc(consumer func(MissEvent)) EventListener {
return listener.DismissCacheMiss(ge.ActionFunc[MissEvent](consumer))
}

func (listener *eventListener) OnCacheHit(consumer ge.Action[HitEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onCacheHit = ge.AppendElementUnique(listener.onCacheHit, consumer)
return listener
}

func (listener *eventListener) OnCacheMiss(consumer func(MissEvent)) EventListener {
func (listener *eventListener) OnCacheMiss(consumer ge.Action[MissEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onCacheMiss = ge.AppendElementUnique(listener.onCacheMiss, consumer)
return listener
}

func (listener *eventListener) Dismiss(consumer any) EventListener {
func (listener *eventListener) DismissCacheHit(consumer ge.Action[HitEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
switch c := consumer.(type) {
case func(HitEvent):
listener.onCacheHit = ge.RemoveElementByValue(listener.onCacheHit, c)
case func(MissEvent):
listener.onCacheMiss = ge.RemoveElementByValue(listener.onCacheMiss, c)
}
listener.onCacheHit = ge.RemoveElementByValue(listener.onCacheHit, consumer)
return listener
}

func (listener *eventListener) DismissCacheMiss(consumer ge.Action[MissEvent]) EventListener {
listener.Lock()
defer listener.Unlock()
listener.onCacheMiss = ge.RemoveElementByValue(listener.onCacheMiss, consumer)
return listener
}

Expand All @@ -56,9 +80,9 @@ func (listener *eventListener) consumeEvent(event Event) {
defer listener.RUnlock()
switch e := event.(type) {
case *hitEvent:
ge.ConsumeEach(listener.onCacheHit, HitEvent(e))
ge.ForEach(listener.onCacheHit, HitEvent(e))
case *missEvent:
ge.ConsumeEach(listener.onCacheMiss, MissEvent(e))
ge.ForEach(listener.onCacheMiss, MissEvent(e))
}
}()
}
14 changes: 7 additions & 7 deletions circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ type CircuitBreaker interface {
TransitionToClosedState() error
TransitionToOpenState() error
TransitionToHalfOpenState() error
Execute(func() (any, error)) (any, error)

config() *Config
execute(func() (any, error)) (any, error)
acquirePermission() error
publishThresholdsExceededEvent(metricsResult, Metrics)
config() *Config
}

func NewCircuitBreaker(name string, configs ...ConfigBuilder) CircuitBreaker {
Expand Down Expand Up @@ -108,11 +108,7 @@ func (machine *stateMachine) stateTransition(newStateName State, generator func(
return err
}

func (machine *stateMachine) config() *Config {
return machine.conf
}

func (machine *stateMachine) execute(fn func() (any, error)) (any, error) {
func (machine *stateMachine) Execute(fn func() (any, error)) (any, error) {
if err := machine.acquirePermission(); err != nil {
machine.publishEvent(newNotPermittedEvent(machine.name))
return nil, err
Expand Down Expand Up @@ -166,6 +162,10 @@ func (machine *stateMachine) publishThresholdsExceededEvent(result metricsResult
}
}

func (machine *stateMachine) config() *Config {
return machine.conf
}

func (machine *stateMachine) loadState() *state {
return machine.state.Load()
}
Expand Down

0 comments on commit baa708d

Please sign in to comment.