Skip to content
This repository has been archived by the owner on Aug 27, 2022. It is now read-only.

Suport event system #370

Merged
merged 2 commits into from Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 10 additions & 9 deletions core/blockchain/blockchain.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Qitmeer/qitmeer/common/roughtime"
"github.com/Qitmeer/qitmeer/core/blockdag"
"github.com/Qitmeer/qitmeer/core/dbnamespace"
"github.com/Qitmeer/qitmeer/core/event"
"github.com/Qitmeer/qitmeer/core/types"
"github.com/Qitmeer/qitmeer/database"
"github.com/Qitmeer/qitmeer/engine/txscript"
Expand Down Expand Up @@ -45,12 +46,12 @@ type BlockChain struct {
// separate mutex.
checkpointsByLayer map[uint64]*params.Checkpoint

db database.DB
dbInfo *databaseInfo
timeSource MedianTimeSource
notifications NotificationCallback
sigCache *txscript.SigCache
indexManager IndexManager
db database.DB
dbInfo *databaseInfo
timeSource MedianTimeSource
events *event.Feed
sigCache *txscript.SigCache
indexManager IndexManager

// subsidyCache is the cache that provides quick lookup of subsidy
// values.
Expand Down Expand Up @@ -142,14 +143,14 @@ type Config struct {
// time is adjusted to be in agreement with other peers.
TimeSource MedianTimeSource

// Notifications defines a callback to which notifications will be sent
// Events defines a event manager to which notifications will be sent
// when various events take place. See the documentation for
// Notification and NotificationType for details on the types and
// contents of notifications.
//
// This field can be nil if the caller is not interested in receiving
// notifications.
Notifications NotificationCallback
Events *event.Feed

// SigCache defines a signature cache to use when when validating
// signatures. This is typically most useful when individual
Expand Down Expand Up @@ -259,7 +260,7 @@ func New(config *Config) (*BlockChain, error) {
db: config.DB,
params: par,
timeSource: config.TimeSource,
notifications: config.Notifications,
events: config.Events,
sigCache: config.SigCache,
indexManager: config.IndexManager,
index: newBlockIndex(config.DB, par),
Expand Down
10 changes: 3 additions & 7 deletions core/blockchain/notifications.go
Expand Up @@ -8,18 +8,14 @@ package blockchain

import (
"fmt"

"github.com/Qitmeer/qitmeer/common/hash"
"github.com/Qitmeer/qitmeer/core/event"
"github.com/Qitmeer/qitmeer/core/types"
)

// NotificationType represents the type of a notification message.
type NotificationType int

// NotificationCallback is used for a caller to provide a callback for
// notifications about various chain events.
type NotificationCallback func(*Notification)

// Constants for the type of a notification message.
const (
// BlockAccepted indicates the associated block was accepted into
Expand Down Expand Up @@ -103,14 +99,14 @@ type Notification struct {
// to New.
func (b *BlockChain) sendNotification(typ NotificationType, data interface{}) {
// Ignore it if the caller didn't request notifications.
if b.notifications == nil {
if b.events == nil {
return
}

// Generate and send the notification.
n := Notification{Type: typ, Data: data}
log.Trace("send blkmgr notification", "type", n.Type, "data", n.Data)
b.ChainUnlock()
b.notifications(&n)
b.events.Send(event.New(&n))
b.ChainLock()
}
14 changes: 14 additions & 0 deletions core/event/event.go
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2017-2020 The qitmeer developers
*/

package event

type Event struct {
Data interface{}
Ack chan<- struct{}
}

func New(data interface{}) *Event {
return &Event{Data: data, Ack: nil}
}
219 changes: 219 additions & 0 deletions core/event/feed.go
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2017-2020 The qitmeer developers
*/

package event

import (
"errors"
"reflect"
"sync"
)

var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")

// Feed implements one-to-many subscriptions where the carrier of events is a channel.
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
//
// Feeds can only be used with a single type. The type is determined by the first Send or
// Subscribe operation. Subsequent calls to these methods panic if the type does not
// match.
//
// The zero value is ready to use.
type Feed struct {
once sync.Once // ensures that init only runs once
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send

// The inbox holds newly subscribed channels until they are added to sendCases.
mu sync.Mutex
inbox caseList
etype reflect.Type
}

// This is the index of the first actual subscription channel in sendCases.
// sendCases[0] is a SelectRecv case for the removeSub channel.
const firstSubSendCase = 1

type feedTypeError struct {
got, want reflect.Type
op string
}

func (e feedTypeError) Error() string {
return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
}

func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock <- struct{}{}
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
// until the subscription is canceled. All channels added must have the same element type.
//
// The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)

chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}

// note: callers must hold f.mu
func (f *Feed) typecheck(typ reflect.Type) bool {
if f.etype == nil {
f.etype = typ
return true
}
return f.etype == typ
}

func (f *Feed) remove(sub *feedSub) {
// Delete from inbox first, which covers channels
// that have not been added to f.sendCases yet.
ch := sub.channel.Interface()
f.mu.Lock()
index := f.inbox.find(ch)
if index != -1 {
f.inbox = f.inbox.delete(index)
f.mu.Unlock()
return
}
f.mu.Unlock()

select {
case f.removeSub <- ch:
// Send will remove the channel from f.sendCases.
case <-f.sendLock:
// No Send is in progress, delete the channel now that we have the send lock.
f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
f.sendLock <- struct{}{}
}
}

// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)

f.once.Do(f.init)
<-f.sendLock

// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil

if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()

// Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}

// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)
if chosen == 0 /* <-f.removeSub */ {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}

// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent
}

type feedSub struct {
feed *Feed
channel reflect.Value
errOnce sync.Once
err chan error
}

func (sub *feedSub) Unsubscribe() {
sub.errOnce.Do(func() {
sub.feed.remove(sub)
close(sub.err)
})
}

func (sub *feedSub) Err() <-chan error {
return sub.err
}

type caseList []reflect.SelectCase

// find returns the index of a case containing the given channel.
func (cs caseList) find(channel interface{}) int {
for i, cas := range cs {
if cas.Chan.Interface() == channel {
return i
}
}
return -1
}

// delete removes the given case from cs.
func (cs caseList) delete(index int) caseList {
return append(cs[:index], cs[index+1:]...)
}

// deactivate moves the case at index into the non-accessible portion of the cs slice.
func (cs caseList) deactivate(index int) caseList {
last := len(cs) - 1
cs[index], cs[last] = cs[last], cs[index]
return cs[:last]
}