Skip to content

Commit

Permalink
Implement unscubscribe on event stream (#1991)
Browse files Browse the repository at this point in the history
* Unsubscribe from event stream
  • Loading branch information
stana-miric committed Oct 18, 2023
1 parent b1e7a26 commit 72a7e3a
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 47 deletions.
9 changes: 7 additions & 2 deletions archive/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (

type blockchainInterface interface {
SubscribeEvents() blockchain.Subscription
UnsubscribeEvents(blockchain.Subscription)
Genesis() types.Hash
GetBlockByNumber(uint64, bool) (*types.Block, bool)
GetHashByNumber(uint64) types.Hash
Expand Down Expand Up @@ -68,9 +69,13 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream, progressi
}

// Create a blockchain subscription for the sync progression and start tracking
progression.StartProgression(firstBlock.Number(), chain.SubscribeEvents())
subscription := chain.SubscribeEvents()
progression.StartProgression(firstBlock.Number(), subscription)
// Stop monitoring the sync progression upon exit
defer progression.StopProgression()
defer func() {
progression.StopProgression()
chain.UnsubscribeEvents(subscription)
}()

// Set the goal
progression.UpdateHighestProgression(metadata.Latest)
Expand Down
3 changes: 3 additions & 0 deletions archive/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (m *mockChain) SubscribeEvents() blockchain.Subscription {
return blockchain.NewMockSubscription()
}

func (m *mockChain) UnsubscribeEvents(subscription blockchain.Subscription) {
}

func getLatestBlockFromMockChain(m *mockChain) *types.Block {
if l := len(m.blocks); l != 0 {
return m.blocks[l-1]
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewBlockchain(
db: db,
executor: executor,
txSigner: txSigner,
stream: &eventStream{},
stream: newEventStream(),
gpAverage: &gasPriceAverage{
price: big.NewInt(0),
count: big.NewInt(0),
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ func TestBlockchain_WriteFullBlock(t *testing.T) {
BaseFeeEM: 4,
},
},
stream: &eventStream{},
stream: newEventStream(),
}

bc.headersCache, _ = lru.New(10)
Expand Down
60 changes: 35 additions & 25 deletions blockchain/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type void struct{}
type Subscription interface {
GetEventCh() chan *Event
GetEvent() *Event
Close()
}

// FOR TESTING PURPOSES //
Expand Down Expand Up @@ -56,11 +55,6 @@ func (s *subscription) GetEvent() *Event {
}
}

// Close closes the subscription
func (s *subscription) Close() {
close(s.closeCh)
}

type EventType int

const (
Expand Down Expand Up @@ -127,42 +121,58 @@ func (b *Blockchain) SubscribeEvents() Subscription {
return b.stream.subscribe()
}

// eventStream is the structure that contains the event list,
// as well as the update channel which it uses to notify of updates
// UnsubscribeEvents removes subscription from blockchain event stream
func (b *Blockchain) UnsubscribeEvents(sub Subscription) {
if subPtr, ok := sub.(*subscription); ok {
b.stream.unsubscribe(subPtr)
} else {
b.logger.Warn("Failed to unsubscribe from event stream. Invalid subscription.")
}
}

// eventStream is the structure that contains the subscribers
// which it uses to notify of updates
type eventStream struct {
sync.Mutex
sync.RWMutex
subscriptions map[*subscription]struct{}
}

// channel to notify updates
updateCh []chan *Event
// newEventStream creates event stream and initializes subscriptions map
func newEventStream() *eventStream {
return &eventStream{
subscriptions: make(map[*subscription]struct{}),
}
}

// subscribe Creates a new blockchain event subscription
// subscribe creates a new blockchain event subscription
func (e *eventStream) subscribe() *subscription {
return &subscription{
updateCh: e.newUpdateCh(),
sub := &subscription{
updateCh: make(chan *Event, 5),
closeCh: make(chan void),
}
}

// newUpdateCh returns the event update channel
func (e *eventStream) newUpdateCh() chan *Event {
e.Lock()
defer e.Unlock()
e.subscriptions[sub] = struct{}{}
e.Unlock()

ch := make(chan *Event, 5)
return sub
}

e.updateCh = append(e.updateCh, ch)
func (e *eventStream) unsubscribe(sub *subscription) {
e.Lock()
defer e.Unlock()

return ch
delete(e.subscriptions, sub)
close(sub.closeCh)
}

// push adds a new Event, and notifies listeners
func (e *eventStream) push(event *Event) {
e.Lock()
defer e.Unlock()
e.RLock()
defer e.RUnlock()

// Notify the listeners
for _, update := range e.updateCh {
update <- event
for sub := range e.subscriptions {
sub.updateCh <- event
}
}
119 changes: 108 additions & 11 deletions blockchain/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestSubscription(t *testing.T) {
t.Parallel()

var (
e = &eventStream{}
e = newEventStream()
sub = e.subscribe()
caughtEventNum = uint64(0)
event = &Event{
Expand All @@ -27,7 +27,7 @@ func TestSubscription(t *testing.T) {
wg sync.WaitGroup
)

defer sub.Close()
defer e.unsubscribe(sub)

updateCh := sub.GetEventCh()

Expand Down Expand Up @@ -56,7 +56,7 @@ func TestSubscription_BufferedChannel_MultipleSubscriptions(t *testing.T) {
t.Parallel()

var (
e = &eventStream{}
e = newEventStream()
wg sync.WaitGroup
numOfEvents = 100000
numOfSubscriptions = 10
Expand Down Expand Up @@ -93,19 +93,116 @@ func TestSubscription_BufferedChannel_MultipleSubscriptions(t *testing.T) {

// Send the events to the channels
for i := 0; i < numOfEvents; i++ {
e.push(&Event{
NewChain: []*types.Header{
{
Number: uint64(i),
},
},
})
e.push(&Event{})
}

// Wait for the events to be processed
wg.Wait()

for _, s := range subscriptions {
s.Close()
e.unsubscribe(s)
}
}

func TestSubscription_AfterOneUnsubscribe(t *testing.T) {
t.Parallel()

var (
e = newEventStream()

wg sync.WaitGroup
)

sub1 := e.subscribe()
sub2 := e.subscribe()

wg.Add(2)

worker := func(sub *subscription, expectedBlockCount uint8) {
defer wg.Done()

updateCh := sub.GetEventCh()
receivedBlockCount := uint8(0)

for {
select {
case <-updateCh:
receivedBlockCount++
if receivedBlockCount == expectedBlockCount {
e.unsubscribe(sub)

return
}
case <-time.After(10 * time.Second):
e.unsubscribe(sub)
t.Errorf("subscription did not caught all events")
}
}
}

go worker(sub1, 10)
go worker(sub2, 20)

// Send the events to the channels
for i := 0; i < 20; i++ {
e.push(&Event{})
time.Sleep(time.Millisecond)
}

// Wait for the event to be parsed
wg.Wait()
}

func TestSubscription_NilEventAfterClosingSubscription(t *testing.T) {
t.Parallel()

var (
e = newEventStream()

wg sync.WaitGroup
)

sub := e.subscribe()

wg.Add(1)

receivedEvtCount := 0

worker := func(sub *subscription, expectedBlockCount uint8) {
defer wg.Done()

startTime := time.Now().UTC()
timeout := 5 * time.Second

for {
evt := sub.GetEvent()
if evt != nil {
receivedEvtCount++
} else {
return
}

// Check for timeout
if time.Since(startTime) >= timeout {
t.Errorf("subscription did not caught all events")

return
}
}
}

go worker(sub, 2)

// Send the events to the channels
for i := 0; i < 2; i++ {
e.push(&Event{})
time.Sleep(time.Millisecond)
}

e.unsubscribe(sub)

// Wait for the event to be parsed
wg.Wait()

assert.Equal(t, 2, receivedEvtCount)
}
2 changes: 1 addition & 1 deletion blockchain/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func NewMockBlockchain(
consensus: mockVerifier,
executor: executor,
config: config,
stream: &eventStream{},
stream: newEventStream(),
gpAverage: &gasPriceAverage{
price: big.NewInt(0),
count: big.NewInt(0),
Expand Down
2 changes: 1 addition & 1 deletion consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (i *backendIBFT) startConsensus() {
}
}()

defer newBlockSub.Close()
defer i.blockchain.UnsubscribeEvents(newBlockSub)

var (
sequenceCh = make(<-chan struct{})
Expand Down
8 changes: 8 additions & 0 deletions consensus/polybft/blockchain_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ type blockchainBackend interface {
// GetSystemState creates a new instance of SystemState interface
GetSystemState(provider contract.Provider) SystemState

// SubscribeEvents subscribes to blockchain events
SubscribeEvents() blockchain.Subscription

// UnubscribeEvents unsubscribes from blockchain events
UnubscribeEvents(subscription blockchain.Subscription)

// GetChainID returns chain id of the current blockchain
GetChainID() uint64

Expand Down Expand Up @@ -182,6 +186,10 @@ func (p *blockchainWrapper) SubscribeEvents() blockchain.Subscription {
return p.blockchain.SubscribeEvents()
}

func (p *blockchainWrapper) UnubscribeEvents(subscription blockchain.Subscription) {
p.blockchain.UnsubscribeEvents(subscription)
}

func (p *blockchainWrapper) GetChainID() uint64 {
return uint64(p.blockchain.Config().ChainID)
}
Expand Down
3 changes: 3 additions & 0 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func (m *blockchainMock) SubscribeEvents() blockchain.Subscription {
return nil
}

func (m *blockchainMock) UnubscribeEvents(blockchain.Subscription) {
}

func (m *blockchainMock) CalculateGasLimit(number uint64) (uint64, error) {
return 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (p *Polybft) startConsensusProtocol() {
p.logger.Debug("peers connected")

newBlockSub := p.blockchain.SubscribeEvents()
defer newBlockSub.Close()
defer p.blockchain.UnubscribeEvents(newBlockSub)

syncerBlockCh := make(chan struct{})

Expand Down
2 changes: 0 additions & 2 deletions helper/progress/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (pw *ProgressionWrapper) RunUpdateLoop(subscription blockchain.Subscription
lastBlock := event.NewChain[len(event.NewChain)-1]
pw.UpdateCurrentProgression(lastBlock.Number)
case <-pw.stopCh:
subscription.Close()

return
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/system_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *systemService) Subscribe(req *empty.Empty, stream proto.System_Subscrib
}
}

sub.Close()
s.server.blockchain.UnsubscribeEvents(sub)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *syncPeerClient) Close() {
}

if m.subscription != nil {
m.subscription.Close()
m.blockchain.UnsubscribeEvents(m.subscription)

m.subscription = nil
}
Expand Down
3 changes: 3 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (m *mockBlockchain) SubscribeEvents() blockchain.Subscription {
return m.subscription
}

func (m *mockBlockchain) UnsubscribeEvents(blockchain.Subscription) {
}

func (m *mockBlockchain) Header() *types.Header {
return m.headerHandler()
}
Expand Down
Loading

0 comments on commit 72a7e3a

Please sign in to comment.