Skip to content

Commit

Permalink
Started implementing orphan cache btree with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sambigeara committed Oct 1, 2022
1 parent 08a3d55 commit 17fec2e
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 46 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/gin-gonic/gin v1.7.0 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a // indirect
github.com/klauspost/compress v1.10.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/E
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
Expand Down
105 changes: 66 additions & 39 deletions pkg/service/crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/google/btree"
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -407,7 +408,7 @@ func (r *DBListRepo) repositionActiveFriends(e EventLog) EventLog {
// - `@foo@bar.com` is appended to the Line() portion of the string, because the friend is no longer present in
// the r.friends cache
var existingFriends []string
if item, exists := r.listItemTracker[e.ListItemKey]; exists {
if item, exists := r.listItemCache[e.ListItemKey]; exists {
existingFriends = item.friends.Emails
}
// If there are no friends, return early
Expand Down Expand Up @@ -537,45 +538,69 @@ func (r *DBListRepo) generateFriendChangeEvents(e EventLog, item *ListItem) {
}()
}

func (e *EventLog) isEqualOrHappenedAfter(a EventLog) bool {
func (a *EventLog) happenedBefore(b EventLog) bool {
// for one event to have happened before another event, all the elements of its vector need to be
// smaller or equal to the matching elements in the other vector.
// therefore, if _any_ of the cached event counters are <= the remote counterpart, we need
// to process the event. Or in other words, if all of the cached event counters are greater than
// the counterpart, we can skip the event
for id, remoteDT := range a.VectorClock {
if cachedDT, cachedExists := e.VectorClock[id]; cachedExists && cachedDT < remoteDT {

isEqual := len(a.VectorClock) == len(b.VectorClock)

for id, aDT := range a.VectorClock {
if bDT, aCachedExists := b.VectorClock[id]; !aCachedExists || aDT > bDT {
return false
} else if isEqual && aDT < bDT {
isEqual = false
}
}

if isEqual {
return false
}

return true
}

func (r *DBListRepo) getOrCreateListItem(key string, isDeleted bool) *ListItem {
func (r *DBListRepo) getOrCreateListItem(key string) *ListItem {
if key == "" {
return nil
}

item, exists := r.listItemTracker[key]
item, exists := r.listItemCache[key]
if exists {
return item
}
item = &ListItem{
key: key,
isDeleted: isDeleted,
key: key,
}
r.listItemTracker[key] = item
r.listItemCache[key] = item
return item
}

func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
item := r.getOrCreateListItem(e.ListItemKey, e.EventType == DeleteEvent)
childItem := r.getOrCreateListItem(e.TargetListItemKey, false)
// TODO remove this
if childItem != nil && !childItem.isAppliedToList {
childItem = nil
type orphanCacheNode struct {
event EventLog
}

func (a orphanCacheNode) Less(bItem btree.Item) bool {
b, _ := bItem.(orphanCacheNode)
if a.event.TargetListItemKey == b.event.TargetListItemKey {
return a.event.happenedBefore(b.event)
}

// items placed at the top of the list (e.g. with an empty target) take precedence
if a.event.TargetListItemKey == "" {
return true
} else if b.event.TargetListItemKey == "" {
return false
}

return a.event.TargetListItemKey < b.event.TargetListItemKey
}

func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
item := r.getOrCreateListItem(e.ListItemKey)
childItem := r.getOrCreateListItem(e.TargetListItemKey)

// Each event is either `content`+`positional`, `positional` or `delete`:
// - AddEvent/UpdateEvent: constituting both content (Line, Note, Visibility etc) AND positional updates
// - PositionEvent
Expand All @@ -588,7 +613,7 @@ func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
// Check the event cache and skip if the event is older than the most-recently processed
if eventTypeCache, exists := r.listItemProcessedEventLogTypeCache[e.EventType]; exists {
if ce, exists := eventTypeCache[e.ListItemKey]; exists {
if ce.isEqualOrHappenedAfter(e) {
if e.happenedBefore(ce) {
return item, nil
}
}
Expand All @@ -598,18 +623,20 @@ func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {

// Check to see if the item has been deleted. If it has, we return early if the event is of type PositionEvent or
// if the DeleteEvent occurred after the UpdateEvent
if item.isDeleted {
if e.EventType == PositionEvent {
return item, nil
}
if eventTypeCache, exists := r.listItemProcessedEventLogTypeCache[DeleteEvent]; exists {
if ce, exists := eventTypeCache[e.ListItemKey]; exists {
if ce.isEqualOrHappenedAfter(e) {
return item, nil
}
}
}
}
// TODO
//if item.isDeleted {
// if e.EventType == PositionEvent {
// return item, nil
// }
// if eventTypeCache, exists := r.listItemProcessedEventLogTypeCache[DeleteEvent]; exists {
// if ce, exists := eventTypeCache[e.ListItemKey]; exists {
// if e.happenedBefore(ce) {
// //if ce.isEqualOrHappenedAfter(e) {
// return item, nil
// }
// }
// }
//}

// Add the event to the cache AFTER the deletion check above, otherwise deleted events will be added then immediately
// return early as the event will be equal to itself in the next check
Expand Down Expand Up @@ -646,7 +673,7 @@ func (r *DBListRepo) processEventLog(e EventLog) (*ListItem, error) {
item, err = r.insert(item, childItem)
}

r.listItemTracker[e.ListItemKey] = item
r.listItemCache[e.ListItemKey] = item

return item, err
}
Expand Down Expand Up @@ -678,16 +705,16 @@ func (r *DBListRepo) update(item *ListItem, e EventLog) error {
item.friends.Emails = emails

// Just in case an Update occurs on a Deleted item (distributed race conditions)
item.isDeleted = false
//item.isDeleted = false

return nil
}

func (r *DBListRepo) del(item *ListItem) error {
item.isDeleted = true
//item.isDeleted = true
r.remove(item)
// isAppliedToList is used in r.remove, so set afterwards
item.isAppliedToList = false
//item.isAppliedToList = false
return nil
}

Expand All @@ -696,8 +723,8 @@ func (r *DBListRepo) remove(item *ListItem) error {
oldParent := item.parent
if item.child != nil {
item.child.parent = oldParent
} else if item.isAppliedToList {
r.Root = oldParent
//} else if item.isAppliedToList {
// r.Root = oldParent
}
if item.parent != nil {
item.parent.child = oldChild
Expand All @@ -711,9 +738,9 @@ func (r *DBListRepo) insert(item, targetChild *ListItem) (*ListItem, error) {
// if we don't return early, we end up with an endless loop when setting targetParent
// tp targetChild.parent below
//if item.child != nil && item.child == targetChild && item.isAppliedToList {
if item.child == targetChild && item.isAppliedToList {
return item, nil
}
//if item.child == targetChild && item.isAppliedToList {
// return item, nil
//}

var targetParent *ListItem
if targetChild != nil {
Expand All @@ -723,7 +750,7 @@ func (r *DBListRepo) insert(item, targetChild *ListItem) (*ListItem, error) {
}

r.remove(item)
item.isAppliedToList = true
//item.isAppliedToList = true

// Insert item into new position
if targetChild != nil {
Expand Down
Loading

0 comments on commit 17fec2e

Please sign in to comment.