Skip to content

Commit

Permalink
Protect nilList, move to pl so it's not shared when running sim
Browse files Browse the repository at this point in the history
  • Loading branch information
factom-clay committed May 17, 2018
1 parent 6bc07b6 commit 40679ca
Showing 1 changed file with 49 additions and 23 deletions.
72 changes: 49 additions & 23 deletions state/processList.go
Expand Up @@ -61,19 +61,19 @@ type ProcessList struct {
(discard DBlock if > 1/2 have sig differences) */
// messages processed in this list
OldMsgs map[[32]byte]interfaces.IMsg
oldmsgslock *sync.Mutex
oldmsgslock sync.Mutex

// Chains that are executed, but not processed. There is a small window of a pending chain that the ack
// will pass and the chainhead will fail. This covers that window. This is only used by WSAPI,
// do not use it anywhere internally.
PendingChainHeads *SafeMsgMap

OldAcks map[[32]byte]interfaces.IMsg
oldackslock *sync.Mutex
oldackslock sync.Mutex

// Entry Blocks added within 10 minutes (follower and leader)
NewEBlocks map[[32]byte]interfaces.IEntryBlock
neweblockslock *sync.Mutex
neweblockslock sync.Mutex

NewEntriesMutex sync.RWMutex
NewEntries map[[32]byte]interfaces.IEntry
Expand Down Expand Up @@ -104,6 +104,10 @@ type ProcessList struct {
asks chan askRef // Requests to ask for missing messages
adds chan plRef // notices of slots filled in the process list
done chan struct{} // Notice that this DBHeight is done

// debug -- highest nil seen and reported in processlist
nilListMutex sync.Mutex
nilList map[int]int
}

var _ interfaces.IProcessList = (*ProcessList)(nil)
Expand Down Expand Up @@ -553,12 +557,26 @@ func (p *ProcessList) GetOldMsgs(key interfaces.IHash) interfaces.IMsg {
if p == nil {
return nil
}
if p.oldmsgslock == nil {
return nil
}
p.oldmsgslock.Lock()
defer p.oldmsgslock.Unlock()
return p.OldMsgs[key.Fixed()]
m, ok := p.OldMsgs[key.Fixed()]
if !ok {
return nil
}
return m
}

func (p *ProcessList) GetOldAck(key interfaces.IHash) interfaces.IMsg {
if p == nil {
return nil
}
p.oldackslock.Lock()
defer p.oldackslock.Unlock()
a, ok := p.OldAcks[key.Fixed()]
if !ok {
return nil
}
return a
}

func (p *ProcessList) AddNewEBlocks(key interfaces.IHash, value interfaces.IEntryBlock) {
Expand Down Expand Up @@ -829,9 +847,11 @@ func (p *ProcessList) TrimVMList(height uint32, vmIndex int) {
p.VMs[vmIndex].List = p.VMs[vmIndex].List[:height]
p.VMs[vmIndex].HighestAsk = int(height) // make sure we will ask again for nil's above this height
if p.State.DebugExec() {
if nillist[vmIndex] > int(height-1) {
nillist[vmIndex] = int(height - 1) // Drag the highest nil logged back before this nil
p.nilListMutex.Lock()
if p.nilList[vmIndex] > int(height-1) {
p.nilList[vmIndex] = int(height - 1) // Drag the highest nil logged back before this nil
}
p.nilListMutex.Unlock()
}
}
}
Expand Down Expand Up @@ -906,8 +926,6 @@ func (p *ProcessList) decodeState(Syncing bool, DBSig bool, EOM bool, DBSigDone

}

var nillist map[int]int = make(map[int]int)

var extraDebug bool = false

// Process messages and update our state.
Expand Down Expand Up @@ -962,10 +980,12 @@ func (p *ProcessList) Process(state *State) (progress bool) {
}
}
if p.State.DebugExec() {
if nillist[i] < j {
p.nilListMutex.Lock()
if p.nilList[i] < j {
p.State.LogPrintf("process", "%d nils at %v/%v/%v", cnt, p.DBHeight, i, j)
nillist[i] = j
p.nilList[i] = j
}
p.nilListMutex.Unlock()
}

// p.State.LogPrintf("process","nil at %v/%v/%v", p.DBHeight, i, j)
Expand All @@ -988,9 +1008,11 @@ func (p *ProcessList) Process(state *State) (progress bool) {
vm.List[j] = nil
vm.HighestAsk = j // have to be able to ask for this again
if p.State.DebugExec() {
if nillist[i] > j-1 {
nillist[i] = j - 1 // Drag the highest nil logged back before this nil
p.nilListMutex.Lock()
if p.nilList[i] > j-1 {
p.nilList[i] = j - 1 // Drag the highest nil logged back before this nil
}
p.nilListMutex.Unlock()
}
//p.State.AddStatus(fmt.Sprintf("ProcessList.go Process: Error computing serial hash at dbht: %d vm %d vm-height %d ", p.DBHeight, i, j))
p.Ask(i, uint32(j), 3000) // 3 second delay
Expand Down Expand Up @@ -1036,9 +1058,11 @@ func (p *ProcessList) Process(state *State) (progress bool) {
vm.List[j] = nil // If we have seen this message, we don't process it again. Ever.
vm.HighestAsk = j // have to be able to ask for this again
if p.State.DebugExec() {
if nillist[i] > j-1 {
nillist[i] = j - 1 // Drag the highest nil logged back before this nil
p.nilListMutex.Lock()
if p.nilList[i] > j-1 {
p.nilList[i] = j - 1 // Drag the highest nil logged back before this nil
}
p.nilListMutex.Unlock()
}
p.Ask(i, uint32(j), 3000) // 3 second delay
// If we ask won't we just get the same thing back?
Expand Down Expand Up @@ -1215,8 +1239,11 @@ func (p *ProcessList) AddToProcessList(ack *messages.Ack, m interfaces.IMsg) {
p.VMs[ack.VMIndex].List[ack.Height] = m
p.VMs[ack.VMIndex].ListAck[ack.Height] = ack

delete(nillist, int(ack.Height)) // Notify if this is ever nil again

if p.State.DebugExec() {
p.nilListMutex.Lock()
delete(p.nilList, int(ack.Height)) // Notify if this is ever nil again
p.nilListMutex.Unlock()
}
p.AddOldMsgs(m)
p.OldAcks[msgHash.Fixed()] = ack

Expand Down Expand Up @@ -1390,12 +1417,8 @@ func NewProcessList(state interfaces.IState, previous *ProcessList, dbheight uin

pl.PendingChainHeads = NewSafeMsgMap("PendingChainHeads", pl.State)
pl.OldMsgs = make(map[[32]byte]interfaces.IMsg)
pl.oldmsgslock = new(sync.Mutex)
pl.OldAcks = make(map[[32]byte]interfaces.IMsg)
pl.oldackslock = new(sync.Mutex)

pl.NewEBlocks = make(map[[32]byte]interfaces.IEntryBlock)
pl.neweblockslock = new(sync.Mutex)
pl.NewEntries = make(map[[32]byte]interfaces.IEntry)

pl.DBSignatures = make([]DBSig, 0)
Expand Down Expand Up @@ -1430,6 +1453,9 @@ func NewProcessList(state interfaces.IState, previous *ProcessList, dbheight uin
pl.adds = nil
pl.done = nil
}

pl.nilList = make(map[int]int)

return pl
}

Expand Down

0 comments on commit 40679ca

Please sign in to comment.