This repository has been archived by the owner on Aug 27, 2022. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #370 from lochjin/0.10-dev
Suport event system
- Loading branch information
Showing
8 changed files
with
542 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/* | ||
* Copyright (c) 2017-2020 The qitmeer developers | ||
*/ | ||
|
||
package event | ||
|
||
type Event struct { | ||
Data interface{} | ||
Ack chan<- struct{} | ||
} | ||
|
||
func New(data interface{}) *Event { | ||
return &Event{Data: data, Ack: nil} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
/* | ||
* Copyright (c) 2017-2020 The qitmeer developers | ||
*/ | ||
|
||
package event | ||
|
||
import ( | ||
"errors" | ||
"reflect" | ||
"sync" | ||
) | ||
|
||
var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type") | ||
|
||
// Feed implements one-to-many subscriptions where the carrier of events is a channel. | ||
// Values sent to a Feed are delivered to all subscribed channels simultaneously. | ||
// | ||
// Feeds can only be used with a single type. The type is determined by the first Send or | ||
// Subscribe operation. Subsequent calls to these methods panic if the type does not | ||
// match. | ||
// | ||
// The zero value is ready to use. | ||
type Feed struct { | ||
once sync.Once // ensures that init only runs once | ||
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases. | ||
removeSub chan interface{} // interrupts Send | ||
sendCases caseList // the active set of select cases used by Send | ||
|
||
// The inbox holds newly subscribed channels until they are added to sendCases. | ||
mu sync.Mutex | ||
inbox caseList | ||
etype reflect.Type | ||
} | ||
|
||
// This is the index of the first actual subscription channel in sendCases. | ||
// sendCases[0] is a SelectRecv case for the removeSub channel. | ||
const firstSubSendCase = 1 | ||
|
||
type feedTypeError struct { | ||
got, want reflect.Type | ||
op string | ||
} | ||
|
||
func (e feedTypeError) Error() string { | ||
return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() | ||
} | ||
|
||
func (f *Feed) init() { | ||
f.removeSub = make(chan interface{}) | ||
f.sendLock = make(chan struct{}, 1) | ||
f.sendLock <- struct{}{} | ||
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}} | ||
} | ||
|
||
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel | ||
// until the subscription is canceled. All channels added must have the same element type. | ||
// | ||
// The channel should have ample buffer space to avoid blocking other subscribers. | ||
// Slow subscribers are not dropped. | ||
func (f *Feed) Subscribe(channel interface{}) Subscription { | ||
f.once.Do(f.init) | ||
|
||
chanval := reflect.ValueOf(channel) | ||
chantyp := chanval.Type() | ||
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { | ||
panic(errBadChannel) | ||
} | ||
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} | ||
|
||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
if !f.typecheck(chantyp.Elem()) { | ||
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) | ||
} | ||
// Add the select case to the inbox. | ||
// The next Send will add it to f.sendCases. | ||
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} | ||
f.inbox = append(f.inbox, cas) | ||
return sub | ||
} | ||
|
||
// note: callers must hold f.mu | ||
func (f *Feed) typecheck(typ reflect.Type) bool { | ||
if f.etype == nil { | ||
f.etype = typ | ||
return true | ||
} | ||
return f.etype == typ | ||
} | ||
|
||
func (f *Feed) remove(sub *feedSub) { | ||
// Delete from inbox first, which covers channels | ||
// that have not been added to f.sendCases yet. | ||
ch := sub.channel.Interface() | ||
f.mu.Lock() | ||
index := f.inbox.find(ch) | ||
if index != -1 { | ||
f.inbox = f.inbox.delete(index) | ||
f.mu.Unlock() | ||
return | ||
} | ||
f.mu.Unlock() | ||
|
||
select { | ||
case f.removeSub <- ch: | ||
// Send will remove the channel from f.sendCases. | ||
case <-f.sendLock: | ||
// No Send is in progress, delete the channel now that we have the send lock. | ||
f.sendCases = f.sendCases.delete(f.sendCases.find(ch)) | ||
f.sendLock <- struct{}{} | ||
} | ||
} | ||
|
||
// Send delivers to all subscribed channels simultaneously. | ||
// It returns the number of subscribers that the value was sent to. | ||
func (f *Feed) Send(value interface{}) (nsent int) { | ||
rvalue := reflect.ValueOf(value) | ||
|
||
f.once.Do(f.init) | ||
<-f.sendLock | ||
|
||
// Add new cases from the inbox after taking the send lock. | ||
f.mu.Lock() | ||
f.sendCases = append(f.sendCases, f.inbox...) | ||
f.inbox = nil | ||
|
||
if !f.typecheck(rvalue.Type()) { | ||
f.sendLock <- struct{}{} | ||
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) | ||
} | ||
f.mu.Unlock() | ||
|
||
// Set the sent value on all channels. | ||
for i := firstSubSendCase; i < len(f.sendCases); i++ { | ||
f.sendCases[i].Send = rvalue | ||
} | ||
|
||
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix | ||
// of sendCases. When a send succeeds, the corresponding case moves to the end of | ||
// 'cases' and it shrinks by one element. | ||
cases := f.sendCases | ||
for { | ||
// Fast path: try sending without blocking before adding to the select set. | ||
// This should usually succeed if subscribers are fast enough and have free | ||
// buffer space. | ||
for i := firstSubSendCase; i < len(cases); i++ { | ||
if cases[i].Chan.TrySend(rvalue) { | ||
nsent++ | ||
cases = cases.deactivate(i) | ||
i-- | ||
} | ||
} | ||
if len(cases) == firstSubSendCase { | ||
break | ||
} | ||
// Select on all the receivers, waiting for them to unblock. | ||
chosen, recv, _ := reflect.Select(cases) | ||
if chosen == 0 /* <-f.removeSub */ { | ||
index := f.sendCases.find(recv.Interface()) | ||
f.sendCases = f.sendCases.delete(index) | ||
if index >= 0 && index < len(cases) { | ||
// Shrink 'cases' too because the removed case was still active. | ||
cases = f.sendCases[:len(cases)-1] | ||
} | ||
} else { | ||
cases = cases.deactivate(chosen) | ||
nsent++ | ||
} | ||
} | ||
|
||
// Forget about the sent value and hand off the send lock. | ||
for i := firstSubSendCase; i < len(f.sendCases); i++ { | ||
f.sendCases[i].Send = reflect.Value{} | ||
} | ||
f.sendLock <- struct{}{} | ||
return nsent | ||
} | ||
|
||
type feedSub struct { | ||
feed *Feed | ||
channel reflect.Value | ||
errOnce sync.Once | ||
err chan error | ||
} | ||
|
||
func (sub *feedSub) Unsubscribe() { | ||
sub.errOnce.Do(func() { | ||
sub.feed.remove(sub) | ||
close(sub.err) | ||
}) | ||
} | ||
|
||
func (sub *feedSub) Err() <-chan error { | ||
return sub.err | ||
} | ||
|
||
type caseList []reflect.SelectCase | ||
|
||
// find returns the index of a case containing the given channel. | ||
func (cs caseList) find(channel interface{}) int { | ||
for i, cas := range cs { | ||
if cas.Chan.Interface() == channel { | ||
return i | ||
} | ||
} | ||
return -1 | ||
} | ||
|
||
// delete removes the given case from cs. | ||
func (cs caseList) delete(index int) caseList { | ||
return append(cs[:index], cs[index+1:]...) | ||
} | ||
|
||
// deactivate moves the case at index into the non-accessible portion of the cs slice. | ||
func (cs caseList) deactivate(index int) caseList { | ||
last := len(cs) - 1 | ||
cs[index], cs[last] = cs[last], cs[index] | ||
return cs[:last] | ||
} |
Oops, something went wrong.