Skip to content

Commit

Permalink
Remove lock from OptimisticCache and unmerge it from MemoryCache (#5662)
Browse files Browse the repository at this point in the history
  • Loading branch information
BoD committed Mar 6, 2024
1 parent 98b11d3 commit 72af156
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 354 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.apollographql.apollo3.cache.normalized.api

import com.apollographql.apollo3.cache.normalized.api.internal.ConcurrentMap
import com.apollographql.apollo3.cache.normalized.api.internal.Lock
import com.apollographql.apollo3.cache.normalized.api.internal.LruCache
import com.apollographql.apollo3.cache.normalized.api.internal.OptimisticNormalizedCache
import com.apollographql.apollo3.cache.normalized.api.internal.patternToRegex
import com.benasher44.uuid.Uuid
import kotlin.jvm.JvmOverloads
import kotlin.math.max
import kotlin.reflect.KClass

/**
Expand All @@ -23,12 +19,10 @@ class MemoryCache(
private val nextCache: NormalizedCache? = null,
private val maxSizeBytes: Int = Int.MAX_VALUE,
private val expireAfterMillis: Long = -1,
) : OptimisticNormalizedCache {
) : NormalizedCache {
// A lock is only needed if there is a nextCache
private val lock = nextCache?.let { Lock() }

private val recordJournals = ConcurrentMap<String, RecordJournal>()

private fun <T> lockWrite(block: () -> T): T {
return lock?.write { block() } ?: block()
}
Expand All @@ -54,7 +48,7 @@ class MemoryCache(
record ?: nextCache?.loadRecord(key, cacheHeaders)?.also { nextCachedRecord ->
lruCache[key] = nextCachedRecord
}
}.mergeJournalRecord(key)
}

override fun loadRecords(keys: Collection<String>, cacheHeaders: CacheHeaders): Collection<Record> {
return keys.mapNotNull { key -> loadRecord(key, cacheHeaders) }
Expand All @@ -65,7 +59,6 @@ class MemoryCache(
lruCache.clear()
nextCache?.clearAll()
}
recordJournals.clear()
}

override fun remove(cacheKey: CacheKey, cascade: Boolean): Boolean {
Expand All @@ -79,24 +72,10 @@ class MemoryCache(
}

val chainRemoved = nextCache?.remove(cacheKey, cascade) ?: false
val journalRemoved = removeFromJournal(cacheKey, cascade)
record != null || chainRemoved || journalRemoved
record != null || chainRemoved
}
}

private fun removeFromJournal(cacheKey: CacheKey, cascade: Boolean): Boolean {
val recordJournal = recordJournals[cacheKey.key]
if (recordJournal != null) {
recordJournals.remove(cacheKey.key)
if (cascade) {
for (cacheReference in recordJournal.current.referencedFields()) {
removeFromJournal(CacheKey(cacheReference.key), true)
}
}
}
return recordJournal != null
}

override fun remove(pattern: String): Int {
val regex = patternToRegex(pattern)
return lockWrite {
Expand Down Expand Up @@ -141,121 +120,13 @@ class MemoryCache(

override fun dump(): Map<KClass<*>, Map<String, Record>> {
return lockRead {
mapOf(OptimisticNormalizedCache::class to recordJournals.mapValues { (_, journal) -> journal.current }) +
mapOf(this::class to lruCache.asMap().mapValues { (_, record) -> record }) +
mapOf(this::class to lruCache.asMap().mapValues { (_, record) -> record }) +
nextCache?.dump().orEmpty()
}
}

internal fun clearCurrentCache() {
lruCache.clear()
recordJournals.clear()
}

override fun addOptimisticUpdates(recordSet: Collection<Record>): Set<String> {
return recordSet.flatMap {
addOptimisticUpdate(it)
}.toSet()
}

override fun addOptimisticUpdate(record: Record): Set<String> {
val journal = recordJournals[record.key]
return if (journal == null) {
recordJournals[record.key] = RecordJournal(record)
record.fieldKeys()
} else {
journal.addPatch(record)
}
}

override fun removeOptimisticUpdates(mutationId: Uuid): Set<String> {
val changedCacheKeys = mutableSetOf<String>()
val keys = HashSet(recordJournals.keys) // local copy to avoid concurrent modification
keys.forEach {
val recordJournal = recordJournals[it] ?: return@forEach
val result = recordJournal.removePatch(mutationId)
changedCacheKeys.addAll(result.changedKeys)
if (result.isEmpty) {
recordJournals.remove(it)
}
}
return changedCacheKeys
}

private fun Record?.mergeJournalRecord(key: String): Record? {
val journal = recordJournals[key]
return if (journal != null) {
this?.mergeWith(journal.current)?.first ?: journal.current
} else {
this
}
}

private class RemovalResult(
val changedKeys: Set<String>,
val isEmpty: Boolean,
)

private class RecordJournal(record: Record) {
/**
* The latest value of the record made by applying all the patches.
*/
var current: Record = record

/**
* A list of chronological patches applied to the record.
*/
private val patches = mutableListOf(record)

/**
* Adds a new patch on top of all the previous ones.
*/
fun addPatch(record: Record): Set<String> {
val (mergedRecord, changedKeys) = current.mergeWith(record)
current = mergedRecord
patches.add(record)
return changedKeys
}

/**
* Lookup record by mutation id, if it's found removes it from the history and
* computes the new current record.
*/
fun removePatch(mutationId: Uuid): RemovalResult {
val recordIndex = patches.indexOfFirst { mutationId == it.mutationId }
if (recordIndex == -1) {
// The mutation did not impact this Record
return RemovalResult(emptySet(), false)
}

if (patches.size == 1) {
// The mutation impacted this Record and it was the only one in the history
return RemovalResult(current.fieldKeys(), true)
}

/**
* There are multiple patches, go over them and compute the new current value
* Remember the oldRecord so that we can compute the changed keys
*/
val oldRecord = current

patches.removeAt(recordIndex).key

var cur: Record? = null
val start = max(0, recordIndex - 1)
for (i in start until patches.size) {
val record = patches[i]
if (cur == null) {
cur = record
} else {
val (mergedRecord, _) = cur.mergeWith(record)
cur = mergedRecord
}
}
current = cur!!

return RemovalResult(Record.changedKeys(oldRecord, current), false)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,185 @@
package com.apollographql.apollo3.cache.normalized.api.internal

import com.apollographql.apollo3.annotations.ApolloInternal
import com.apollographql.apollo3.cache.normalized.api.CacheHeaders
import com.apollographql.apollo3.cache.normalized.api.CacheKey
import com.apollographql.apollo3.cache.normalized.api.NormalizedCache
import com.apollographql.apollo3.cache.normalized.api.Record
import com.apollographql.apollo3.cache.normalized.api.Record.Companion.changedKeys
import com.apollographql.apollo3.cache.normalized.api.RecordMerger
import com.benasher44.uuid.Uuid
import kotlin.math.max
import kotlin.reflect.KClass

interface OptimisticNormalizedCache : NormalizedCache {
fun addOptimisticUpdates(recordSet: Collection<Record>): Set<String>
@ApolloInternal
class OptimisticNormalizedCache(private val wrapped: NormalizedCache) : NormalizedCache {
private val recordJournals = ConcurrentMap<String, RecordJournal>()

fun addOptimisticUpdate(record: Record): Set<String>
override fun loadRecord(key: String, cacheHeaders: CacheHeaders): Record? {
val nonOptimisticRecord = wrapped.loadRecord(key, cacheHeaders)
return nonOptimisticRecord.mergeJournalRecord(key)
}

fun removeOptimisticUpdates(mutationId: Uuid): Set<String>
override fun loadRecords(keys: Collection<String>, cacheHeaders: CacheHeaders): Collection<Record> {
val nonOptimisticRecords = wrapped.loadRecords(keys, cacheHeaders).associateBy { it.key }
return keys.mapNotNull { key ->
nonOptimisticRecords[key].mergeJournalRecord(key)
}
}

override fun merge(record: Record, cacheHeaders: CacheHeaders, recordMerger: RecordMerger): Set<String> {
return wrapped.merge(record, cacheHeaders, recordMerger)
}

override fun merge(records: Collection<Record>, cacheHeaders: CacheHeaders, recordMerger: RecordMerger): Set<String> {
return wrapped.merge(records, cacheHeaders, recordMerger)
}

override fun clearAll() {
wrapped.clearAll()
recordJournals.clear()
}

override fun remove(cacheKey: CacheKey, cascade: Boolean): Boolean {
var removed = wrapped.remove(cacheKey, cascade)

val recordJournal = recordJournals[cacheKey.key]
if (recordJournal != null) {
recordJournals.remove(cacheKey.key)
removed = true
if (cascade) {
for (cacheReference in recordJournal.current.referencedFields()) {
remove(CacheKey(cacheReference.key), true)
}
}
}
return removed
}

override fun remove(pattern: String): Int {
var removed = wrapped.remove(pattern)

val regex = patternToRegex(pattern)
val keys = HashSet(recordJournals.keys) // local copy to avoid concurrent modification
keys.forEach { key ->
if (regex.matches(key)) {
recordJournals.remove(key)
removed++
}
}

return removed
}

fun addOptimisticUpdates(recordSet: Collection<Record>): Set<String> {
return recordSet.flatMap {
addOptimisticUpdate(it)
}.toSet()
}

fun addOptimisticUpdate(record: Record): Set<String> {
val journal = recordJournals[record.key]
return if (journal == null) {
recordJournals[record.key] = RecordJournal(record)
record.fieldKeys()
} else {
journal.addPatch(record)
}
}

fun removeOptimisticUpdates(mutationId: Uuid): Set<String> {
val changedCacheKeys = mutableSetOf<String>()
val keys = HashSet(recordJournals.keys) // local copy to avoid concurrent modification
keys.forEach {
val recordJournal = recordJournals[it] ?: return@forEach
val result = recordJournal.removePatch(mutationId)
changedCacheKeys.addAll(result.changedKeys)
if (result.isEmpty) {
recordJournals.remove(it)
}
}
return changedCacheKeys
}

override fun dump(): Map<KClass<*>, Map<String, Record>> {
return mapOf(this::class to recordJournals.mapValues { (_, journal) -> journal.current }) + wrapped.dump()
}

private fun Record?.mergeJournalRecord(key: String): Record? {
val journal = recordJournals[key]
return if (journal != null) {
this?.mergeWith(journal.current)?.first ?: journal.current
} else {
this
}
}

private class RemovalResult(
val changedKeys: Set<String>,
val isEmpty: Boolean,
)

private class RecordJournal(record: Record) {
/**
* The latest value of the record made by applying all the patches.
*/
var current: Record = record

/**
* A list of chronological patches applied to the record.
*/
private val patches = mutableListOf(record)

/**
* Adds a new patch on top of all the previous ones.
*/
fun addPatch(record: Record): Set<String> {
val (mergedRecord, changedKeys) = current.mergeWith(record)
current = mergedRecord
patches.add(record)
return changedKeys
}

/**
* Lookup record by mutation id, if it's found removes it from the history and
* computes the new current record.
*
* @return the changed keys or null if
*/
fun removePatch(mutationId: Uuid): RemovalResult {
val recordIndex = patches.indexOfFirst { mutationId == it.mutationId }
if (recordIndex == -1) {
// The mutation did not impact this Record
return RemovalResult(emptySet(), false)
}

if (patches.size == 1) {
// The mutation impacted this Record and it was the only one in the history
return RemovalResult(current.fieldKeys(), true)
}

/**
* There are multiple patches, go over them and compute the new current value
* Remember the oldRecord so that we can compute the changed keys
*/
val oldRecord = current

patches.removeAt(recordIndex).key

var cur: Record? = null
val start = max(0, recordIndex - 1)
for (i in start until patches.size) {
val record = patches[i]
if (cur == null) {
cur = record
} else {
val (mergedRecord, _) = cur.mergeWith(record)
cur = mergedRecord
}
}
current = cur!!

return RemovalResult(changedKeys(oldRecord, current), false)
}
}
}

0 comments on commit 72af156

Please sign in to comment.