Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
pss: fix concurrent map read and map write on pubKeyRWPool (#1870)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos committed Oct 14, 2019
1 parent 2cba431 commit 6dfe222
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions pss/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type Protocol struct {
symKeyRWPool map[string]p2p.MsgReadWriter
Asymmetric bool
Symmetric bool
RWPoolMu sync.Mutex
poolMu sync.RWMutex
}

// Activates devp2p emulation over a specific pss topic
Expand Down Expand Up @@ -183,28 +183,39 @@ func (p *Protocol) Handle(msg []byte, peer *p2p.Peer, asymmetric bool, keyid str
if err != nil {
return fmt.Errorf("could not decode pssmsg")
}

if asymmetric {
if p.pubKeyRWPool[keyid] == nil {
p.poolMu.RLock()
v := p.pubKeyRWPool[keyid]
p.poolMu.RUnlock()
if v == nil {
return fmt.Errorf("handle called on nil MsgReadWriter for key " + keyid)
}
vrw = p.pubKeyRWPool[keyid].(*PssReadWriter)
vrw = v.(*PssReadWriter)
} else {
if p.symKeyRWPool[keyid] == nil {
p.poolMu.RLock()
v := p.symKeyRWPool[keyid]
p.poolMu.RUnlock()
if v == nil {
return fmt.Errorf("handle called on nil MsgReadWriter for key " + keyid)
}
vrw = p.symKeyRWPool[keyid].(*PssReadWriter)
vrw = v.(*PssReadWriter)
}
vrw.injectMsg(pmsg)
return nil
}

// check if (peer) symmetric key is currently registered with this topic
func (p *Protocol) isActiveSymKey(key string, topic message.Topic) bool {
p.poolMu.RLock()
defer p.poolMu.RUnlock()
return p.symKeyRWPool[key] != nil
}

// check if (peer) asymmetric key is currently registered with this topic
func (p *Protocol) isActiveAsymKey(key string, topic message.Topic) bool {
p.poolMu.RLock()
defer p.poolMu.RUnlock()
return p.pubKeyRWPool[key] != nil
}

Expand Down Expand Up @@ -245,16 +256,16 @@ func (p *Protocol) AddPeer(peer *p2p.Peer, topic message.Topic, asymmetric bool,
if !p.Pss.isPubKeyStored(key) {
return nil, fmt.Errorf("asym key does not exist: %s", key)
}
p.RWPoolMu.Lock()
p.poolMu.Lock()
p.pubKeyRWPool[key] = rw
p.RWPoolMu.Unlock()
p.poolMu.Unlock()
} else {
if !p.Pss.isSymKeyStored(key) {
return nil, fmt.Errorf("symkey does not exist: %s", key)
}
p.RWPoolMu.Lock()
p.poolMu.Lock()
p.symKeyRWPool[key] = rw
p.RWPoolMu.Unlock()
p.poolMu.Unlock()
}
go func() {
err := p.proto.Run(peer, rw)
Expand All @@ -265,8 +276,8 @@ func (p *Protocol) AddPeer(peer *p2p.Peer, topic message.Topic, asymmetric bool,

func (p *Protocol) RemovePeer(asymmetric bool, key string) {
log.Debug("closing pss peer", "asym", asymmetric, "key", key)
p.RWPoolMu.Lock()
defer p.RWPoolMu.Unlock()
p.poolMu.Lock()
defer p.poolMu.Unlock()
if asymmetric {
rw := p.pubKeyRWPool[key].(*PssReadWriter)
rw.closed = true
Expand Down

0 comments on commit 6dfe222

Please sign in to comment.