From c020267c7b650560d37f471a610f89c1c93e64fa Mon Sep 17 00:00:00 2001 From: Inhere Date: Fri, 9 Jun 2023 14:53:15 +0800 Subject: [PATCH] :sparkles: feat: support async consume events and wait complete - add some new global func - add more unit tests --- event.go | 7 +- event_all_test.go | 442 ---------------------------------------------- event_test.go | 121 +++++++++++++ manager.go | 79 +++++++-- manager_test.go | 231 ++++++++++++++++++++++++ std.go | 74 +++++--- std_test.go | 183 +++++++++++++++++++ 7 files changed, 653 insertions(+), 484 deletions(-) delete mode 100644 event_all_test.go create mode 100644 event_test.go diff --git a/event.go b/event.go index bed0bcc..241e287 100644 --- a/event.go +++ b/event.go @@ -87,7 +87,7 @@ type Cloneable interface { // FactoryFunc for create event instance. type FactoryFunc func() Event -// BasicEvent a basic event struct define. +// BasicEvent a built-in implements Event interface type BasicEvent struct { // event name name string @@ -99,6 +99,11 @@ type BasicEvent struct { aborted bool } +// New create an event instance +func New(name string, data M) *BasicEvent { + return NewBasic(name, data) +} + // NewBasic new a basic event instance func NewBasic(name string, data M) *BasicEvent { if data == nil { diff --git a/event_all_test.go b/event_all_test.go deleted file mode 100644 index 73e7017..0000000 --- a/event_all_test.go +++ /dev/null @@ -1,442 +0,0 @@ -package event - -import ( - "bytes" - "fmt" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -var emptyListener = func(e Event) error { - return nil -} - -type testListener struct { - userData string -} - -func (l *testListener) Handle(e Event) error { - if ret := e.Get("result"); ret != nil { - str := ret.(string) + fmt.Sprintf(" -> %s(%s)", e.Name(), l.userData) - e.Set("result", str) - } else { - e.Set("result", fmt.Sprintf("handled: %s(%s)", e.Name(), l.userData)) - } - return nil -} - -func TestEvent(t *testing.T) { - e := &BasicEvent{} - e.SetName("n1") - e.SetData(M{ - "arg0": "val0", - }) - e.SetTarget("tgt") - - e.Add("arg1", "val1") - - assert.False(t, e.IsAborted()) - e.Abort(true) - assert.True(t, e.IsAborted()) - - assert.Equal(t, "n1", e.Name()) - assert.Equal(t, "tgt", e.Target()) - assert.Contains(t, e.Data(), "arg1") - assert.Equal(t, "val0", e.Get("arg0")) - assert.Equal(t, nil, e.Get("not-exist")) - - e.Set("arg1", "new val") - assert.Equal(t, "new val", e.Get("arg1")) - - e1 := &BasicEvent{} - e1.Set("k", "v") - assert.Equal(t, "v", e1.Get("k")) - assert.NotEmpty(t, e1.Clone()) -} - -func TestAddEvent(t *testing.T) { - defer Reset() - DefaultEM.RemoveEvents() - - // no name - assert.Panics(t, func() { - AddEvent(&BasicEvent{}) - }) - - _, ok := GetEvent("evt1") - assert.False(t, ok) - - // AddEvent - e := NewBasic("evt1", M{"k1": "inhere"}) - AddEvent(e) - // add by AttachTo - NewBasic("evt2", nil).AttachTo(DefaultEM) - - assert.False(t, e.IsAborted()) - assert.True(t, HasEvent("evt1")) - assert.True(t, HasEvent("evt2")) - assert.False(t, HasEvent("not-exist")) - - // GetEvent - r1, ok := GetEvent("evt1") - assert.True(t, ok) - assert.Equal(t, e, r1) - - // RemoveEvent - DefaultEM.RemoveEvent("evt2") - assert.False(t, HasEvent("evt2")) - - // RemoveEvents - DefaultEM.RemoveEvents() - assert.False(t, HasEvent("evt1")) -} - -func TestFire(t *testing.T) { - buf := new(bytes.Buffer) - fn := func(e Event) error { - _, _ = fmt.Fprintf(buf, "event: %s", e.Name()) - return nil - } - - On("evt1", ListenerFunc(fn), 0) - On("evt1", ListenerFunc(emptyListener), High) - assert.True(t, HasListeners("evt1")) - - err, e := Fire("evt1", nil) - assert.NoError(t, err) - assert.Equal(t, "evt1", e.Name()) - assert.Equal(t, "event: evt1", buf.String()) - - NewBasic("evt2", nil).AttachTo(DefaultEM) - On("evt2", ListenerFunc(func(e Event) error { - assert.Equal(t, "evt2", e.Name()) - assert.Equal(t, "v", e.Get("k")) - return nil - }), AboveNormal) - - assert.True(t, HasListeners("evt2")) - err, e = Trigger("evt2", M{"k": "v"}) - assert.NoError(t, err) - assert.Equal(t, "evt2", e.Name()) - assert.Equal(t, map[string]any{"k": "v"}, e.Data()) - - // clear all - Reset() - assert.False(t, HasListeners("evt1")) - assert.False(t, HasListeners("evt2")) - - err, e = Fire("not-exist", nil) - assert.NoError(t, err) - assert.NotEmpty(t, e) -} - -func TestFireEvent(t *testing.T) { - defer Reset() - buf := new(bytes.Buffer) - - evt1 := NewBasic("evt1", nil).Fill(nil, M{"n": "inhere"}) - AddEvent(evt1) - - assert.True(t, HasEvent("evt1")) - assert.False(t, HasEvent("not-exist")) - - Listen("evt1", ListenerFunc(func(e Event) error { - _, _ = fmt.Fprintf(buf, "event: %s, params: n=%s", e.Name(), e.Get("n")) - return nil - }), Normal) - - assert.True(t, HasListeners("evt1")) - assert.False(t, HasListeners("not-exist")) - - err := FireEvent(evt1) - assert.NoError(t, err) - assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) - buf.Reset() - - err = TriggerEvent(evt1) - assert.NoError(t, err) - assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) - buf.Reset() - - AsyncFire(evt1) - time.Sleep(time.Second) - assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) -} - -func TestMustFire(t *testing.T) { - defer Reset() - - On("n1", ListenerFunc(func(e Event) error { - return fmt.Errorf("an error") - }), Max) - On("n2", ListenerFunc(emptyListener), Min) - - assert.Panics(t, func() { - _ = MustFire("n1", nil) - }) - - assert.NotPanics(t, func() { - _ = MustTrigger("n2", nil) - }) -} - -func TestManager_FireEvent(t *testing.T) { - em := NewManager("test") - em.EnableLock = true - - e1 := NewBasic("e1", nil) - em.AddEvent(e1) - - em.On("e1", &testListener{"HI"}, Min) - em.On("e1", &testListener{"WEL"}, High) - em.AddListener("e1", &testListener{"COM"}, BelowNormal) - - err := em.FireEvent(e1) - assert.NoError(t, err) - assert.Equal(t, "handled: e1(WEL) -> e1(COM) -> e1(HI)", e1.Get("result")) - - // not exist - err = em.FireEvent(e1.SetName("e2")) - assert.NoError(t, err) - - em.Clear() -} - -func TestManager_FireEvent2(t *testing.T) { - buf := new(bytes.Buffer) - mgr := NewManager("test") - - evt1 := NewBasic("evt1", nil).Fill(nil, M{"n": "inhere"}) - mgr.AddEvent(evt1) - - assert.True(t, mgr.HasEvent("evt1")) - assert.False(t, mgr.HasEvent("not-exist")) - - mgr.On("evt1", ListenerFunc(func(e Event) error { - _, _ = fmt.Fprintf(buf, "event: %s, params: n=%s", e.Name(), e.Get("n")) - return nil - }), Normal) - - assert.True(t, mgr.HasListeners("evt1")) - assert.False(t, mgr.HasListeners("not-exist")) - - err := mgr.FireEvent(evt1) - assert.NoError(t, err) - assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) - buf.Reset() - - mgr.On(Wildcard, ListenerFunc(func(e Event) error { - buf.WriteString("|Wildcard handler") - return nil - })) - - err = mgr.FireEvent(evt1) - assert.NoError(t, err) - assert.Equal(t, "event: evt1, params: n=inhere|Wildcard handler", buf.String()) -} - -func TestManager_Fire_WithWildcard(t *testing.T) { - buf := new(bytes.Buffer) - mgr := NewManager("test") - - const Event2FurcasTicketCreate = "kapal.furcas.ticket.create" - - handler := ListenerFunc(func(e Event) error { - _, _ = fmt.Fprintf(buf, "%s-%s|", e.Name(), e.Get("user")) - return nil - }) - - mgr.On("kapal.furcas.ticket.*", handler) - mgr.On(Event2FurcasTicketCreate, handler) - - err, _ := mgr.Fire(Event2FurcasTicketCreate, M{"user": "inhere"}) - assert.NoError(t, err) - assert.Equal( - t, - "kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|", - buf.String(), - ) - buf.Reset() - - // add Wildcard listen - mgr.On("*", handler) - - err, _ = mgr.Trigger(Event2FurcasTicketCreate, M{"user": "inhere"}) - assert.NoError(t, err) - assert.Equal( - t, - "kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|", - buf.String(), - ) -} - -func TestListenGroupEvent(t *testing.T) { - em := NewManager("test") - - e1 := NewBasic("app.evt1", M{"buf": new(bytes.Buffer)}) - e1.AttachTo(em) - - l2 := ListenerFunc(func(e Event) error { - e.Get("buf").(*bytes.Buffer).WriteString(" > 2 " + e.Name()) - return nil - }) - l3 := ListenerFunc(func(e Event) error { - e.Get("buf").(*bytes.Buffer).WriteString(" > 3 " + e.Name()) - return nil - }) - - em.On("app.evt1", ListenerFunc(func(e Event) error { - e.Get("buf").(*bytes.Buffer).WriteString("Hi > 1 " + e.Name()) - return nil - })) - em.On("app.*", l2) - em.On("*", l3) - - buf := e1.Get("buf").(*bytes.Buffer) - err, e := em.Fire("app.evt1", nil) - assert.NoError(t, err) - assert.Equal(t, e1, e) - assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1 > 3 app.evt1", buf.String()) - - em.RemoveListener("app.*", l2) - assert.Len(t, em.ListenedNames(), 2) - em.On("app.*", ListenerFunc(func(e Event) error { - return fmt.Errorf("an error") - })) - - buf.Reset() - err, e = em.Fire("app.evt1", nil) - assert.Error(t, err) - assert.Equal(t, "Hi > 1 app.evt1", buf.String()) - - em.RemoveListeners("app.*") - em.RemoveListener("", l3) - em.On("app.*", l2) // re-add - em.On("*", ListenerFunc(func(e Event) error { - return fmt.Errorf("an error") - })) - assert.Len(t, em.ListenedNames(), 3) - - buf.Reset() - err, e = em.Trigger("app.evt1", nil) - assert.Error(t, err) - assert.Equal(t, e1, e) - assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1", buf.String()) - - em.RemoveListener("", nil) - - // clear - em.Clear() - buf.Reset() -} - -func TestManager_AsyncFire(t *testing.T) { - em := NewManager("test") - em.On("e1", ListenerFunc(func(e Event) error { - assert.Equal(t, map[string]any{"k": "v"}, e.Data()) - e.Set("nk", "nv") - return nil - })) - - e1 := NewBasic("e1", M{"k": "v"}) - em.AsyncFire(e1) - time.Sleep(time.Second / 10) - assert.Equal(t, "nv", e1.Get("nk")) - - var wg sync.WaitGroup - em.On("e2", ListenerFunc(func(e Event) error { - defer wg.Done() - assert.Equal(t, "v", e.Get("k")) - return nil - })) - - wg.Add(1) - em.AsyncFire(e1.SetName("e2")) - wg.Wait() - - em.Clear() -} - -func TestManager_AwaitFire(t *testing.T) { - em := NewManager("test") - em.On("e1", ListenerFunc(func(e Event) error { - assert.Equal(t, map[string]any{"k": "v"}, e.Data()) - e.Set("nk", "nv") - return nil - })) - - e1 := NewBasic("e1", M{"k": "v"}) - err := em.AwaitFire(e1) - - assert.NoError(t, err) - assert.Contains(t, e1.Data(), "nk") - assert.Equal(t, "nv", e1.Get("nk")) -} - -type testSubscriber struct { - // ooo -} - -func (s *testSubscriber) SubscribedEvents() map[string]any { - return map[string]any{ - "e1": ListenerFunc(s.e1Handler), - "e2": ListenerItem{ - Priority: AboveNormal, - Listener: ListenerFunc(func(e Event) error { - return fmt.Errorf("an error") - }), - }, - "e3": &testListener{}, - } -} - -func (s *testSubscriber) e1Handler(e Event) error { - e.Set("e1-key", "val1") - return nil -} - -type testSubscriber2 struct{} - -func (s testSubscriber2) SubscribedEvents() map[string]any { - return map[string]any{ - "e1": "invalid", - } -} - -func TestAddSubscriber(t *testing.T) { - AddSubscriber(&testSubscriber{}) - - assert.True(t, HasListeners("e1")) - assert.True(t, HasListeners("e2")) - assert.True(t, HasListeners("e3")) - - ers := FireBatch("e1", NewBasic("e2", nil)) - assert.Len(t, ers, 1) - - assert.Panics(t, func() { - Subscribe(testSubscriber2{}) - }) - - Reset() -} - -func TestManager_AddSubscriber(t *testing.T) { - em := NewManager("test") - em.AddSubscriber(&testSubscriber{}) - - assert.True(t, em.HasListeners("e1")) - assert.True(t, em.HasListeners("e2")) - assert.True(t, em.HasListeners("e3")) - - ers := em.FireBatch("e1", NewBasic("e2", nil)) - assert.Len(t, ers, 1) - - assert.Panics(t, func() { - em.AddSubscriber(testSubscriber2{}) - }) - - em.Clear() -} diff --git a/event_test.go b/event_test.go new file mode 100644 index 0000000..6543d45 --- /dev/null +++ b/event_test.go @@ -0,0 +1,121 @@ +package event_test + +import ( + "fmt" + "testing" + + "github.com/gookit/event" + "github.com/stretchr/testify/assert" +) + +type testListener struct { + userData string +} + +func (l *testListener) Handle(e event.Event) error { + if ret := e.Get("result"); ret != nil { + str := ret.(string) + fmt.Sprintf(" -> %s(%s)", e.Name(), l.userData) + e.Set("result", str) + } else { + e.Set("result", fmt.Sprintf("handled: %s(%s)", e.Name(), l.userData)) + } + return nil +} + +type testSubscriber struct { + // ooo +} + +func (s *testSubscriber) SubscribedEvents() map[string]any { + return map[string]any{ + "e1": event.ListenerFunc(s.e1Handler), + "e2": event.ListenerItem{ + Priority: event.AboveNormal, + Listener: event.ListenerFunc(func(e event.Event) error { + return fmt.Errorf("an error") + }), + }, + "e3": &testListener{}, + } +} + +func (s *testSubscriber) e1Handler(e event.Event) error { + e.Set("e1-key", "val1") + return nil +} + +type testSubscriber2 struct{} + +func (s testSubscriber2) SubscribedEvents() map[string]any { + return map[string]any{ + "e1": "invalid", + } +} + +type testEvent struct { + name string + data map[string]any + abort bool +} + +func (t *testEvent) Name() string { + return t.name +} + +func (t *testEvent) Get(key string) any { + return t.data[key] +} + +func (t *testEvent) Set(key string, val any) { + t.data[key] = val +} + +func (t *testEvent) Add(key string, val any) { + t.data[key] = val +} + +func (t *testEvent) Data() map[string]any { + return t.data +} + +func (t *testEvent) SetData(m event.M) event.Event { + t.data = m + return t +} + +func (t *testEvent) Abort(b bool) { + t.abort = b +} + +func (t *testEvent) IsAborted() bool { + return t.abort +} + +func TestEvent(t *testing.T) { + e := &event.BasicEvent{} + e.SetName("n1") + e.SetData(event.M{ + "arg0": "val0", + }) + e.SetTarget("tgt") + + e.Add("arg1", "val1") + + assert.False(t, e.IsAborted()) + e.Abort(true) + assert.True(t, e.IsAborted()) + + assert.Equal(t, "n1", e.Name()) + assert.Equal(t, "tgt", e.Target()) + assert.Contains(t, e.Data(), "arg1") + assert.Equal(t, "val0", e.Get("arg0")) + assert.Equal(t, nil, e.Get("not-exist")) + + e.Set("arg1", "new val") + assert.Equal(t, "new val", e.Get("arg1")) + + e1 := &event.BasicEvent{} + e1.Set("k", "v") + assert.Equal(t, "v", e1.Get("k")) + assert.NotEmpty(t, e1.Clone()) +} diff --git a/manager.go b/manager.go index d43c8b4..68f3898 100644 --- a/manager.go +++ b/manager.go @@ -1,6 +1,7 @@ package event import ( + "fmt" "reflect" "strings" "sync" @@ -8,7 +9,7 @@ import ( const ( defaultChannelSize = 100 - defaultConsumerNum = 5 + defaultConsumerNum = 3 ) // Manager event manager definition. for manage events and listeners @@ -16,8 +17,10 @@ type Manager struct { Options sync.Mutex - ch chan Event - once sync.Once + wg sync.WaitGroup + ch chan Event + oc sync.Once + err error // latest error // name of the manager name string @@ -36,6 +39,11 @@ type Manager struct { listenedNames map[string]int } +// NewM create event manager. alias of the NewManager() +func NewM(name string, fns ...OptionFn) *Manager { + return NewManager(name, fns...) +} + // NewManager create event manager func NewManager(name string, fns ...OptionFn) *Manager { em := &Manager{ @@ -68,12 +76,12 @@ func (em *Manager) WithOptions(fns ...OptionFn) *Manager { * -- register listeners *************************************************************/ -// AddListener alias of the method On() +// AddListener register a event handler/listener. alias of the method On() func (em *Manager) AddListener(name string, listener Listener, priority ...int) { em.On(name, listener, priority...) } -// Listen alias of the On() +// Listen register a event handler/listener. alias of the On() func (em *Manager) Listen(name string, listener Listener, priority ...int) { em.On(name, listener, priority...) } @@ -93,14 +101,15 @@ func (em *Manager) On(name string, listener Listener, priority ...int) { em.addListenerItem(name, &ListenerItem{pv, listener}) } -// Subscribe alias of the AddSubscriber() +// Subscribe add events by subscriber interface. alias of the AddSubscriber() func (em *Manager) Subscribe(sbr Subscriber) { em.AddSubscriber(sbr) } // AddSubscriber add events by subscriber interface. +// // you can register multi event listeners in a struct func. -// more usage please see README or test. +// more usage please see README or tests. func (em *Manager) AddSubscriber(sbr Subscriber) { for name, listener := range sbr.SubscribedEvents() { switch lt := listener.(type) { @@ -122,7 +131,7 @@ func (em *Manager) addListenerItem(name string, li *ListenerItem) { panicf("event: the event %q listener cannot be empty", name) } if reflect.ValueOf(li.Listener).Kind() == reflect.Struct { - panicf("event: %q - listener should be pointer of Listener", name) + panicf("event: %q - struct listener must be pointer", name) } // exists, append it. @@ -165,11 +174,17 @@ func (em *Manager) Fire(name string, params M) (err error, e Event) { } // Async fire event by go channel. +// +// Note: if you want to use this method, you should +// call the method Close() after all events are fired. func (em *Manager) Async(name string, params M) { _, _ = em.fireByName(name, params, true) } // FireC async fire event by go channel. alias of the method Async() +// +// Note: if you want to use this method, you should +// call the method Close() after all events are fired. func (em *Manager) FireC(name string, params M) { _, _ = em.fireByName(name, params, true) } @@ -295,9 +310,19 @@ func (em *Manager) firePathMode(name string, e Event) (err error) { return nil } +/************************************************************* + * Fire by channel + *************************************************************/ + // FireAsync async fire event by go channel. // -// Note: if you want to use this method, you should call the method Close() after all events are fired. +// Note: if you want to use this method, you should +// call the method Close() after all events are fired. +// +// Example: +// +// em := NewManager("test") +// em.FireAsync("db.user.add", M{"id": 1001}) func (em *Manager) FireAsync(e Event) { if em.ConsumerNum <= 0 { em.ConsumerNum = defaultConsumerNum @@ -307,7 +332,7 @@ func (em *Manager) FireAsync(e Event) { } // once make consumers - em.once.Do(func() { + em.oc.Do(func() { em.makeConsumers() }) @@ -321,7 +346,16 @@ func (em *Manager) makeConsumers() { // make event consumers for i := 0; i < em.ConsumerNum; i++ { + em.wg.Add(1) + go func() { + defer func() { + if err := recover(); err != nil { + em.err = fmt.Errorf("async consum event error: %v", err) + } + em.wg.Done() + }() + // keep running until channel closed for e := range em.ch { _ = em.FireEvent(e) // ignore async fire error @@ -330,7 +364,21 @@ func (em *Manager) makeConsumers() { } } -// Close manager and event channel +// CloseWait close channel and wait all async event done. +func (em *Manager) CloseWait() error { + if err := em.Close(); err != nil { + return err + } + return em.Wait() +} + +// Wait wait all async event done. +func (em *Manager) Wait() error { + em.wg.Wait() + return em.err +} + +// Close event channel, deny to fire new event. func (em *Manager) Close() error { if em.ch != nil { close(em.ch) @@ -518,9 +566,7 @@ func (em *Manager) RemoveListeners(name string) { } // Clear alias of the Reset() -func (em *Manager) Clear() { - em.Reset() -} +func (em *Manager) Clear() { em.Reset() } // Reset the manager, clear all data. func (em *Manager) Reset() { @@ -530,7 +576,10 @@ func (em *Manager) Reset() { } // reset all - em.name = "" + em.ch = nil + em.oc = sync.Once{} + em.wg = sync.WaitGroup{} + em.eventFc = make(map[string]FactoryFunc) em.listeners = make(map[string]*ListenerQueue) em.listenedNames = make(map[string]int) diff --git a/manager_test.go b/manager_test.go index 4cb168e..c0782f1 100644 --- a/manager_test.go +++ b/manager_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "strings" + "sync" "testing" "time" @@ -11,6 +12,216 @@ import ( "github.com/stretchr/testify/assert" ) +func TestManager_FireEvent(t *testing.T) { + em := event.NewManager("test") + em.EnableLock = true + + e1 := event.NewBasic("e1", nil) + em.AddEvent(e1) + + em.On("e1", &testListener{"HI"}, event.Min) + em.On("e1", &testListener{"WEL"}, event.High) + em.AddListener("e1", &testListener{"COM"}, event.BelowNormal) + + err := em.FireEvent(e1) + assert.NoError(t, err) + assert.Equal(t, "handled: e1(WEL) -> e1(COM) -> e1(HI)", e1.Get("result")) + + // not exist + err = em.FireEvent(e1.SetName("e2")) + assert.NoError(t, err) + + em.Clear() +} + +func TestManager_FireEvent2(t *testing.T) { + buf := new(bytes.Buffer) + mgr := event.NewM("test") + + evt1 := event.New("evt1", nil).Fill(nil, event.M{"n": "inhere"}) + mgr.AddEvent(evt1) + + assert.True(t, mgr.HasEvent("evt1")) + assert.False(t, mgr.HasEvent("not-exist")) + + mgr.On("evt1", event.ListenerFunc(func(e event.Event) error { + _, _ = fmt.Fprintf(buf, "event: %s, params: n=%s", e.Name(), e.Get("n")) + return nil + }), event.Normal) + + assert.True(t, mgr.HasListeners("evt1")) + assert.False(t, mgr.HasListeners("not-exist")) + + err := mgr.FireEvent(evt1) + assert.NoError(t, err) + assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) + buf.Reset() + + mgr.On(event.Wildcard, event.ListenerFunc(func(e event.Event) error { + buf.WriteString("|Wildcard handler") + return nil + })) + + err = mgr.FireEvent(evt1) + assert.NoError(t, err) + assert.Equal(t, "event: evt1, params: n=inhere|Wildcard handler", buf.String()) +} + +func TestManager_AsyncFire(t *testing.T) { + em := event.NewManager("test") + em.On("e1", event.ListenerFunc(func(e event.Event) error { + assert.Equal(t, map[string]any{"k": "v"}, e.Data()) + e.Set("nk", "nv") + return nil + })) + + e1 := event.NewBasic("e1", event.M{"k": "v"}) + em.AsyncFire(e1) + time.Sleep(time.Second / 10) + assert.Equal(t, "nv", e1.Get("nk")) + + var wg sync.WaitGroup + em.On("e2", event.ListenerFunc(func(e event.Event) error { + defer wg.Done() + assert.Equal(t, "v", e.Get("k")) + return nil + })) + + wg.Add(1) + em.AsyncFire(e1.SetName("e2")) + wg.Wait() + em.Clear() +} + +func TestManager_AwaitFire(t *testing.T) { + em := event.NewManager("test") + em.On("e1", event.ListenerFunc(func(e event.Event) error { + assert.Equal(t, map[string]any{"k": "v"}, e.Data()) + e.Set("nk", "nv") + return nil + })) + + e1 := event.NewBasic("e1", event.M{"k": "v"}) + err := em.AwaitFire(e1) + + assert.NoError(t, err) + assert.Contains(t, e1.Data(), "nk") + assert.Equal(t, "nv", e1.Get("nk")) +} + +func TestManager_AddSubscriber(t *testing.T) { + em := event.NewManager("test") + em.AddSubscriber(&testSubscriber{}) + + assert.True(t, em.HasListeners("e1")) + assert.True(t, em.HasListeners("e2")) + assert.True(t, em.HasListeners("e3")) + + ers := em.FireBatch("e1", event.NewBasic("e2", nil)) + assert.Len(t, ers, 1) + + assert.Panics(t, func() { + em.AddSubscriber(testSubscriber2{}) + }) + + em.Clear() +} + +func TestManager_ListenGroupEvent(t *testing.T) { + em := event.NewManager("test") + + e1 := event.NewBasic("app.evt1", event.M{"buf": new(bytes.Buffer)}) + e1.AttachTo(em) + + l2 := event.ListenerFunc(func(e event.Event) error { + e.Get("buf").(*bytes.Buffer).WriteString(" > 2 " + e.Name()) + return nil + }) + l3 := event.ListenerFunc(func(e event.Event) error { + e.Get("buf").(*bytes.Buffer).WriteString(" > 3 " + e.Name()) + return nil + }) + + em.On("app.evt1", event.ListenerFunc(func(e event.Event) error { + e.Get("buf").(*bytes.Buffer).WriteString("Hi > 1 " + e.Name()) + return nil + })) + em.On("app.*", l2) + em.On("*", l3) + + buf := e1.Get("buf").(*bytes.Buffer) + err, e := em.Fire("app.evt1", nil) + assert.NoError(t, err) + assert.Equal(t, e1, e) + assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1 > 3 app.evt1", buf.String()) + + em.RemoveListener("app.*", l2) + assert.Len(t, em.ListenedNames(), 2) + em.On("app.*", event.ListenerFunc(func(e event.Event) error { + return fmt.Errorf("an error") + })) + + buf.Reset() + err, e = em.Fire("app.evt1", nil) + assert.Error(t, err) + assert.Equal(t, "Hi > 1 app.evt1", buf.String()) + + em.RemoveListeners("app.*") + em.RemoveListener("", l3) + em.On("app.*", l2) // re-add + em.On("*", event.ListenerFunc(func(e event.Event) error { + return fmt.Errorf("an error") + })) + assert.Len(t, em.ListenedNames(), 3) + + buf.Reset() + err, e = em.Trigger("app.evt1", nil) + assert.Error(t, err) + assert.Equal(t, e1, e) + assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1", buf.String()) + + em.RemoveListener("", nil) + + // clear + em.Clear() + buf.Reset() +} + +func TestManager_Fire_WithWildcard(t *testing.T) { + buf := new(bytes.Buffer) + mgr := event.NewManager("test") + + const Event2FurcasTicketCreate = "kapal.furcas.ticket.create" + + handler := event.ListenerFunc(func(e event.Event) error { + _, _ = fmt.Fprintf(buf, "%s-%s|", e.Name(), e.Get("user")) + return nil + }) + + mgr.On("kapal.furcas.ticket.*", handler) + mgr.On(Event2FurcasTicketCreate, handler) + + err, _ := mgr.Fire(Event2FurcasTicketCreate, event.M{"user": "inhere"}) + assert.NoError(t, err) + assert.Equal( + t, + "kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|", + buf.String(), + ) + buf.Reset() + + // add Wildcard listen + mgr.On("*", handler) + + err, _ = mgr.Trigger(Event2FurcasTicketCreate, event.M{"user": "inhere"}) + assert.NoError(t, err) + assert.Equal( + t, + "kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|kapal.furcas.ticket.create-inhere|", + buf.String(), + ) +} + func TestManager_Fire_usePathMode(t *testing.T) { buf := new(bytes.Buffer) em := event.NewManager("test", event.UsePathMode, event.EnableLock) @@ -148,3 +359,23 @@ func TestManager_FireC(t *testing.T) { time.Sleep(time.Millisecond * 100) // wait for async assert.Empty(t, buf.String()) } + +func TestManager_Wait(t *testing.T) { + em := event.NewManager("test", event.UsePathMode, event.EnableLock) + + buf := new(bytes.Buffer) + em.Listen("db.user.*", event.ListenerFunc(func(e event.Event) error { + time.Sleep(time.Millisecond * 200) + _, _ = buf.WriteString("db.user.*|") + return nil + })) + assert.True(t, em.HasListeners("db.user.*")) + + em.Async("db.user.add", event.M{"user": "inhere"}) + assert.NoError(t, em.CloseWait()) + + str := buf.String() + fmt.Println(str) + assert.Equal(t, "db.user.*|", str) + buf.Reset() +} diff --git a/std.go b/std.go index 987f2a0..72b774c 100644 --- a/std.go +++ b/std.go @@ -1,105 +1,127 @@ package event -// DefaultEM default event manager -var DefaultEM = NewManager("default") - -// NoMatchedListener error -// var NoMatchedListener = errors.New("no event listeners matched") +// std default event manager +var std = NewManager("default") // Std get default event manager func Std() *Manager { - return DefaultEM + return std +} + +// Config set default event manager options +func Config(fn ...OptionFn) { + std.WithOptions(fn...) } /************************************************************* * Listener *************************************************************/ -// On register a listener to the event +// On register a listener to the event. alias of Listen() func On(name string, listener Listener, priority ...int) { - DefaultEM.On(name, listener, priority...) + std.On(name, listener, priority...) } // Listen register a listener to the event func Listen(name string, listener Listener, priority ...int) { - DefaultEM.Listen(name, listener, priority...) + std.Listen(name, listener, priority...) } // Subscribe register a listener to the event func Subscribe(sbr Subscriber) { - DefaultEM.Subscribe(sbr) + std.Subscribe(sbr) } // AddSubscriber register a listener to the event func AddSubscriber(sbr Subscriber) { - DefaultEM.AddSubscriber(sbr) + std.AddSubscriber(sbr) } -// AsyncFire async fire event by 'go' keywords +// AsyncFire simple async fire event by 'go' keywords func AsyncFire(e Event) { - DefaultEM.AsyncFire(e) + std.AsyncFire(e) +} + +// Async fire event by channel +func Async(name string, params M) { + std.Async(name, params) +} + +// FireAsync fire event by channel +func FireAsync(e Event) { + std.FireAsync(e) +} + +// CloseWait close chan and wait for all async events done. +func CloseWait() error { + return std.CloseWait() } // Trigger alias of Fire func Trigger(name string, params M) (error, Event) { - return DefaultEM.Fire(name, params) + return std.Fire(name, params) } // Fire listeners by name. func Fire(name string, params M) (error, Event) { - return DefaultEM.Fire(name, params) + return std.Fire(name, params) } // FireEvent fire listeners by Event instance. func FireEvent(e Event) error { - return DefaultEM.FireEvent(e) + return std.FireEvent(e) } // TriggerEvent alias of FireEvent func TriggerEvent(e Event) error { - return DefaultEM.FireEvent(e) + return std.FireEvent(e) } // MustFire fire event by name. will panic on error func MustFire(name string, params M) Event { - return DefaultEM.MustFire(name, params) + return std.MustFire(name, params) } // MustTrigger alias of MustFire func MustTrigger(name string, params M) Event { - return DefaultEM.MustFire(name, params) + return std.MustFire(name, params) } // FireBatch fire multi event at once. func FireBatch(es ...any) []error { - return DefaultEM.FireBatch(es...) + return std.FireBatch(es...) } // HasListeners has listeners for the event name. func HasListeners(name string) bool { - return DefaultEM.HasListeners(name) + return std.HasListeners(name) } // Reset the default event manager func Reset() { - DefaultEM.Clear() + std.Clear() } /************************************************************* * Event *************************************************************/ -// AddEvent has event check. +// AddEvent add a pre-defined event. func AddEvent(e Event) { - DefaultEM.AddEvent(e) + std.AddEvent(e) +} + +// AddEventFc add a pre-defined event factory func to manager. +func AddEventFc(name string, fc FactoryFunc) { + std.AddEventFc(name, fc) } // GetEvent get event by name. func GetEvent(name string) (Event, bool) { - return DefaultEM.GetEvent(name) + return std.GetEvent(name) } // HasEvent has event check. func HasEvent(name string) bool { - return DefaultEM.HasEvent(name) + return std.HasEvent(name) } diff --git a/std_test.go b/std_test.go index e122061..6b6f4f7 100644 --- a/std_test.go +++ b/std_test.go @@ -1,7 +1,10 @@ package event_test import ( + "bytes" + "fmt" "testing" + "time" "github.com/gookit/event" "github.com/stretchr/testify/assert" @@ -11,6 +14,169 @@ var emptyListener = func(e event.Event) error { return nil } +func TestAddEvent(t *testing.T) { + defer event.Reset() + event.Std().RemoveEvents() + + // no name + assert.Panics(t, func() { + event.AddEvent(&event.BasicEvent{}) + }) + + _, ok := event.GetEvent("evt1") + assert.False(t, ok) + + // event.AddEvent + e := event.NewBasic("evt1", event.M{"k1": "inhere"}) + event.AddEvent(e) + // add by AttachTo + event.NewBasic("evt2", nil).AttachTo(event.Std()) + + assert.False(t, e.IsAborted()) + assert.True(t, event.HasEvent("evt1")) + assert.True(t, event.HasEvent("evt2")) + assert.False(t, event.HasEvent("not-exist")) + + // event.GetEvent + r1, ok := event.GetEvent("evt1") + assert.True(t, ok) + assert.Equal(t, e, r1) + + // RemoveEvent + event.Std().RemoveEvent("evt2") + assert.False(t, event.HasEvent("evt2")) + + // RemoveEvents + event.Std().RemoveEvents() + assert.False(t, event.HasEvent("evt1")) +} + +func TestAddEventFc(t *testing.T) { + // clear all + event.Reset() + event.AddEvent(&testEvent{ + name: "evt1", + }) + assert.True(t, event.HasEvent("evt1")) + + event.AddEventFc("test", func() event.Event { + return event.NewBasic("test", nil) + }) + + assert.True(t, event.HasEvent("test")) +} + +func TestFire(t *testing.T) { + buf := new(bytes.Buffer) + fn := func(e event.Event) error { + _, _ = fmt.Fprintf(buf, "event: %s", e.Name()) + return nil + } + + event.On("evt1", event.ListenerFunc(fn), 0) + event.On("evt1", event.ListenerFunc(emptyListener), event.High) + assert.True(t, event.HasListeners("evt1")) + + err, e := event.Fire("evt1", nil) + assert.NoError(t, err) + assert.Equal(t, "evt1", e.Name()) + assert.Equal(t, "event: evt1", buf.String()) + + event.NewBasic("evt2", nil).AttachTo(event.Std()) + event.On("evt2", event.ListenerFunc(func(e event.Event) error { + assert.Equal(t, "evt2", e.Name()) + assert.Equal(t, "v", e.Get("k")) + return nil + }), event.AboveNormal) + + assert.True(t, event.HasListeners("evt2")) + err, e = event.Trigger("evt2", event.M{"k": "v"}) + assert.NoError(t, err) + assert.Equal(t, "evt2", e.Name()) + assert.Equal(t, map[string]any{"k": "v"}, e.Data()) + + // clear all + event.Reset() + assert.False(t, event.HasListeners("evt1")) + assert.False(t, event.HasListeners("evt2")) + + err, e = event.Fire("not-exist", nil) + assert.NoError(t, err) + assert.NotEmpty(t, e) +} + +func TestAddSubscriber(t *testing.T) { + event.AddSubscriber(&testSubscriber{}) + + assert.True(t, event.HasListeners("e1")) + assert.True(t, event.HasListeners("e2")) + assert.True(t, event.HasListeners("e3")) + + ers := event.FireBatch("e1", event.NewBasic("e2", nil)) + assert.Len(t, ers, 1) + + assert.Panics(t, func() { + event.Subscribe(testSubscriber2{}) + }) + + event.Reset() +} + +func TestFireEvent(t *testing.T) { + defer event.Reset() + buf := new(bytes.Buffer) + + evt1 := event.NewBasic("evt1", nil).Fill(nil, event.M{"n": "inhere"}) + event.AddEvent(evt1) + + assert.True(t, event.HasEvent("evt1")) + assert.False(t, event.HasEvent("not-exist")) + + event.Listen("evt1", event.ListenerFunc(func(e event.Event) error { + _, _ = fmt.Fprintf(buf, "event: %s, params: n=%s", e.Name(), e.Get("n")) + return nil + }), event.Normal) + + assert.True(t, event.HasListeners("evt1")) + assert.False(t, event.HasListeners("not-exist")) + + err := event.FireEvent(evt1) + assert.NoError(t, err) + assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) + buf.Reset() + + err = event.TriggerEvent(evt1) + assert.NoError(t, err) + assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) + buf.Reset() + + event.AsyncFire(evt1) + time.Sleep(time.Second) + assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) +} + +func TestAsync(t *testing.T) { + event.Reset() + event.Config(event.UsePathMode) + + buf := new(bytes.Buffer) + event.On("test", event.ListenerFunc(func(e event.Event) error { + buf.WriteString("test:") + buf.WriteString(e.Get("key").(string)) + buf.WriteString("|") + return nil + })) + + event.Async("test", event.M{"key": "val1"}) + te := &testEvent{name: "test", data: event.M{"key": "val2"}} + event.FireAsync(te) + + assert.NoError(t, event.CloseWait()) + s := buf.String() + assert.Contains(t, s, "test:val1|") + assert.Contains(t, s, "test:val2|") +} + func TestFire_point_at_end(t *testing.T) { // clear all event.Reset() @@ -41,6 +207,23 @@ func TestFire_notExist(t *testing.T) { assert.NotEmpty(t, e) } +func TestMustFire(t *testing.T) { + defer event.Reset() + + event.On("n1", event.ListenerFunc(func(e event.Event) error { + return fmt.Errorf("an error") + }), event.Max) + event.On("n2", event.ListenerFunc(emptyListener), event.Min) + + assert.Panics(t, func() { + _ = event.MustFire("n1", nil) + }) + + assert.NotPanics(t, func() { + _ = event.MustTrigger("n2", nil) + }) +} + func TestOn(t *testing.T) { defer event.Reset()