Skip to content

Commit

Permalink
[Memory fix] Remove memory leak in event subscriptions (#741)
Browse files Browse the repository at this point in the history
* Remove dangling pointers in the subscription struct

* Remove leftover log
  • Loading branch information
zivkovicmilos committed Sep 21, 2022
1 parent 68df30d commit 52b6261
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 105 deletions.
70 changes: 19 additions & 51 deletions blockchain/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func (m *MockSubscription) Close() {

// subscription is the Blockchain event subscription object
type subscription struct {
updateCh chan void // Channel for update information
closeCh chan void // Channel for close signals
elem *eventElem // Reference to the blockchain event wrapper
updateCh chan *Event // Channel for update information
closeCh chan void // Channel for close signals
}

// GetEventCh creates a new event channel, and returns it
Expand All @@ -70,17 +69,10 @@ func (s *subscription) GetEventCh() chan *Event {
// GetEvent returns the event from the subscription (BLOCKING)
func (s *subscription) GetEvent() *Event {
for {
if s.elem.next != nil {
s.elem = s.elem.next
evnt := s.elem.event

return evnt
}

// Wait for an update
select {
case <-s.updateCh:
continue
case ev := <-s.updateCh:
return ev
case <-s.closeCh:
return nil
}
Expand Down Expand Up @@ -115,7 +107,7 @@ type Event struct {
Type EventType

// Source is the source that generated the blocks for the event
// right now it can be either the Sealer or the Syncer. TODO
// right now it can be either the Sealer or the Syncer
Source string
}

Expand Down Expand Up @@ -158,73 +150,49 @@ func (b *Blockchain) SubscribeEvents() Subscription {
return b.stream.subscribe()
}

// eventElem contains the event, as well as the next list event
type eventElem struct {
event *Event
next *eventElem
}

// eventStream is the structure that contains the event list,
// as well as the update channel which it uses to notify of updates
type eventStream struct {
lock sync.Mutex
head *eventElem
sync.Mutex

// channel to notify updates
updateCh []chan void
updateCh []chan *Event
}

// subscribe Creates a new blockchain event subscription
func (e *eventStream) subscribe() *subscription {
head, updateCh := e.Head()
s := &subscription{
elem: head,
updateCh: updateCh,
return &subscription{
updateCh: e.newUpdateCh(),
closeCh: make(chan void),
}

return s
}

// Head returns the event list head
func (e *eventStream) Head() (*eventElem, chan void) {
e.lock.Lock()
head := e.head
// newUpdateCh returns the event update channel
func (e *eventStream) newUpdateCh() chan *Event {
e.Lock()
defer e.Unlock()

ch := make(chan void)
ch := make(chan *Event, 1)

if e.updateCh == nil {
e.updateCh = make([]chan void, 0)
e.updateCh = make([]chan *Event, 0)
}

e.updateCh = append(e.updateCh, ch)

e.lock.Unlock()

return head, ch
return ch
}

// push adds a new Event, and notifies listeners
func (e *eventStream) push(event *Event) {
e.lock.Lock()

newHead := &eventElem{
event: event,
}

if e.head != nil {
e.head.next = newHead
}

e.head = newHead
e.Lock()
defer e.Unlock()

// Notify the listeners
for _, update := range e.updateCh {
select {
case update <- void{}:
case update <- event:
default:
}
}

e.lock.Unlock()
}
85 changes: 31 additions & 54 deletions blockchain/subscription_test.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,53 @@
package blockchain

import (
"sync"
"testing"
"time"

"github.com/0xPolygon/polygon-edge/types"
"github.com/stretchr/testify/assert"
)

func TestSubscriptionLinear(t *testing.T) {
e := &eventStream{}
func TestSubscription(t *testing.T) {
t.Parallel()

// add a genesis block to eventstream
e.push(&Event{
NewChain: []*types.Header{
{Number: 0},
},
})
var (
e = &eventStream{}
sub = e.subscribe()
caughtEventNum = uint64(0)
event = &Event{
NewChain: []*types.Header{
{
Number: 100,
},
},
}

sub := e.subscribe()
wg sync.WaitGroup
)

eventCh := make(chan *Event)
defer sub.Close()

go func() {
for {
task := sub.GetEvent()
eventCh <- task
}
}()
updateCh := sub.GetEventCh()

for i := 1; i < 10; i++ {
evnt := &Event{}
wg.Add(1)

evnt.AddNewHeader(&types.Header{Number: uint64(i)})
e.push(evnt)
go func() {
defer wg.Done()

// it should fire updateCh
select {
case evnt := <-eventCh:
if evnt.NewChain[0].Number != uint64(i) {
t.Fatal("bad")
}
case <-time.After(1 * time.Second):
t.Fatal("timeout")
case ev := <-updateCh:
caughtEventNum = ev.NewChain[0].Number
case <-time.After(5 * time.Second):
}
}
}

func TestSubscriptionSlowConsumer(t *testing.T) {
e := &eventStream{}
}()

e.push(&Event{
NewChain: []*types.Header{
{Number: 0},
},
})
// Send the event to the channel
e.push(event)

sub := e.subscribe()
// Wait for the event to be parsed
wg.Wait()

// send multiple events
for i := 1; i < 10; i++ {
e.push(&Event{
NewChain: []*types.Header{
{Number: uint64(i)},
},
})
}

// consume events now
for i := 1; i < 10; i++ {
evnt := sub.GetEvent()
if evnt.NewChain[0].Number != uint64(i) {
t.Fatal("bad")
}
}
assert.Equal(t, event.NewChain[0].Number, caughtEventNum)
}

0 comments on commit 52b6261

Please sign in to comment.