Skip to content

Commit

Permalink
Moved latest event caches from DBListRepo to crdtTree
Browse files Browse the repository at this point in the history
  • Loading branch information
Sambigeara committed Oct 1, 2022
1 parent 2bd3875 commit 1355e37
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 91 deletions.
2 changes: 0 additions & 2 deletions cmd/debug/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"fmt"
"log"
"os"
"runtime"
Expand Down Expand Up @@ -40,6 +39,5 @@ func main() {
r.TestPullLocal(c)
matches, _, _ := r.Match([][]rune{}, true, "", 0, 0)
_ = matches
fmt.Println(r.Tree())
runtime.Breakpoint()
}
32 changes: 3 additions & 29 deletions pkg/service/crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,31 +580,17 @@ func (a *EventLog) before(b EventLog) bool {
return vectorClockBefore(a.VectorClock, b.VectorClock)
}

func (r *DBListRepo) itemIsLive(item *ListItem) bool {
var k string
if item != nil {
k = item.key
}

latestAddEvent, isAdded := r.addEventSet[k]
latestDeleteEvent, isDeleted := r.deleteEventSet[k]
if isAdded && (!isDeleted || (isAdded && latestDeleteEvent.before(latestAddEvent))) {
return true
}
return false
}

func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
item := r.getOrCreateListItem(e.ListItemKey)

var eventCache map[string]EventLog
switch e.EventType {
case UpdateEvent:
eventCache = r.addEventSet
eventCache = r.crdt.addEventSet
case DeleteEvent:
eventCache = r.deleteEventSet
eventCache = r.crdt.deleteEventSet
case PositionEvent:
eventCache = r.positionEventSet
eventCache = r.crdt.positionEventSet
}

// Check the event cache and skip if the event is older than the most-recently processed
Expand All @@ -631,8 +617,6 @@ func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
switch e.EventType {
case UpdateEvent:
err = updateItemFromEvent(item, e, r.email)
//case DeleteEvent:
//r.crdt.del(e)
case PositionEvent:
r.crdt.add(e)
}
Expand Down Expand Up @@ -671,16 +655,6 @@ func updateItemFromEvent(item *ListItem, e EventLog, email string) error {
return nil
}

func (r *DBListRepo) getLatestVectorClock(item *ListItem) map[uuid]int64 {
// if the item is being newly added, the add event comes before the position event, so default to the add event if position is not
// yet set
latestVectorClockEvent, exists := r.positionEventSet[item.key]
if !exists {
latestVectorClockEvent = r.addEventSet[item.key]
}
return latestVectorClockEvent.VectorClock
}

