Skip to content

Commit

Permalink
Improvements to avoid race conditions for data sent to the frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
Kasper Skårhøj committed Aug 2, 2022
1 parent 83b05d8 commit 880fca6
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 49 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -22,6 +22,7 @@ require (
github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/disintegration/gift v1.2.1 // indirect
github.com/jinzhu/copier v0.3.5 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/miekg/dns v1.1.27 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -42,6 +42,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE=
github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand Down
2 changes: 1 addition & 1 deletion main.go
Expand Up @@ -47,7 +47,7 @@ func main() {

// Start webserver:
port := 8080
log.Infof("Starting server on localhost:%d\n", port)
log.Infof("Starting webserver on localhost:%d\n", port)
setupRoutes()
go http.ListenAndServe(fmt.Sprintf(":%d", port), nil)

Expand Down
4 changes: 1 addition & 3 deletions websocket.go
Expand Up @@ -186,14 +186,12 @@ func reader(conn *websocket.Conn) {
wsslice.Iter(func(w *wsclient) { w.msgToClient <- lastState })
lastStateMu.Unlock()
case "SendIndex":
ZeroconfEntriesMu.Lock()
wsslice.Iter(func(w *wsclient) {
w.msgToClient <- &wsToClient{
ZeroconfEntries: ZeroconfEntries,
ZeroconfEntries: ZEntries.Copy(),
Time: getTimeString(),
}
})
ZeroconfEntriesMu.Unlock()
wsslice.Iter(func(w *wsclient) {
w.msgToClient <- &wsToClient{
ConnectedSignal: isConnected.Load(),
Expand Down
125 changes: 80 additions & 45 deletions zeroconf.go
Expand Up @@ -16,6 +16,7 @@ import (
rwp "github.com/SKAARHOJ/rawpanel-lib/ibeam_rawpanel"
topology "github.com/SKAARHOJ/rawpanel-lib/topology"
"github.com/grandcat/zeroconf"
"github.com/jinzhu/copier"
log "github.com/s00500/env_logger"
"github.com/tatsushid/go-fastping"
"go.uber.org/atomic"
Expand Down Expand Up @@ -65,10 +66,45 @@ type RawPanelDetails struct {
DeltaTime int
}

var ZeroconfEntries []*ZeroconfEntry
var ZeroconfEntriesMu sync.Mutex
var UpdateWS atomic.Bool

var ZEntries ZeroconfEntries

type ZeroconfEntries struct {
sync.Mutex

entries []*ZeroconfEntry
}

func (ze *ZeroconfEntries) DeepLock() {
ze.Lock()
for _, entry := range ze.entries {
entry.Lock()
}
}

func (ze *ZeroconfEntries) DeepUnlock() {
for _, entry := range ze.entries {
entry.Unlock()
}
ze.Unlock()
}

func (ze *ZeroconfEntries) Copy() []*ZeroconfEntry {
ze.DeepLock()
defer ze.DeepUnlock()

copy := []*ZeroconfEntry{}
for _, entry := range ze.entries {
cp := &ZeroconfEntry{}
err := copier.CopyWithOption(cp, entry, copier.Option{DeepCopy: true, IgnoreEmpty: true})
log.Should(err)
copy = append(copy, cp)
}

return copy
}

func runZeroConfSearch() {

go func() {
Expand All @@ -78,14 +114,12 @@ func runZeroConfSearch() {
if UpdateWS.Load() {
UpdateWS.Store(false)

ZeroconfEntriesMu.Lock()
wsslice.Iter(func(w *wsclient) {
w.msgToClient <- &wsToClient{
ZeroconfEntries: ZeroconfEntries,
ZeroconfEntries: ZEntries.Copy(),
Time: getTimeString(),
}
})
ZeroconfEntriesMu.Unlock()
}
}
}()
Expand Down Expand Up @@ -144,20 +178,21 @@ func zeroconfSearchSession(sessionId int) {
<-ctxRwp.Done()

// Remove old entries
ZeroconfEntriesMu.Lock()
for a := len(ZeroconfEntries); a > 0; a-- {
ZEntries.DeepLock()
for a := len(ZEntries.entries); a > 0; a-- {
i := a - 1
if sessionId-ZeroconfEntries[i].SessionId > 4 {
ZeroconfEntries = append(ZeroconfEntries[:i], ZeroconfEntries[i+1:]...)

if sessionId-ZEntries.entries[i].SessionId > 4 {
ZEntries.entries = append(ZEntries.entries[:i], ZEntries.entries[i+1:]...)
UpdateWS.Store(true)
}
}
ZeroconfEntriesMu.Unlock()
ZEntries.DeepUnlock()
}

func addRWPEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
ZeroconfEntriesMu.Lock()
defer ZeroconfEntriesMu.Unlock()
ZEntries.Lock()
defer ZEntries.Unlock()

if len(addThisEntry.AddrIPv4) > 0 {
// Derive some info here:
Expand All @@ -176,34 +211,34 @@ func addRWPEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
}

// Search for existing to update:
for i, entry := range ZeroconfEntries {
for i, entry := range ZEntries.entries {
if entry.IPaddr.String() == addThisEntry.AddrIPv4[0].String() &&
entry.Port == addThisEntry.Port {

//fmt.Printf("Updating %v\n", zeroconfEntries[i])
ZeroconfEntries[i].Lock()
//fmt.Printf("Updating %v\n", ZEntries.entries[i])
ZEntries.entries[i].Lock()

ZeroconfEntries[i].IPaddr = addThisEntry.AddrIPv4[0]
ZeroconfEntries[i].Port = addThisEntry.Port
ZeroconfEntries[i].Serial = parts[0]
ZeroconfEntries[i].Model = parts[1]
ZeroconfEntries[i].Name = devicename
ZeroconfEntries[i].Protocol = protocol
ZeroconfEntries[i].SessionId = sesId
ZEntries.entries[i].IPaddr = addThisEntry.AddrIPv4[0]
ZEntries.entries[i].Port = addThisEntry.Port
ZEntries.entries[i].Serial = parts[0]
ZEntries.entries[i].Model = parts[1]
ZEntries.entries[i].Name = devicename
ZEntries.entries[i].Protocol = protocol
ZEntries.entries[i].SessionId = sesId

ZeroconfEntries[i].IsNew = time.Now().Before(ZeroconfEntries[i].createdTime.Add(time.Second * 5))
ZEntries.entries[i].IsNew = time.Now().Before(ZEntries.entries[i].createdTime.Add(time.Second * 5))

if *AggressiveQuery && !ZeroconfEntries[i].AggressiveQueryStarted {
go rawPanelInquery(ZeroconfEntries[i])
if *AggressiveQuery && !ZEntries.entries[i].AggressiveQueryStarted {
go rawPanelInquery(ZEntries.entries[i])
}

ZeroconfEntries[i].Unlock()
ZeroconfEntries = sortEntries(ZeroconfEntries)
ZEntries.entries[i].Unlock()
ZEntries.entries = sortEntries(ZEntries.entries)
UpdateWS.Store(true)

// Pingtime:
ipAddr := addThisEntry.AddrIPv4[0].String()
theEntry := ZeroconfEntries[i]
theEntry := ZEntries.entries[i]
go func() {
pingTime := getPingTimes(ipAddr)
theEntry.Lock()
Expand All @@ -228,8 +263,8 @@ func addRWPEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
IsNew: true,
createdTime: time.Now(),
}
ZeroconfEntries = append([]*ZeroconfEntry{newEntry}, ZeroconfEntries...)
ZeroconfEntries = sortEntries(ZeroconfEntries)
ZEntries.entries = append([]*ZeroconfEntry{newEntry}, ZEntries.entries...)
ZEntries.entries = sortEntries(ZEntries.entries)

if *AggressiveQuery {
go rawPanelInquery(newEntry)
Expand All @@ -249,8 +284,8 @@ func addRWPEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
}

func addGenericEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
ZeroconfEntriesMu.Lock()
defer ZeroconfEntriesMu.Unlock()
ZEntries.Lock()
defer ZEntries.Unlock()

if len(addThisEntry.AddrIPv4) > 0 {

Expand All @@ -273,23 +308,23 @@ func addGenericEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
foundIP := false
foundOtherPort := false
foundGeneric := false
for i, entry := range ZeroconfEntries {
for i, entry := range ZEntries.entries {
if entry.IPaddr.String() == addThisEntry.AddrIPv4[0].String() {

// For any port, update skaarOS:
ZeroconfEntries[i].Lock()
ZeroconfEntries[i].SkaarOS = skaarOS
ZeroconfEntries[i].IsNew = time.Now().Before(ZeroconfEntries[i].createdTime.Add(time.Second * 5))
ZeroconfEntries[i].Unlock()
ZEntries.entries[i].Lock()
ZEntries.entries[i].SkaarOS = skaarOS
ZEntries.entries[i].IsNew = time.Now().Before(ZEntries.entries[i].createdTime.Add(time.Second * 5))
ZEntries.entries[i].Unlock()

// Pingtime and session for true non-rwp devices:
if entry.Port == -1 {
ZeroconfEntries[i].Lock()
ZeroconfEntries[i].SessionId = sesId
ZeroconfEntries[i].Unlock()
ZEntries.entries[i].Lock()
ZEntries.entries[i].SessionId = sesId
ZEntries.entries[i].Unlock()

ipAddr := addThisEntry.AddrIPv4[0].String()
theEntry := ZeroconfEntries[i]
theEntry := ZEntries.entries[i]
go func() {
pingTime := getPingTimes(ipAddr)
theEntry.Lock()
Expand All @@ -309,9 +344,9 @@ func addGenericEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {

// Remove generic entry if other port was found:
if foundOtherPort && foundGeneric {
for i, entry := range ZeroconfEntries {
for i, entry := range ZEntries.entries {
if entry.IPaddr.String() == addThisEntry.AddrIPv4[0].String() && entry.Port == -1 {
ZeroconfEntries = append(ZeroconfEntries[:i], ZeroconfEntries[i+1:]...)
ZEntries.entries = append(ZEntries.entries[:i], ZEntries.entries[i+1:]...)
break
}
}
Expand All @@ -327,8 +362,8 @@ func addGenericEntry(addThisEntry *zeroconf.ServiceEntry, sesId int) {
IsNew: true,
createdTime: time.Now(),
}
ZeroconfEntries = append([]*ZeroconfEntry{newEntry}, ZeroconfEntries...)
ZeroconfEntries = sortEntries(ZeroconfEntries)
ZEntries.entries = append([]*ZeroconfEntry{newEntry}, ZEntries.entries...)
ZEntries.entries = sortEntries(ZEntries.entries)

// Pingtime:
go func() {
Expand Down

0 comments on commit 880fca6

Please sign in to comment.