Skip to content

Commit

Permalink
Wrap WalFile map ops in RWMutex's
Browse files Browse the repository at this point in the history
  • Loading branch information
Sambigeara committed Oct 27, 2021
1 parent b8278e0 commit cf42d5a
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 66 deletions.
14 changes: 14 additions & 0 deletions pkg/service/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,30 @@ func (r *DBListRepo) registerWeb() error {
}

func (r *DBListRepo) AddWalFile(wf WalFile, hasFullAccess bool) {
r.allWalFileMut.Lock()
r.allWalFiles[wf.GetUUID()] = wf
r.allWalFileMut.Unlock()

if hasFullAccess {
r.syncWalFileMut.Lock()
r.syncWalFiles[wf.GetUUID()] = wf
r.syncWalFileMut.Unlock()
}

if _, ok := wf.(*WebWalFile); ok {
r.webWalFileMut.Lock()
r.webWalFiles[wf.GetUUID()] = wf
r.webWalFileMut.Unlock()
}
}

func (r *DBListRepo) DeleteWalFile(name string) {
r.allWalFileMut.Lock()
r.syncWalFileMut.Lock()
r.webWalFileMut.Lock()
defer r.allWalFileMut.Unlock()
defer r.syncWalFileMut.Unlock()
defer r.webWalFileMut.Unlock()
delete(r.allWalFiles, name)
delete(r.syncWalFiles, name)
delete(r.webWalFiles, name)
Expand Down
20 changes: 13 additions & 7 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,13 @@ type DBListRepo struct {
friendsLastPushDT int64

// TODO better naming convention
LocalWalFile LocalWalFile
webWalFiles map[string]WalFile
allWalFiles map[string]WalFile
syncWalFiles map[string]WalFile
LocalWalFile LocalWalFile
webWalFiles map[string]WalFile
allWalFiles map[string]WalFile
syncWalFiles map[string]WalFile
webWalFileMut *sync.RWMutex
allWalFileMut *sync.RWMutex
syncWalFileMut *sync.RWMutex

processedPartialWals map[string]struct{}
processedPartialWalsLock *sync.Mutex
Expand Down Expand Up @@ -113,9 +116,12 @@ func NewDBListRepo(localWalFile LocalWalFile, webTokenStore WebTokenStore, syncF

collabMapLock: &sync.Mutex{},

webWalFiles: make(map[string]WalFile),
allWalFiles: make(map[string]WalFile),
syncWalFiles: make(map[string]WalFile),
webWalFiles: make(map[string]WalFile),
allWalFiles: make(map[string]WalFile),
syncWalFiles: make(map[string]WalFile),
webWalFileMut: &sync.RWMutex{},
allWalFileMut: &sync.RWMutex{},
syncWalFileMut: &sync.RWMutex{},

processedPartialWals: make(map[string]struct{}),
processedPartialWalsLock: &sync.Mutex{},
Expand Down
135 changes: 76 additions & 59 deletions pkg/service/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,7 @@ func (r *DBListRepo) generateFriendChangeEvents(e EventLog, item *ListItem) {
delete(r.friends[email], key)
if len(r.friends[email]) == 0 {
delete(r.friends, email)
if _, exists := r.allWalFiles[email]; exists {
r.DeleteWalFile(email)
}
r.DeleteWalFile(email)
}
}()
}
Expand Down Expand Up @@ -563,7 +561,9 @@ func (r *DBListRepo) Replay(partialWal *[]EventLog) error {
if len(e.Line) > 0 {
for f := range r.getFriendsFromLine(e.Line) {
if _, isFriend := r.friends[f]; isFriend {
r.allWalFileMut.RLock()
wf, _ := r.allWalFiles[f]
r.allWalFileMut.RUnlock()
wf.SetProcessedEvent(key)
}
}
Expand Down Expand Up @@ -957,6 +957,10 @@ func checkWalIntegrity(wal *[]EventLog) (*ListItem, []*ListItem, error) {
webWalFiles: make(map[string]WalFile),
allWalFiles: make(map[string]WalFile),
syncWalFiles: make(map[string]WalFile),
webWalFileMut: &sync.RWMutex{},
allWalFileMut: &sync.RWMutex{},
syncWalFileMut: &sync.RWMutex{},

friends: make(map[string]map[string]int64),
friendsMapLock: sync.Mutex{},
}
Expand Down Expand Up @@ -1489,6 +1493,8 @@ func (r *DBListRepo) flushPartialWals(el []EventLog, sync bool) {
//log.Print("Flushing...")
if len(el) > 0 {
randomUUID := fmt.Sprintf("%v%v", r.uuid, generateUUID())
r.allWalFileMut.RLock()
defer r.allWalFileMut.RUnlock()
for _, wf := range r.allWalFiles {
if sync {
// TODO Use waitgroups
Expand Down Expand Up @@ -1552,11 +1558,15 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error {
log.Fatal(err)
}
if len(localFileNames) > 1 {
for _, wf := range r.allWalFiles {
if wf != r.LocalWalFile {
go func(wf WalFile) { r.push(r.log, wf, "") }(wf)
func() {
r.allWalFileMut.RLock()
defer r.allWalFileMut.RUnlock()
for _, wf := range r.allWalFiles {
if wf != r.LocalWalFile {
go func(wf WalFile) { r.push(r.log, wf, "") }(wf)
}
}
}
}()
hasRunInitialSync = true
}

Expand Down Expand Up @@ -1638,21 +1648,25 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error {
for {
e := <-r.localCursorMoveChan
if r.web.wsConn != nil {
for _, wf := range r.webWalFiles {
if wf.GetUUID() != "" {
func() {
m := websocketMessage{
Action: "position",
UUID: wf.GetUUID(),
Key: e.listItemKey,
UnixNanoTime: e.unixNanoTime,
}
webRefreshMut.RLock()
defer webRefreshMut.RUnlock()
r.web.pushWebsocket(m)
}()
func() {
r.webWalFileMut.RLock()
defer r.webWalFileMut.RUnlock()
for _, wf := range r.webWalFiles {
if wf.GetUUID() != "" {
func() {
m := websocketMessage{
Action: "position",
UUID: wf.GetUUID(),
Key: e.listItemKey,
UnixNanoTime: e.unixNanoTime,
}
webRefreshMut.RLock()
defer webRefreshMut.RUnlock()
r.web.pushWebsocket(m)
}()
}
}
}
}()
} else {
time.Sleep(5 * time.Second)
}
Expand Down Expand Up @@ -1686,27 +1700,31 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error {

// Main sync event loop
go func() {
i := 0
i := 2
for {
var el *[]EventLog
// Every fourth iteration is a `gather`
select {
case <-syncTriggerChan:
syncWalFiles := []WalFile{}
for _, wf := range r.syncWalFiles {
syncWalFiles = append(syncWalFiles, wf)
}
if i < 3 {
if el, err = r.pull(syncWalFiles); err != nil {
log.Fatal(err)
func() {
r.syncWalFileMut.RLock()
defer r.syncWalFileMut.RUnlock()
syncWalFiles := []WalFile{}
for _, wf := range r.syncWalFiles {
syncWalFiles = append(syncWalFiles, wf)
}
i++
} else {
if el, err = r.gather(syncWalFiles, false); err != nil {
log.Fatal(err)
if i < 3 {
if el, err = r.pull(syncWalFiles); err != nil {
log.Fatal(err)
}
i++
} else {
if el, err = r.gather(syncWalFiles, false); err != nil {
log.Fatal(err)
}
i = 0
}
i = 0
}
}()
}
// Currently even empty event logs will trigger a client refresh which is very wasteful, so only publish to
// the channel if not empty
Expand All @@ -1733,27 +1751,31 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error {
case e := <-r.eventsChan:
// Write in real time to the websocket, if present
if r.web != nil {
for _, wf := range r.webWalFiles {
// TODO uuid is a hack to work around the GetUUID stubs I have in place atm:
if wf.GetUUID() != "" {
matchedEventLog := r.getMatchedWal(&[]EventLog{e}, wf)
if len(*matchedEventLog) > 0 {
// There are only single events, so get the zero index
b := buildByteWal(&[]EventLog{(*matchedEventLog)[0]})
b64Wal := base64.StdEncoding.EncodeToString(b.Bytes())
m := websocketMessage{
Action: "wal",
UUID: wf.GetUUID(),
Wal: b64Wal,
func() {
r.webWalFileMut.RLock()
defer r.webWalFileMut.RUnlock()
for _, wf := range r.webWalFiles {
// TODO uuid is a hack to work around the GetUUID stubs I have in place atm:
if wf.GetUUID() != "" {
matchedEventLog := r.getMatchedWal(&[]EventLog{e}, wf)
if len(*matchedEventLog) > 0 {
// There are only single events, so get the zero index
b := buildByteWal(&[]EventLog{(*matchedEventLog)[0]})
b64Wal := base64.StdEncoding.EncodeToString(b.Bytes())
m := websocketMessage{
Action: "wal",
UUID: wf.GetUUID(),
Wal: b64Wal,
}
func() {
webRefreshMut.RLock()
defer webRefreshMut.RUnlock()
r.web.pushWebsocket(m)
}()
}
func() {
webRefreshMut.RLock()
defer webRefreshMut.RUnlock()
r.web.pushWebsocket(m)
}()
}
}
}
}()

// Emit any remote updates if web active and local changes have occurred
r.emitRemoteUpdate()
Expand All @@ -1763,12 +1785,7 @@ func (r *DBListRepo) startSync(walChan chan *[]EventLog) error {
case <-pushTriggerChan:
// On ticks, Flush what we've aggregated to all walfiles, and then reset the
// ephemeral log. If empty, skip.
// We pass by reference, so we'll need to create a copy prior to sending to `push`
// otherwise the underlying el may change before `push` has a chance to process it
// If we start passing by value later on, this won't be required (as go will pass
// copies by default, I think).
elCopy := el
r.flushPartialWals(elCopy, false)
r.flushPartialWals(el, false)
el = []EventLog{}
go func() {
time.Sleep(time.Second * 30)
Expand Down

0 comments on commit cf42d5a

Please sign in to comment.