Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: model finalised notifier channel after imported notifier channel #1816

Merged
merged 7 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type BlockState interface {
GetFinalisedHash(uint64, uint64) (common.Hash, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
SubChain(start, end common.Hash) ([]common.Hash, error)
GetBlockBody(hash common.Hash) (*types.Body, error)
Expand Down
47 changes: 21 additions & 26 deletions dot/core/mocks/block_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 4 additions & 12 deletions dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ type Handler struct {
grandpaState GrandpaState

// block notification channels
imported chan *types.Block
finalised chan *types.FinalisationInfo
finalisedID byte
imported chan *types.Block
finalised chan *types.FinalisationInfo

// GRANDPA changes
grandpaScheduledChange *grandpaChange
Expand All @@ -75,12 +74,7 @@ type resume struct {
func NewHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState) (*Handler, error) {
imported := blockState.GetImportedBlockNotifierChannel()

finalised := make(chan *types.FinalisationInfo, 16)

fid, err := blockState.RegisterFinalizedChannel(finalised)
if err != nil {
return nil, err
}
finalised := blockState.GetFinalisedNotifierChannel()

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -92,7 +86,6 @@ func NewHandler(blockState BlockState, epochState EpochState, grandpaState Grand
grandpaState: grandpaState,
imported: imported,
finalised: finalised,
finalisedID: fid,
}, nil
}

Expand All @@ -107,8 +100,7 @@ func (h *Handler) Start() error {
func (h *Handler) Stop() error {
h.cancel()
h.blockState.FreeImportedBlockNotifierChannel(h.imported)
h.blockState.UnregisterFinalisedChannel(h.finalisedID)
close(h.finalised)
h.blockState.FreeFinalisedNotifierChannel(h.finalised)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions dot/digest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type BlockState interface {
BestBlockHeader() (*types.Header, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
}

// EpochState is the interface for state.EpochState
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type BlockAPI interface {
GetJustification(hash common.Hash) ([]byte, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
SubChain(start, end common.Hash) ([]common.Hash, error)
RegisterRuntimeUpdatedChannel(ch chan<- runtime.Version) (uint32, error)
UnregisterRuntimeUpdatedChannel(id uint32) bool
Expand Down
5 changes: 2 additions & 3 deletions dot/rpc/modules/api_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ func NewMockBlockAPI() *modulesmocks.MockBlockAPI {
m.On("GetHighestFinalisedHash").Return(common.Hash{}, nil)
m.On("GetImportedBlockNotifierChannel").Return(make(chan *types.Block, 5))
m.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
m.On("UnregisterImportedChannel", mock.AnythingOfType("uint8"))
m.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")).Return(byte(0), nil)
m.On("UnregisterFinalizedChannel", mock.AnythingOfType("uint8"))
m.On("GetFinalisedNotifierChannel").Return(make(chan *types.FinalisationInfo, 5))
m.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
m.On("GetJustification", mock.AnythingOfType("common.Hash")).Return(make([]byte, 10), nil)
m.On("HasJustification", mock.AnythingOfType("common.Hash")).Return(true, nil)
m.On("SubChain", mock.AnythingOfType("common.Hash"), mock.AnythingOfType("common.Hash")).Return(make([]common.Hash, 0), nil)
Expand Down
47 changes: 21 additions & 26 deletions dot/rpc/modules/mocks/block_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 18 additions & 24 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (l *BlockListener) Stop() error {
type BlockFinalizedListener struct {
channel chan *types.FinalisationInfo
wsconn *WSConn
chanID byte
subID uint32
done chan struct{}
cancel chan struct{}
Expand All @@ -189,7 +188,7 @@ type BlockFinalizedListener struct {
func (l *BlockFinalizedListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.chanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.channel)
close(l.done)
}()

Expand Down Expand Up @@ -229,12 +228,11 @@ type AllBlocksListener struct {
finalizedChan chan *types.FinalisationInfo
importedChan chan *types.Block

wsconn *WSConn
finalizedChanID byte
subID uint32
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
wsconn *WSConn
subID uint32
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

func newAllBlockListener(conn *WSConn) *AllBlocksListener {
Expand All @@ -243,7 +241,6 @@ func newAllBlockListener(conn *WSConn) *AllBlocksListener {
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
wsconn: conn,
finalizedChan: make(chan *types.FinalisationInfo, DEFAULT_BUFFER_SIZE),
}
}

Expand All @@ -252,9 +249,8 @@ func (l *AllBlocksListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalizedChanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalizedChan)

close(l.finalizedChan)
close(l.done)
}()

Expand Down Expand Up @@ -307,16 +303,15 @@ func (l *AllBlocksListener) Stop() error {

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn *WSConn
subID uint32
extrinsic types.Extrinsic
importedChan chan *types.Block
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
wsconn *WSConn
subID uint32
extrinsic types.Extrinsic
importedChan chan *types.Block
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

// NewExtrinsicSubmitListener constructor to build new ExtrinsicSubmitListener
Expand All @@ -338,7 +333,7 @@ func (l *ExtrinsicSubmitListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalisedChanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalisedChan)
close(l.done)
close(l.finalisedChan)
}()
Expand Down Expand Up @@ -459,7 +454,6 @@ type GrandpaJustificationListener struct {
done chan struct{}
wsconn *WSConn
subID uint32
finalisedChID byte
finalisedCh chan *types.FinalisationInfo
}

Expand All @@ -468,7 +462,7 @@ func (g *GrandpaJustificationListener) Listen() {
// listen for finalised headers
go func() {
defer func() {
g.wsconn.BlockAPI.UnregisterFinalisedChannel(g.finalisedChID)
g.wsconn.BlockAPI.FreeFinalisedNotifierChannel(g.finalisedCh)
close(g.done)
}()

Expand Down
10 changes: 5 additions & 5 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {
defer cancel()

BlockAPI := new(mocks.MockBlockAPI)
BlockAPI.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))

wsconn.BlockAPI = BlockAPI

Expand All @@ -164,7 +164,7 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {
defer func() {
require.NoError(t, bfl.Stop())
time.Sleep(time.Millisecond * 10)
BlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.AssertCalled(t, "FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
}()

notifyChan <- &types.FinalisationInfo{
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {

BlockAPI := new(mocks.MockBlockAPI)
BlockAPI.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
BlockAPI.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))

wsconn.BlockAPI = BlockAPI

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
time.Sleep(time.Millisecond * 10)

BlockAPI.AssertCalled(t, "FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
BlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.AssertCalled(t, "FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
}()

notifyImportedChan <- block
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestGrandpaJustification_Listen(t *testing.T) {

blockStateMock := new(mocks.MockBlockAPI)
blockStateMock.On("GetJustification", mock.AnythingOfType("common.Hash")).Return(mockedJustBytes, nil)
blockStateMock.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
blockStateMock.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
wsconn.BlockAPI = blockStateMock

finchannel := make(chan *types.FinalisationInfo)
Expand Down
Loading