Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test write only storage stack end-to-end #7111

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/arcs/core/host/ParticleContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class ParticleContext(
// Trigger the StorageProxy sync request for each readable handle. Once
// the StorageEvent.READY notifications have all, been received, we can
// call particle.onReady (handled by notify below).
awaitingReady.forEach { it.maybeInitiateSync() }
awaitingReady.toSet().forEach { it.maybeInitiateSync() }
}

log.debug { "runParticleAsync finished" }
Expand Down
15 changes: 12 additions & 3 deletions java/arcs/core/host/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import arcs.core.entity.Reference
import arcs.core.host.api.HandleHolder
import arcs.core.host.api.Particle
import arcs.core.storage.StorageKey
import arcs.core.storage.StorageKeyProtocol
import arcs.core.storage.referencemode.ReferenceModeStorageKey
import arcs.core.type.Tag
import arcs.flags.BuildFlags
import kotlin.reflect.KClass
Expand Down Expand Up @@ -151,11 +153,18 @@ object NoOpArcHostParticle : Particle {

/**
* Examines all [Plan.HandleConnection]s in a given [Plan.Partition] and returns true if and only if
* every connection with a matching key is both a [Tag.CollectionType] and uses a [HandleMode]
* that cannot read.
* every connection with a matching key is both a [Tag.CollectionType], uses a [HandleMode]
* that cannot read, and is a database backed (as opposed to volatile or ramdisk) key.
*/
fun isWriteOnlyStorageKey(partition: Plan.Partition, key: StorageKey): Boolean =
BuildFlags.WRITE_ONLY_STORAGE_STACK &&
partition.particles.flatMap { it.handles.values }.filter { it.storageKey == key }.all {
!it.mode.canRead && it.type.tag == Tag.Collection
!it.mode.canRead && it.type.tag == Tag.Collection && isDatabaseKey(it.storageKey)
}

/** True iff and key is backed by a database, not a ramdisk or volatile. */
fun isDatabaseKey(key: StorageKey): Boolean = when (key.protocol) {
StorageKeyProtocol.ReferenceMode -> isDatabaseKey((key as ReferenceModeStorageKey).backingKey)
StorageKeyProtocol.Database -> true
else -> false
}
21 changes: 19 additions & 2 deletions java/arcs/core/storage/WriteOnlyStorageProxyImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,29 @@ class WriteOnlyStorageProxyImpl<Data : CrdtData, Op : CrdtOperation, T> private
// WriteOnly proxies do not perform sync requests.
override fun prepareForSync() = Unit

override fun maybeInitiateSync() = Unit
override fun maybeInitiateSync() {
notifyCallback?.let { callback ->
scheduler.schedule(
HandleCallbackTask(callbackId!!, "notify(READY)") {
callback(StorageEvent.READY)
}
)
}
callbackId = null
notifyCallback = null
}

private var callbackId: CallbackIdentifier? = null
private var notifyCallback: ((StorageEvent) -> Unit)? = null

// WriteOnly proxies are immediately ready.
override fun registerForStorageEvents(id: CallbackIdentifier, notify: (StorageEvent) -> Unit) {
checkNotClosed()
notify(StorageEvent.READY)
require(callbackId == null && notifyCallback == null) {
"You can only registerForStorageEvents once."
}
callbackId = id
notifyCallback = notify
}

// WriteOnly proxies can't have errors as there are no events.
Expand Down
25 changes: 23 additions & 2 deletions javatests/arcs/core/host/UtilsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import arcs.core.entity.ForeignReferenceCheckerImpl
import arcs.core.host.api.HandleHolder
import arcs.core.storage.api.DriverAndKeyConfigurator
import arcs.core.storage.driver.RamDisk
import arcs.core.storage.keys.DatabaseStorageKey
import arcs.core.storage.keys.RamDiskStorageKey
import arcs.core.storage.referencemode.ReferenceModeStorageKey
import arcs.core.storage.testutil.testStorageEndpointManager
Expand Down Expand Up @@ -171,9 +172,13 @@ class UtilsTest(private val params: Params) {
}
}

private fun generateHandle(key: String, type: Type) =
private fun generateHandle(key: String, type: Type, db: Boolean = true) =
Plan.Handle(
RamDiskStorageKey(key),
if (!db) RamDiskStorageKey(key)
else ReferenceModeStorageKey(
backingKey = DatabaseStorageKey.Persistent("backing$key", "1234a", dbName = "test"),
storageKey = DatabaseStorageKey.Persistent("entity$key", "1234a", dbName = "test"),
),
type,
emptyList()
)
Expand Down Expand Up @@ -213,6 +218,22 @@ class UtilsTest(private val params: Params) {
assertThat(isWriteOnlyStorageKey(partition, handle.storageKey)).isTrue()
}

@Test
fun isWriteOnlyStorageKey_withOneParticleAndAllWriteOnlyRamDiskConnections_isFalse() {
BuildFlags.WRITE_ONLY_STORAGE_STACK = true

val handle = generateHandle("foo", collectionType, db = false)
val connection = generateConnection(handle, collectionType, HandleMode.Write)
val particle = generateParticle("foo", "bar" to connection)

val partition = Plan.Partition(
"fooId",
"fooHost",
listOf(particle)
)
assertThat(isWriteOnlyStorageKey(partition, handle.storageKey)).isFalse()
}

@Test
fun isWriteOnlyStorageKey_withOneParticleAndOneSingletonConnection_isFalse() {
BuildFlags.WRITE_ONLY_STORAGE_STACK = true
Expand Down
5 changes: 4 additions & 1 deletion javatests/arcs/core/storage/WriteOnlyStorageProxyImplTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,13 @@ class WriteOnlyStorageProxyImplTest {
}

@Test
fun storageEventReadyImmediatelyCalled() = runTest {
fun storageEventReadyCalledAfterMaybeInitiateSync() = runTest {
val proxy = mockProxy()
val callback: () -> Unit = mock()
proxy.registerForStorageEvents(callbackId) { callback() }
proxy.prepareForSync()
proxy.maybeInitiateSync()
scheduler.waitForIdle()
verify(callback).invoke()
}

Expand Down
2 changes: 1 addition & 1 deletion third_party/java/arcs/flags/flags.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ARCS_BUILD_FLAGS = [
name = "write_only_storage_stack",
desc = "Optimized write-only storage stack.",
bug_id = "b/181723292",
status = "NOT_READY",
status = "LAUNCHED",
stopwords = [
"write.?only.?storage.?stack",
"DatabaseOp",
Expand Down