// Replay updates listItems based on the current state of the local WAL logs. It generates or updates the linked list
// which is attached to DBListRepo.Root
func (r *DBListRepo) Replay(partialWal []EventLog) error {
Expand Down
10 changes: 1 addition & 9 deletions pkg/service/crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,9 +864,7 @@ func TestCRDTAllPermsMix(t *testing.T) {
// We can't rely on fresh repos each iterations here because OS+file management lags behind and
// causes inconsistencies. Therefore, use the same repo and refresh state
repo.listItemCache = make(map[string]*ListItem)
repo.addEventSet = make(map[string]EventLog)
repo.deleteEventSet = make(map[string]EventLog)
repo.positionEventSet = make(map[string]EventLog)
repo.crdt = newTree()

//if i == 5042 {
// runtime.Breakpoint()
Expand Down Expand Up @@ -975,9 +973,6 @@ func TestCRDTAllPermsMoves(t *testing.T) {
// We can't rely on fresh repos each iterations here because OS+file management lags behind and
// causes inconsistencies. Therefore, use the same repo and refresh state
repo.listItemCache = make(map[string]*ListItem)
repo.addEventSet = make(map[string]EventLog)
repo.deleteEventSet = make(map[string]EventLog)
repo.positionEventSet = make(map[string]EventLog)
repo.crdt = newTree()

repo.Replay(p)
Expand Down Expand Up @@ -1075,9 +1070,6 @@ func TestCRDTAllPermsDeletes(t *testing.T) {
// We can't rely on fresh repos each iterations here because OS+file management lags behind and
// causes inconsistencies. Therefore, use the same repo and refresh state
repo.listItemCache = make(map[string]*ListItem)
repo.addEventSet = make(map[string]EventLog)
repo.deleteEventSet = make(map[string]EventLog)
repo.positionEventSet = make(map[string]EventLog)
repo.crdt = newTree()

repo.Replay(p)
Expand Down
11 changes: 0 additions & 11 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type DBListRepo struct {
//currentLamportTimestamp int64
vectorClock map[uuid]int64
listItemCache map[string]*ListItem
//processedEventLogCache map[string]struct{}
//listItemProcessedEventLogTypeCache map[EventType]map[string]EventLog
addEventSet, deleteEventSet, positionEventSet map[string]EventLog

crdt *crdtTree

Expand Down Expand Up @@ -104,11 +101,6 @@ func NewDBListRepo(localWalFile LocalWalFile, webTokenStore WebTokenStore) *DBLi
latestWalSchemaID: latestWalSchemaID,
vectorClock: make(map[uuid]int64),
listItemCache: make(map[string]*ListItem),
//processedEventLogCache: make(map[string]struct{}),
//listItemProcessedEventLogTypeCache: make(map[EventType]map[string]EventLog),
addEventSet: make(map[string]EventLog),
deleteEventSet: make(map[string]EventLog),
positionEventSet: make(map[string]EventLog),

crdt: newTree(),

Expand Down Expand Up @@ -486,9 +478,6 @@ func (r *DBListRepo) Match(keys [][]rune, showHidden bool, curKey string, offset
//for cur := range r.getListItems() {
for nodeKey := range r.crdt.traverse() {
cur := r.listItemCache[nodeKey]
if !r.itemIsLive(cur) {
continue
}
//for {
// Nullify match pointers
// TODO centralise this logic, it's too closely coupled with the moveItem logic (if match pointers
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestServiceDelete(t *testing.T) {
t.Errorf("Matches should be len 0")
}

if _, exists := repo.deleteEventSet[k]; !exists {
if _, exists := repo.crdt.deleteEventSet[k]; !exists {
t.Errorf("Delete event should be present in the deleteEventSet")
}
})
Expand Down Expand Up @@ -402,10 +402,10 @@ func TestServiceDelete(t *testing.T) {
t.Errorf("Matches should be len 0 but is %d", l)
}

if _, exists := repo.positionEventSet[k]; !exists {
if _, exists := repo.crdt.positionEventSet[k]; !exists {
t.Errorf("Position event should be present in the positionEventSet")
}
if _, exists := repo.deleteEventSet[k]; !exists {
if _, exists := repo.crdt.deleteEventSet[k]; !exists {
t.Errorf("Delete event should be present in the deleteEventSet")
}
})
Expand Down
59 changes: 22 additions & 37 deletions pkg/service/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const (
)

type crdtTree struct {
cache map[string]*node
cache map[string]*node
addEventSet, deleteEventSet, positionEventSet map[string]EventLog
}

type node struct {
Expand All @@ -37,6 +38,9 @@ func newTree() *crdtTree {
crdtOrphanKey: orphan,
crdtRootKey: root,
},
addEventSet: make(map[string]EventLog),
deleteEventSet: make(map[string]EventLog),
positionEventSet: make(map[string]EventLog),
}
}

Expand All @@ -58,6 +62,15 @@ func (crdt *crdtTree) String() string {
return tree.String()
}

func (crdt *crdtTree) itemIsLive(key string) bool {
latestAddEvent, isAdded := crdt.addEventSet[key]
latestDeleteEvent, isDeleted := crdt.deleteEventSet[key]
if isAdded && (!isDeleted || (isAdded && latestDeleteEvent.before(latestAddEvent))) {
return true
}
return false
}

func (crdt *crdtTree) traverse() <-chan string {
ch := make(chan string)
go func() {
Expand All @@ -71,7 +84,7 @@ func (crdt *crdtTree) traverse() <-chan string {
curChildren = append(curChildren, crdt.cache[k])
}
items = append(curChildren, items...)
if cur.key != crdtRootKey && cur.key != crdtOrphanKey && cur.parent.key != crdtOrphanKey {
if cur.key != crdtRootKey && cur.key != crdtOrphanKey && cur.parent.key != crdtOrphanKey && crdt.itemIsLive(cur.key) {
ch <- cur.key
}
}
Expand Down Expand Up @@ -99,36 +112,6 @@ func (crdt *crdtTree) addToTargetChildArray(item, target *node) {
target.children = newChildren
}

func (crdt *crdtTree) removeFromParentChildArray(item *node) {
newParentChildren := []string{}
for _, c := range item.parent.children {
if c != item.key {
newParentChildren = append(newParentChildren, c)
}
}
item.parent.children = newParentChildren
}

func (crdt *crdtTree) del(e EventLog) {
item, exists := crdt.cache[e.ListItemKey]
if !exists {
return
}
crdt.removeFromParentChildArray(item)

// if the parent is not an orphan child, we move all children of the deleted item
// to the children of the parent (resolving with clocks as we go)
if item.parent.key != crdtOrphanKey {
for _, c := range item.children {
crdt.addToTargetChildArray(crdt.cache[c], item.parent)
}
item.children = []string{}
}

orphan := crdt.cache[crdtOrphanKey]
crdt.addToTargetChildArray(item, orphan)
}

func (crdt *crdtTree) add(e EventLog) {
target, exists := crdt.cache[e.TargetListItemKey]
if !exists {
Expand All @@ -149,13 +132,15 @@ func (crdt *crdtTree) add(e EventLog) {
crdt.cache[e.ListItemKey] = item
} else {
// remove from parent.children if pre-existing
crdt.removeFromParentChildArray(item)
newParentChildren := []string{}
for _, c := range item.parent.children {
if c != item.key {
newParentChildren = append(newParentChildren, c)
}
}
item.parent.children = newParentChildren
}
item.latestVectorClock = e.VectorClock

crdt.addToTargetChildArray(item, target)
}

func (r *DBListRepo) Tree() *crdtTree {
return r.crdt
}

0 comments on commit 1355e37

Please sign in to comment.