diff --git a/pkg/service/db.go b/pkg/service/db.go index e8d1b7cc..133ebaf9 100644 --- a/pkg/service/db.go +++ b/pkg/service/db.go @@ -144,16 +144,30 @@ func (r *DBListRepo) registerWeb() error { } func (r *DBListRepo) AddWalFile(wf WalFile, hasFullAccess bool) { + r.allWalFileMut.Lock() r.allWalFiles[wf.GetUUID()] = wf + r.allWalFileMut.Unlock() + if hasFullAccess { + r.syncWalFileMut.Lock() r.syncWalFiles[wf.GetUUID()] = wf + r.syncWalFileMut.Unlock() } + if _, ok := wf.(*WebWalFile); ok { + r.webWalFileMut.Lock() r.webWalFiles[wf.GetUUID()] = wf + r.webWalFileMut.Unlock() } } func (r *DBListRepo) DeleteWalFile(name string) { + r.allWalFileMut.Lock() + r.syncWalFileMut.Lock() + r.webWalFileMut.Lock() + defer r.allWalFileMut.Unlock() + defer r.syncWalFileMut.Unlock() + defer r.webWalFileMut.Unlock() delete(r.allWalFiles, name) delete(r.syncWalFiles, name) delete(r.webWalFiles, name) diff --git a/pkg/service/service.go b/pkg/service/service.go index a2458c67..1052f259 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -82,10 +82,13 @@ type DBListRepo struct { friendsLastPushDT int64 // TODO better naming convention - LocalWalFile LocalWalFile - webWalFiles map[string]WalFile - allWalFiles map[string]WalFile - syncWalFiles map[string]WalFile + LocalWalFile LocalWalFile + webWalFiles map[string]WalFile + allWalFiles map[string]WalFile + syncWalFiles map[string]WalFile + webWalFileMut *sync.RWMutex + allWalFileMut *sync.RWMutex + syncWalFileMut *sync.RWMutex processedPartialWals map[string]struct{} processedPartialWalsLock *sync.Mutex @@ -113,9 +116,12 @@ func NewDBListRepo(localWalFile LocalWalFile, webTokenStore WebTokenStore, syncF collabMapLock: &sync.Mutex{}, - webWalFiles: make(map[string]WalFile), - allWalFiles: make(map[string]WalFile), - syncWalFiles: make(map[string]WalFile), + webWalFiles: make(map[string]WalFile), + allWalFiles: make(map[string]WalFile), + syncWalFiles: make(map[string]WalFile), + webWalFileMut: &sync.RWMutex{}, + allWalFileMut: &sync.RWMutex{}, + syncWalFileMut: &sync.RWMutex{}, processedPartialWals: make(map[string]struct{}), processedPartialWalsLock: &sync.Mutex{}, diff --git a/pkg/service/wal.go b/pkg/service/wal.go index f133c962..976e961a 100644 --- a/pkg/service/wal.go +++ b/pkg/service/wal.go @@ -395,9 +395,7 @@ func (r *DBListRepo) generateFriendChangeEvents(e EventLog, item *ListItem) { delete(r.friends[email], key) if len(r.friends[email]) == 0 { delete(r.friends, email) - if _, exists := r.allWalFiles[email]; exists { - r.DeleteWalFile(email) - } + r.DeleteWalFile(email) } }() } @@ -563,7 +561,9 @@ func (r *DBListRepo) Replay(partialWal *[]EventLog) error { if len(e.Line) > 0 { for f := range r.getFriendsFromLine(e.Line) { if _, isFriend := r.friends[f]; isFriend { + r.allWalFileMut.RLock() wf, _ := r.allWalFiles[f] + r.allWalFileMut.RUnlock() wf.SetProcessedEvent(key) } } @@ -957,6 +957,10 @@ func checkWalIntegrity(wal *[]EventLog) (*ListItem, []*ListItem, error) { webWalFiles: make(map[string]WalFile), allWalFiles: make(map[string]WalFile), syncWalFiles: make(map[string]WalFile), + webWalFileMut: &sync.RWMutex{}, + allWalFileMut: &sync.RWMutex{}, + syncWalFileMut: &sync.RWMutex{}, + friends: make(map[string]map[string]int64), friendsMapLock: sync.Mutex{}, } @@ -1489,6 +1493,8 @@ func (r *DBListRepo) flushPartialWals(el []EventLog, sync bool) { //log.Print("Flushing...") if len(el) > 0 { randomUUID := fmt.Sprintf("%v%v", r.uuid, generateUUID()) + r.allWalFileMut.RLock() + defer r.allWalFileMut.RUnlock() for _, wf := range r.allWalFiles { if sync { // TODO Use waitgroups @@ -1552,11 +1558,15 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error { log.Fatal(err) } if len(localFileNames) > 1 { - for _, wf := range r.allWalFiles { - if wf != r.LocalWalFile { - go func(wf WalFile) { r.push(r.log, wf, "") }(wf) + func() { + r.allWalFileMut.RLock() + defer r.allWalFileMut.RUnlock() + for _, wf := range r.allWalFiles { + if wf != r.LocalWalFile { + go func(wf WalFile) { r.push(r.log, wf, "") }(wf) + } } - } + }() hasRunInitialSync = true } @@ -1638,21 +1648,25 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error { for { e := <-r.localCursorMoveChan if r.web.wsConn != nil { - for _, wf := range r.webWalFiles { - if wf.GetUUID() != "" { - func() { - m := websocketMessage{ - Action: "position", - UUID: wf.GetUUID(), - Key: e.listItemKey, - UnixNanoTime: e.unixNanoTime, - } - webRefreshMut.RLock() - defer webRefreshMut.RUnlock() - r.web.pushWebsocket(m) - }() + func() { + r.webWalFileMut.RLock() + defer r.webWalFileMut.RUnlock() + for _, wf := range r.webWalFiles { + if wf.GetUUID() != "" { + func() { + m := websocketMessage{ + Action: "position", + UUID: wf.GetUUID(), + Key: e.listItemKey, + UnixNanoTime: e.unixNanoTime, + } + webRefreshMut.RLock() + defer webRefreshMut.RUnlock() + r.web.pushWebsocket(m) + }() + } } - } + }() } else { time.Sleep(5 * time.Second) } @@ -1686,27 +1700,31 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error { // Main sync event loop go func() { - i := 0 + i := 2 for { var el *[]EventLog // Every fourth iteration is a `gather` select { case <-syncTriggerChan: - syncWalFiles := []WalFile{} - for _, wf := range r.syncWalFiles { - syncWalFiles = append(syncWalFiles, wf) - } - if i < 3 { - if el, err = r.pull(syncWalFiles); err != nil { - log.Fatal(err) + func() { + r.syncWalFileMut.RLock() + defer r.syncWalFileMut.RUnlock() + syncWalFiles := []WalFile{} + for _, wf := range r.syncWalFiles { + syncWalFiles = append(syncWalFiles, wf) } - i++ - } else { - if el, err = r.gather(syncWalFiles, false); err != nil { - log.Fatal(err) + if i < 3 { + if el, err = r.pull(syncWalFiles); err != nil { + log.Fatal(err) + } + i++ + } else { + if el, err = r.gather(syncWalFiles, false); err != nil { + log.Fatal(err) + } + i = 0 } - i = 0 - } + }() } // Currently even empty event logs will trigger a client refresh which is very wasteful, so only publish to // the channel if not empty @@ -1733,27 +1751,31 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error { case e := <-r.eventsChan: // Write in real time to the websocket, if present if r.web != nil { - for _, wf := range r.webWalFiles { - // TODO uuid is a hack to work around the GetUUID stubs I have in place atm: - if wf.GetUUID() != "" { - matchedEventLog := r.getMatchedWal(&[]EventLog{e}, wf) - if len(*matchedEventLog) > 0 { - // There are only single events, so get the zero index - b := buildByteWal(&[]EventLog{(*matchedEventLog)[0]}) - b64Wal := base64.StdEncoding.EncodeToString(b.Bytes()) - m := websocketMessage{ - Action: "wal", - UUID: wf.GetUUID(), - Wal: b64Wal, + func() { + r.webWalFileMut.RLock() + defer r.webWalFileMut.RUnlock() + for _, wf := range r.webWalFiles { + // TODO uuid is a hack to work around the GetUUID stubs I have in place atm: + if wf.GetUUID() != "" { + matchedEventLog := r.getMatchedWal(&[]EventLog{e}, wf) + if len(*matchedEventLog) > 0 { + // There are only single events, so get the zero index + b := buildByteWal(&[]EventLog{(*matchedEventLog)[0]}) + b64Wal := base64.StdEncoding.EncodeToString(b.Bytes()) + m := websocketMessage{ + Action: "wal", + UUID: wf.GetUUID(), + Wal: b64Wal, + } + func() { + webRefreshMut.RLock() + defer webRefreshMut.RUnlock() + r.web.pushWebsocket(m) + }() } - func() { - webRefreshMut.RLock() - defer webRefreshMut.RUnlock() - r.web.pushWebsocket(m) - }() } } - } + }() // Emit any remote updates if web active and local changes have occurred r.emitRemoteUpdate() @@ -1763,12 +1785,7 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error { case <-pushTriggerChan: // On ticks, Flush what we've aggregated to all walfiles, and then reset the // ephemeral log. If empty, skip. - // We pass by reference, so we'll need to create a copy prior to sending to `push` - // otherwise the underlying el may change before `push` has a chance to process it - // If we start passing by value later on, this won't be required (as go will pass - // copies by default, I think). - elCopy := el - r.flushPartialWals(elCopy, false) + r.flushPartialWals(el, false) el = []EventLog{} go func() { time.Sleep(time.Second * 30)