Skip to content

Commit

Permalink
Discard Guard, instead use a Mutex directly in DefaultApolloStore
Browse files Browse the repository at this point in the history
  • Loading branch information
BoD committed Jul 25, 2022
1 parent 460bba3 commit 1de5b75
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 172 deletions.

This file was deleted.

@@ -0,0 +1,13 @@
package com.apollographql.apollo3.cache.normalized.internal

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Runs [block] under [mutex], blocking the calling thread until it has completed.
 *
 * This platform has real threads, so we bridge into coroutine land with
 * [runBlocking] and take the suspending [Mutex] for the duration of [block].
 */
internal actual fun runBlockingWithMutex(mutex: Mutex, block: () -> Unit) {
    runBlocking {
        mutex.lock()
        try {
            block()
        } finally {
            mutex.unlock()
        }
    }
}
Expand Up @@ -18,6 +18,8 @@ import com.benasher44.uuid.Uuid
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.reflect.KClass

internal class DefaultApolloStore(
Expand All @@ -30,15 +32,12 @@ internal class DefaultApolloStore(
// If multiple watchers start notifying each other and potentially themselves, the buffer of changedKeysEvent will grow forever.
// I think as long as the refetchPolicy is [FetchPolicy.CacheOnly] everything should be fine as there is no reentrant emission.
// If the refetchPolicy is something else, we should certainly try to detect it in the cache interceptor
extraBufferCapacity = 10,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
extraBufferCapacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)

override val changedKeys = changedKeysEvents.asSharedFlow()

private val cacheHolder = Guard("OptimisticCache") {
OptimisticCache().chain(normalizedCacheFactory.createChain()) as OptimisticCache
}
private val cache = OptimisticCache().chain(normalizedCacheFactory.createChain()) as OptimisticCache
private val cacheMutex = Mutex()

override suspend fun publish(keys: Set<String>) {
if (keys.isEmpty()) {
Expand All @@ -49,8 +48,8 @@ internal class DefaultApolloStore(
}

/**
 * Removes every record from the cache.
 *
 * This override is not suspending, so it takes [cacheMutex] via
 * [runBlockingWithMutex] (which blocks the calling thread where the platform
 * supports it) instead of [accessCache].
 *
 * @return always true — the underlying [cache].clearAll() returns Unit, so no
 * failure can be reported from here.
 */
override fun clearAll(): Boolean {
  runBlockingWithMutex(cacheMutex) {
    cache.clearAll()
  }
  return true
}
Expand All @@ -59,7 +58,7 @@ internal class DefaultApolloStore(
cacheKey: CacheKey,
cascade: Boolean,
): Boolean {
return cacheHolder.writeAccess {
return accessCache {
it.remove(cacheKey, cascade)
}
}
Expand All @@ -68,7 +67,7 @@ internal class DefaultApolloStore(
cacheKeys: List<CacheKey>,
cascade: Boolean,
): Int {
return cacheHolder.writeAccess {
return accessCache {
var count = 0
for (cacheKey in cacheKeys) {
if (it.remove(cacheKey, cascade = cascade)) {
Expand All @@ -84,19 +83,15 @@ internal class DefaultApolloStore(
data: D,
customScalarAdapters: CustomScalarAdapters,
): Map<String, Record> {
return operation.normalize(
data,
customScalarAdapters,
cacheKeyGenerator
)
return operation.normalize(data, customScalarAdapters, cacheKeyGenerator)
}

override suspend fun <D : Operation.Data> readOperation(
operation: Operation<D>,
customScalarAdapters: CustomScalarAdapters,
cacheHeaders: CacheHeaders,
): D {
return cacheHolder.readAccess { cache ->
return accessCache { cache ->
operation.readDataFromCache(
customScalarAdapters = customScalarAdapters,
cache = cache,
Expand All @@ -112,22 +107,13 @@ internal class DefaultApolloStore(
customScalarAdapters: CustomScalarAdapters,
cacheHeaders: CacheHeaders,
): D {
return cacheHolder.readAccess { cache ->
fragment.readDataFromCache(
customScalarAdapters = customScalarAdapters,
cache = cache,
cacheResolver = cacheResolver,
cacheHeaders = cacheHeaders,
cacheKey = cacheKey
)
return accessCache { cache ->
fragment.readDataFromCache(customScalarAdapters = customScalarAdapters, cache = cache, cacheResolver = cacheResolver, cacheHeaders = cacheHeaders, cacheKey = cacheKey)
}
}

/**
 * Serializes all access to the underlying [cache]: runs [block] with the cache
 * while holding [cacheMutex].
 *
 * Callers do not declare read vs. write intent, so every access is exclusive —
 * this replaced the previous Guard's read/write distinction with a single lock.
 */
override suspend fun <R> accessCache(block: (NormalizedCache) -> R): R {
  return cacheMutex.withLock { block(cache) }
}

override suspend fun <D : Operation.Data> writeOperation(
Expand All @@ -137,13 +123,7 @@ internal class DefaultApolloStore(
cacheHeaders: CacheHeaders,
publish: Boolean,
): Set<String> {
return writeOperationWithRecords(
operation = operation,
operationData = operationData,
cacheHeaders = cacheHeaders,
publish = publish,
customScalarAdapters = customScalarAdapters
).second
return writeOperationWithRecords(operation = operation, operationData = operationData, cacheHeaders = cacheHeaders, publish = publish, customScalarAdapters = customScalarAdapters).second
}

override suspend fun <D : Fragment.Data> writeFragment(
Expand All @@ -154,13 +134,8 @@ internal class DefaultApolloStore(
cacheHeaders: CacheHeaders,
publish: Boolean,
): Set<String> {
val changedKeys = cacheHolder.writeAccess { cache ->
val records = fragment.normalize(
data = fragmentData,
customScalarAdapters = customScalarAdapters,
cacheKeyGenerator = cacheKeyGenerator,
rootKey = cacheKey.key
).values
val changedKeys = accessCache { cache ->
val records = fragment.normalize(data = fragmentData, customScalarAdapters = customScalarAdapters, cacheKeyGenerator = cacheKeyGenerator, rootKey = cacheKey.key).values

cache.merge(records, cacheHeaders)
}
Expand All @@ -179,12 +154,8 @@ internal class DefaultApolloStore(
publish: Boolean,
customScalarAdapters: CustomScalarAdapters,
): Pair<Set<Record>, Set<String>> {
val (records, changedKeys) = cacheHolder.writeAccess { cache ->
val records = operation.normalize(
data = operationData,
customScalarAdapters = customScalarAdapters,
cacheKeyGenerator = cacheKeyGenerator
)
val (records, changedKeys) = accessCache { cache ->
val records = operation.normalize(data = operationData, customScalarAdapters = customScalarAdapters, cacheKeyGenerator = cacheKeyGenerator)

records to cache.merge(records.values.toList(), cacheHeaders)
}
Expand All @@ -203,23 +174,15 @@ internal class DefaultApolloStore(
customScalarAdapters: CustomScalarAdapters,
publish: Boolean,
): Set<String> {
val changedKeys = cacheHolder.writeAccess { cache ->
val records = operation.normalize(
data = operationData,
customScalarAdapters = customScalarAdapters,
cacheKeyGenerator = cacheKeyGenerator
).values.map { record ->
Record(
key = record.key,
fields = record.fields,
mutationId = mutationId
)
val changedKeys = accessCache { cache ->
val records = operation.normalize(data = operationData, customScalarAdapters = customScalarAdapters, cacheKeyGenerator = cacheKeyGenerator).values.map { record ->
Record(key = record.key, fields = record.fields, mutationId = mutationId)
}

/**
* TODO: should we forward the cache headers to the optimistic store?
*/
cache.addOptimisticUpdates(records)
(cache as OptimisticCache).addOptimisticUpdates(records)
}

if (publish) {
Expand All @@ -233,8 +196,8 @@ internal class DefaultApolloStore(
mutationId: Uuid,
publish: Boolean,
): Set<String> {
val changedKeys = cacheHolder.writeAccess { cache ->
cache.removeOptimisticUpdates(mutationId)
val changedKeys = accessCache { cache ->
(cache as OptimisticCache).removeOptimisticUpdates(mutationId)
}

if (publish) {
Expand All @@ -245,19 +208,16 @@ internal class DefaultApolloStore(
}

/**
 * Merges a single [record] into the cache under the cache lock.
 *
 * @return the set of field keys that actually changed, as reported by the cache.
 */
suspend fun merge(record: Record, cacheHeaders: CacheHeaders): Set<String> {
  return accessCache { cache ->
    cache.merge(record, cacheHeaders)
  }
}

/**
 * Returns a snapshot of the whole cache contents, keyed by cache implementation
 * class, taken while holding the cache lock.
 */
override suspend fun dump(): Map<KClass<*>, Map<String, Record>> {
  return accessCache { cache ->
    cache.dump()
  }
}

/**
 * No-op: since the Guard (and its dedicated worker) was replaced by a plain
 * [Mutex], there is nothing left to release. Kept to satisfy the interface.
 */
override fun dispose() {}
}

This file was deleted.

@@ -0,0 +1,5 @@
package com.apollographql.apollo3.cache.normalized.internal

import kotlinx.coroutines.sync.Mutex

internal expect fun runBlockingWithMutex(mutex: Mutex, block: () -> Unit)

This file was deleted.

@@ -0,0 +1,5 @@
package com.apollographql.apollo3.cache.normalized.internal

import kotlinx.coroutines.sync.Mutex

internal actual fun runBlockingWithMutex(mutex: Mutex, block: () -> Unit) = block()

This file was deleted.

@@ -0,0 +1,13 @@
package com.apollographql.apollo3.cache.normalized.internal

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Runs [block] while holding [mutex], blocking the calling thread until it
 * completes. [runBlocking] bridges from this non-suspending `actual` into the
 * suspending [Mutex.withLock].
 */
internal actual fun runBlockingWithMutex(mutex: Mutex, block: () -> Unit) {
runBlocking {
mutex.withLock {
block()
}
}
}

0 comments on commit 1de5b75

Please sign in to comment.