diff --git a/firebase-dataconnect/CHANGELOG.md b/firebase-dataconnect/CHANGELOG.md index ef07e21a09d..3fb6b3e9e8f 100644 --- a/firebase-dataconnect/CHANGELOG.md +++ b/firebase-dataconnect/CHANGELOG.md @@ -1,5 +1,7 @@ # Unreleased - +* [changed] Code robustness improvements related to state management in + `FirebaseDataConnect` objects. + ([#6861](https://github.com/firebase/firebase-android-sdk/pull/6861)) # 16.0.0 * [changed] DataConnectOperationException added, enabling support for partial diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 9ad64cc6054..accd230aff4 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -33,8 +33,6 @@ import com.google.firebase.dataconnect.querymgr.LiveQueries import com.google.firebase.dataconnect.querymgr.LiveQuery import com.google.firebase.dataconnect.querymgr.QueryManager import com.google.firebase.dataconnect.querymgr.RegisteredDataDeserializer -import com.google.firebase.dataconnect.util.NullableReference -import com.google.firebase.dataconnect.util.SuspendingLazy import com.google.firebase.util.nextAlphanumericString import com.google.protobuf.Struct import java.util.concurrent.Executor @@ -54,10 +52,9 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.update import kotlinx.coroutines.flow.updateAndGet import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.serialization.DeserializationStrategy import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.modules.SerializersModule @@ -71,8 +68,8 @@ internal interface FirebaseDataConnectInternal : FirebaseDataConnect { val nonBlockingExecutor: Executor val nonBlockingDispatcher: CoroutineDispatcher - val lazyGrpcClient: SuspendingLazy - val lazyQueryManager: SuspendingLazy + val grpcClient: DataConnectGrpcClient + val queryManager: QueryManager suspend fun awaitAuthReady() suspend fun awaitAppCheckReady() @@ -119,15 +116,6 @@ internal class FirebaseDataConnectImpl( } ) - // Protects `closed`, `grpcClient`, `emulatorSettings`, and `queryManager`. - private val mutex = Mutex() - - // All accesses to this variable _must_ have locked `mutex`. - private var emulatorSettings: EmulatedServiceSettings? = null - - // All accesses to this variable _must_ have locked `mutex`. - private var closed = false - private val dataConnectAuth: DataConnectAuth = DataConnectAuth( deferredAuthProvider = deferredAuthProvider, @@ -152,127 +140,171 @@ internal class FirebaseDataConnectImpl( dataConnectAppCheck.awaitTokenProvider() } - private val lazyGrpcRPCs = - SuspendingLazy(mutex) { - if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed") + private sealed interface State { + data class New(val emulatorSettings: EmulatedServiceSettings?) : State { + constructor() : this(null) + } + data class Initialized( + val grpcRPCs: DataConnectGrpcRPCs, + val grpcClient: DataConnectGrpcClient, + val queryManager: QueryManager + ) : State + data class Closing(val grpcRPCs: DataConnectGrpcRPCs, val closeJob: Deferred) : State + object Closed : State + } - data class DataConnectBackendInfo( - val host: String, - val sslEnabled: Boolean, - val isEmulator: Boolean - ) - val backendInfoFromSettings = - DataConnectBackendInfo( - host = settings.host, - sslEnabled = settings.sslEnabled, - isEmulator = false - ) - val backendInfoFromEmulatorSettings = - emulatorSettings?.run { - DataConnectBackendInfo(host = "$host:$port", sslEnabled = false, isEmulator = true) - } - val backendInfo = - if (backendInfoFromEmulatorSettings == null) { - backendInfoFromSettings - } else { - if (!settings.isDefaultHost()) { - logger.warn( - "Host has been set in DataConnectSettings and useEmulator, " + - "emulator host will be used." - ) + private val state = MutableStateFlow(State.New()) + + override val grpcClient: DataConnectGrpcClient + get() = initialize().grpcClient + override val queryManager: QueryManager + get() = initialize().queryManager + + private fun initialize(): State.Initialized { + val newState = + state.updateAndGet { currentState -> + when (currentState) { + is State.New -> { + val grpcRPCs = createDataConnectGrpcRPCs(currentState.emulatorSettings) + val grpcClient = createDataConnectGrpcClient(grpcRPCs) + val queryManager = createQueryManager(grpcClient) + State.Initialized(grpcRPCs, grpcClient, queryManager) } - backendInfoFromEmulatorSettings + is State.Initialized -> currentState + is State.Closing -> currentState + is State.Closed -> currentState } + } - logger.debug { "connecting to Data Connect backend: $backendInfo" } - val grpcMetadata = - DataConnectGrpcMetadata.forSystemVersions( - firebaseApp = app, - dataConnectAuth = dataConnectAuth, - dataConnectAppCheck = dataConnectAppCheck, - connectorLocation = config.location, - parentLogger = logger, - ) - val dataConnectGrpcRPCs = - DataConnectGrpcRPCs( - context = context, - host = backendInfo.host, - sslEnabled = backendInfo.sslEnabled, - blockingCoroutineDispatcher = blockingDispatcher, - grpcMetadata = grpcMetadata, - parentLogger = logger, + return when (newState) { + is State.New -> + throw IllegalStateException( + "newState should be Initialized, but got New (error code sh2rf4wwjx)" ) + is State.Initialized -> newState + is State.Closing, + State.Closed -> throw IllegalStateException("FirebaseDataConnect instance has been closed") + } + } - if (backendInfo.isEmulator) { - logEmulatorVersion(dataConnectGrpcRPCs) - streamEmulatorErrors(dataConnectGrpcRPCs) + private fun createDataConnectGrpcRPCs( + emulatorSettings: EmulatedServiceSettings? + ): DataConnectGrpcRPCs { + data class DataConnectBackendInfo( + val host: String, + val sslEnabled: Boolean, + val isEmulator: Boolean + ) + val backendInfoFromSettings = + DataConnectBackendInfo( + host = settings.host, + sslEnabled = settings.sslEnabled, + isEmulator = false + ) + val backendInfoFromEmulatorSettings = + emulatorSettings?.run { + DataConnectBackendInfo(host = "$host:$port", sslEnabled = false, isEmulator = true) + } + val backendInfo = + if (backendInfoFromEmulatorSettings == null) { + backendInfoFromSettings + } else { + if (!settings.isDefaultHost()) { + logger.warn( + "Host has been set in DataConnectSettings and useEmulator, " + + "emulator host will be used." + ) + } + backendInfoFromEmulatorSettings } - dataConnectGrpcRPCs - } - - override val lazyGrpcClient = - SuspendingLazy(mutex) { - DataConnectGrpcClient( - projectId = projectId, - connector = config, - grpcRPCs = lazyGrpcRPCs.getLocked(), + logger.debug { "connecting to Data Connect backend: $backendInfo" } + val grpcMetadata = + DataConnectGrpcMetadata.forSystemVersions( + firebaseApp = app, dataConnectAuth = dataConnectAuth, dataConnectAppCheck = dataConnectAppCheck, - logger = Logger("DataConnectGrpcClient").apply { debug { "created by $instanceId" } }, + connectorLocation = config.location, + parentLogger = logger, + ) + val dataConnectGrpcRPCs = + DataConnectGrpcRPCs( + context = context, + host = backendInfo.host, + sslEnabled = backendInfo.sslEnabled, + blockingCoroutineDispatcher = blockingDispatcher, + grpcMetadata = grpcMetadata, + parentLogger = logger, ) - } - override val lazyQueryManager = - SuspendingLazy(mutex) { - if (closed) throw IllegalStateException("FirebaseDataConnect instance has been closed") - val grpcClient = lazyGrpcClient.getLocked() - - val registeredDataDeserializerFactory = - object : LiveQuery.RegisteredDataDeserializerFactory { - override fun newInstance( - dataDeserializer: DeserializationStrategy, - dataSerializersModule: SerializersModule?, - parentLogger: Logger - ) = - RegisteredDataDeserializer( - dataDeserializer = dataDeserializer, - dataSerializersModule = dataSerializersModule, - blockingCoroutineDispatcher = blockingDispatcher, - parentLogger = parentLogger, - ) - } - val liveQueryFactory = - object : LiveQueries.LiveQueryFactory { - override fun newLiveQuery( - key: LiveQuery.Key, - operationName: String, - variables: Struct, - parentLogger: Logger - ) = - LiveQuery( - key = key, - operationName = operationName, - variables = variables, - parentCoroutineScope = coroutineScope, - nonBlockingCoroutineDispatcher = nonBlockingDispatcher, - grpcClient = grpcClient, - registeredDataDeserializerFactory = registeredDataDeserializerFactory, - parentLogger = parentLogger, - ) - } - val liveQueries = LiveQueries(liveQueryFactory, blockingDispatcher, parentLogger = logger) - QueryManager(liveQueries) + if (backendInfo.isEmulator) { + logEmulatorVersion(dataConnectGrpcRPCs) + streamEmulatorErrors(dataConnectGrpcRPCs) } + return dataConnectGrpcRPCs + } + + private fun createDataConnectGrpcClient(grpcRPCs: DataConnectGrpcRPCs): DataConnectGrpcClient = + DataConnectGrpcClient( + projectId = projectId, + connector = config, + grpcRPCs = grpcRPCs, + dataConnectAuth = dataConnectAuth, + dataConnectAppCheck = dataConnectAppCheck, + logger = Logger("DataConnectGrpcClient").apply { debug { "created by $instanceId" } }, + ) + + private fun createQueryManager(grpcClient: DataConnectGrpcClient): QueryManager { + val registeredDataDeserializerFactory = + object : LiveQuery.RegisteredDataDeserializerFactory { + override fun newInstance( + dataDeserializer: DeserializationStrategy, + dataSerializersModule: SerializersModule?, + parentLogger: Logger + ) = + RegisteredDataDeserializer( + dataDeserializer = dataDeserializer, + dataSerializersModule = dataSerializersModule, + blockingCoroutineDispatcher = blockingDispatcher, + parentLogger = parentLogger, + ) + } + val liveQueryFactory = + object : LiveQueries.LiveQueryFactory { + override fun newLiveQuery( + key: LiveQuery.Key, + operationName: String, + variables: Struct, + parentLogger: Logger + ) = + LiveQuery( + key = key, + operationName = operationName, + variables = variables, + parentCoroutineScope = coroutineScope, + nonBlockingCoroutineDispatcher = nonBlockingDispatcher, + grpcClient = grpcClient, + registeredDataDeserializerFactory = registeredDataDeserializerFactory, + parentLogger = parentLogger, + ) + } + val liveQueries = LiveQueries(liveQueryFactory, blockingDispatcher, parentLogger = logger) + return QueryManager(liveQueries) + } + override fun useEmulator(host: String, port: Int): Unit = runBlocking { - mutex.withLock { - if (lazyGrpcClient.initializedValueOrNull != null) { - throw IllegalStateException( - "Cannot call useEmulator() after instance has already been initialized." - ) + state.update { currentState -> + when (currentState) { + is State.New -> + currentState.copy(emulatorSettings = EmulatedServiceSettings(host = host, port = port)) + is State.Initialized -> + throw IllegalStateException( + "Cannot call useEmulator() after instance has already been initialized." + ) + is State.Closing -> currentState + is State.Closed -> currentState } - emulatorSettings = EmulatedServiceSettings(host = host, port = port) } } @@ -380,19 +412,17 @@ internal class FirebaseDataConnectImpl( ) } - private val closeJob = MutableStateFlow(NullableReference>(null)) - override fun close() { logger.debug { "close() called" } - @Suppress("DeferredResultUnused") runBlocking { nonBlockingClose() } + @Suppress("DeferredResultUnused") closeInternal() } override suspend fun suspendingClose() { logger.debug { "suspendingClose() called" } - nonBlockingClose().await() + closeInternal()?.await() } - private suspend fun nonBlockingClose(): Deferred { + private fun closeInternal(): Deferred? { coroutineScope.cancel() // Remove the reference to this `FirebaseDataConnect` instance from the @@ -400,49 +430,60 @@ internal class FirebaseDataConnectImpl( // called with the same arguments that a new instance of `FirebaseDataConnect` will be created. creator.remove(this) - mutex.withLock { closed = true } - // Close Auth and AppCheck synchronously to avoid race conditions with auth callbacks. // Since close() is re-entrant, this is safe even if they have already been closed. dataConnectAuth.close() dataConnectAppCheck.close() - // Create the "close job" to asynchronously close the gRPC client. - @OptIn(DelicateCoroutinesApi::class) - val newCloseJob = - GlobalScope.async(start = CoroutineStart.LAZY) { - lazyGrpcRPCs.initializedValueOrNull?.close() - } - newCloseJob.invokeOnCompletion { exception -> - if (exception === null) { - logger.debug { "close() completed successfully" } - } else { - logger.warn(exception) { "close() failed" } - } - } - - // Register the new "close job". Do not overwrite a close job that is already in progress (to - // avoid having more than one close job in progress at a time) or a close job that completed - // successfully (since there is nothing to do if a previous close job was successful). - val updatedCloseJobRef = - closeJob.updateAndGet { currentCloseJobRef: NullableReference> -> - if (currentCloseJobRef.ref !== null && !currentCloseJobRef.ref.isCancelled) { - currentCloseJobRef + fun createCloseJob(grpcRPCs: DataConnectGrpcRPCs): Deferred { + @OptIn(DelicateCoroutinesApi::class) + val closeJob = GlobalScope.async(start = CoroutineStart.LAZY) { grpcRPCs.close() } + closeJob.invokeOnCompletion { exception -> + if (exception !== null) { + logger.warn(exception) { "close() failed" } } else { - NullableReference(newCloseJob) + logger.debug { "close() completed successfully" } + state.update { currentState -> + check(currentState is State.Closing) { + "currentState is ${currentState}, but expected Closing (error code hsee7gfxvz)" + } + check(currentState.closeJob === closeJob) { + "currentState.closeJob is ${currentState.closeJob}, but expected $closeJob " + + "(error code n3x86pr6qn)" + } + State.Closed + } } } + return closeJob + } - // Start the updated "close job" (if it was already started then start() is a no-op). - val updatedCloseJob = - checkNotNull(updatedCloseJobRef.ref) { - "internal error: closeJob.updateAndGet() returned a NullableReference whose 'ref' " + - "property was null; however it should NOT have been null (error code y5fk4ntdnd)" + val newState = + state.updateAndGet { currentState -> + when (currentState) { + is State.New -> State.Closed + is State.Initialized -> + State.Closing(currentState.grpcRPCs, createCloseJob(currentState.grpcRPCs)) + is State.Closing -> + if (currentState.closeJob.isCancelled) { + currentState.copy(closeJob = createCloseJob(currentState.grpcRPCs)) + } else { + currentState + } + is State.Closed -> State.Closed + } } - updatedCloseJob.start() - // Return the "close job", which _may_ already be completed, so the caller can await it. - return updatedCloseJob + return when (newState) { + is State.Initialized, + is State.New -> + throw IllegalStateException( + "internal error: newState is $newState, but expected Closing or Closed " + + "(error code n3x86pr6qn)" + ) + is State.Closing -> newState.closeJob.apply { start() } + is State.Closed -> null + } } // The generated SDK relies on equals() and hashCode() using object identity. diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt index edd978a2ec9..8e5684c2fe6 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/MutationRefImpl.kt @@ -60,8 +60,7 @@ internal class MutationRefImpl( override suspend fun execute(): MutationResultImpl { val requestId = "mut" + Random.nextAlphanumericString(length = 10) - return dataConnect.lazyGrpcClient - .get() + return dataConnect.grpcClient .executeMutation( requestId = requestId, operationName = operationName, diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt index 53e247e3d14..1b630a12cfd 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QueryRefImpl.kt @@ -49,7 +49,7 @@ internal class QueryRefImpl( variablesSerializersModule = variablesSerializersModule, ) { override suspend fun execute(): QueryResultImpl = - dataConnect.lazyQueryManager.get().execute(this).let { QueryResultImpl(it.ref.getOrThrow()) } + dataConnect.queryManager.execute(this).let { QueryResultImpl(it.ref.getOrThrow()) } override fun subscribe(): QuerySubscription = QuerySubscriptionImpl(this) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt index 2ca9aea6771..7ec870e93ce 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt @@ -58,7 +58,7 @@ internal class QuerySubscriptionImpl(query: QueryRefImpl val querySubscriptionResult = QuerySubscriptionResultImpl(query, sequencedResult) send(querySubscriptionResult) @@ -70,7 +70,7 @@ internal class QuerySubscriptionImpl(query: QueryRefImpl(relaxed = true) { every { blockingDispatcher } returns UnconfinedTestDispatcher(testScheduler) - every { lazyGrpcClient } returns - SuspendingLazy { - mockk { - coEvery { - executeMutation( - capture(requestIdSlot), - capture(operationNameSlot), - capture(variablesSlot), - capture(callerSdkTypeSlot), - ) - } returns result.getOrThrow() - } + every { grpcClient } returns + mockk { + coEvery { + executeMutation( + capture(requestIdSlot), + capture(operationNameSlot), + capture(variablesSlot), + capture(callerSdkTypeSlot), + ) + } returns result.getOrThrow() } } } diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt index fc6baffe601..904779cb4ce 100644 --- a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/QueryRefImplUnitTest.kt @@ -29,7 +29,6 @@ import com.google.firebase.dataconnect.testutil.property.arbitrary.queryRefImpl import com.google.firebase.dataconnect.testutil.property.arbitrary.shouldHavePropertiesEqualTo import com.google.firebase.dataconnect.testutil.shouldContainWithNonAbuttingText import com.google.firebase.dataconnect.util.SequencedReference -import com.google.firebase.dataconnect.util.SuspendingLazy import io.kotest.assertions.assertSoftly import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue @@ -577,11 +576,9 @@ class QueryRefImplUnitTest { querySlot: CapturingSlot> ): FirebaseDataConnectInternal = mockk(relaxed = true) { - every { lazyQueryManager } returns - SuspendingLazy { - mockk { - coEvery { execute(capture(querySlot)) } returns SequencedReference(123, result) - } + every { queryManager } returns + mockk { + coEvery { execute(capture(querySlot)) } returns SequencedReference(123, result) } } }