Skip to content

Commit

Permalink
Merge pull request #2084 from OneSignal/fix/RecoverFromDroppedLoginBu…
Browse files Browse the repository at this point in the history
…gTests-not-running-in-very-rare-cases

[Fix] RecoverFromDroppedLoginBug not running in very rare cases
  • Loading branch information
jkasten2 committed May 10, 2024
2 parents 21eb136 + d6d0f52 commit 50e4c46
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface IOperationRepo {
*/
fun <T : Operation> containsInstanceOf(type: KClass<T>): Boolean

fun addOperationLoadedListener(handler: IOperationRepoLoadedListener)
suspend fun awaitInitialized()
}

// Extension function so the syntax containsInstanceOf<Operation>() can be used over
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package com.onesignal.core.internal.operations.impl

import com.onesignal.common.events.EventProducer
import com.onesignal.common.events.IEventNotifier
import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.ExecutionResult
import com.onesignal.core.internal.operations.GroupComparisonType
import com.onesignal.core.internal.operations.IOperationExecutor
import com.onesignal.core.internal.operations.IOperationRepo
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.Operation
import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.core.internal.time.ITime
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
Expand All @@ -30,7 +28,7 @@ internal class OperationRepo(
private val _configModelStore: ConfigModelStore,
private val _time: ITime,
private val _newRecordState: NewRecordsState,
) : IOperationRepo, IStartableService, IEventNotifier<IOperationRepoLoadedListener> {
) : IOperationRepo, IStartableService {
internal class OperationQueueItem(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
Expand All @@ -52,17 +50,10 @@ internal class OperationRepo(
private val waiter = WaiterWithValue<LoopWaiterMessage>()
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))
private val loadedSubscription: EventProducer<IOperationRepoLoadedListener> = EventProducer()
private val initialized = CompletableDeferred<Unit>()

override val hasSubscribers: Boolean
get() = loadedSubscription.hasSubscribers

override fun unsubscribe(handler: IOperationRepoLoadedListener) {
loadedSubscription.unsubscribe(handler)
}

override fun subscribe(handler: IOperationRepoLoadedListener) {
loadedSubscription.subscribe(handler)
override suspend fun awaitInitialized() {
initialized.await()
}

/** *** Buckets ***
Expand Down Expand Up @@ -101,10 +92,6 @@ internal class OperationRepo(
}
}

override fun addOperationLoadedListener(handler: IOperationRepoLoadedListener) {
subscribe(handler)
}

override fun start() {
paused = false
coroutineScope.launch {
Expand Down Expand Up @@ -431,6 +418,6 @@ internal class OperationRepo(
)
if (successful) successfulIndex++
}
loadedSubscription.fire { it.onOperationRepoLoaded() }
initialized.complete(Unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package com.onesignal.user.internal.migrations
import com.onesignal.common.IDManager
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.IOperationRepo
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.containsInstanceOf
import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.operations.LoginUserOperation
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

/**
* Purpose: Automatically recovers a stalled User in the OperationRepo due
Expand All @@ -31,21 +33,20 @@ class RecoverFromDroppedLoginBug(
private val _operationRepo: IOperationRepo,
private val _identityModelStore: IdentityModelStore,
private val _configModelStore: ConfigModelStore,
) : IStartableService, IOperationRepoLoadedListener {
) : IStartableService {
override fun start() {
_operationRepo.addOperationLoadedListener(this)
}

override fun onOperationRepoLoaded() {
if (isInBadState()) {
Logging.warn(
"User with externalId:" +
"${_identityModelStore.model.externalId} " +
"was in a bad state, causing it to not update on OneSignal's " +
"backend! We are recovering and replaying all unsent " +
"operations now.",
)
recoverByAddingBackDroppedLoginOperation()
GlobalScope.launch(Dispatchers.IO) {
_operationRepo.awaitInitialized()
if (isInBadState()) {
Logging.warn(
"User with externalId:" +
"${_identityModelStore.model.externalId} " +
"was in a bad state, causing it to not update on OneSignal's " +
"backend! We are recovering and replaying all unsent " +
"operations now.",
)
recoverByAddingBackDroppedLoginOperation()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.withTimeout
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield
import java.util.UUID
Expand Down Expand Up @@ -599,24 +601,17 @@ class OperationRepoTests : FunSpec({
result shouldBe null
}

test("ensure onOperationRepoLoaded is called once loading is completed") {
test("ensure awaitInitialized() unsuspends") {
// Given
val mocks = Mocks()
val spyListener = spyk<IOperationRepoLoadedListener>()

// When
mocks.operationRepo.addOperationLoadedListener(spyListener)
mocks.operationRepo.start()
// enqueueAndWait used to know we are fully loaded.
mocks.operationRepo.enqueueAndWait(mockOperation())

// Then
mocks.operationRepo.hasSubscribers shouldBe true
coVerifyOrder {
mocks.operationRepo.subscribe(any())
mocks.operationModelStore.loadOperations()
spyListener.onOperationRepoLoaded()
}
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }
}

test("ensure loadSavedOperations doesn't duplicate existing OperationItems") {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,111 @@
package com.onesignal.user.internal.migrations

import com.onesignal.common.threading.Waiter
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.impl.OperationModelStore
import com.onesignal.core.internal.operations.impl.OperationRepo
import com.onesignal.core.internal.time.impl.Time
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.mocks.MockHelper
import com.onesignal.user.internal.operations.ExecutorMocks
import com.onesignal.user.internal.operations.LoginUserOperation
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeout

private class Mocks {
val operationModelStore: OperationModelStore =
run {
val mockOperationModelStore = mockk<OperationModelStore>()
every { mockOperationModelStore.loadOperations() } just runs
every { mockOperationModelStore.list() } returns listOf()
every { mockOperationModelStore.add(any()) } just runs
every { mockOperationModelStore.remove(any()) } just runs
mockOperationModelStore
}
val configModelStore = MockHelper.configModelStore()
val operationRepo =
spyk(
OperationRepo(
listOf(),
operationModelStore,
configModelStore,
Time(),
ExecutorMocks.getNewRecordState(configModelStore),
),
)

var oneSignalId = "local-id"
val identityModelStore by lazy {
MockHelper.identityModelStore {
it.onesignalId = oneSignalId
it.externalId = "myExtId"
}
}
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, identityModelStore, configModelStore))

val expectedOperation by lazy {
LoginUserOperation(
configModelStore.model.appId,
identityModelStore.model.onesignalId,
identityModelStore.model.externalId,
null,
)
}

fun verifyExpectedLoginOperation(expectedOp: LoginUserOperation = expectedOperation) {
verify(exactly = 1) {
operationRepo.enqueue(
withArg {
(it is LoginUserOperation) shouldBe true
val op = it as LoginUserOperation
op.appId shouldBe expectedOp.appId
op.externalId shouldBe expectedOp.externalId
op.existingOnesignalId shouldBe expectedOp.existingOnesignalId
op.onesignalId shouldBe expectedOp.onesignalId
},
)
}
}
}

class RecoverFromDroppedLoginBugTests : FunSpec({
test("ensure RecoverFromDroppedLoginBug receive onOperationRepoLoaded callback from operationRepo") {
beforeAny {
Logging.logLevel = LogLevel.NONE
}

test("ensure it adds missing operation") {
// Given
val mockOperationModelStore = mockk<OperationModelStore>()
val mockConfigModelStore = mockk<ConfigModelStore>()
val operationRepo =
spyk(
OperationRepo(
listOf(),
mockOperationModelStore,
mockConfigModelStore,
Time(),
ExecutorMocks.getNewRecordState(mockConfigModelStore),
),
)
every { mockOperationModelStore.loadOperations() } just runs
every { mockOperationModelStore.list() } returns listOf()
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, MockHelper.identityModelStore(), mockConfigModelStore))
val mocks = Mocks()

// When
recovery.start()
val waiter = Waiter()
operationRepo.addOperationLoadedListener(
object : IOperationRepoLoadedListener {
override fun onOperationRepoLoaded() {
waiter.wake()
}
},
)
operationRepo.start()
// Waiting here ensures recovery.onOperationRepoLoaded() is called consistently
waiter.waitForWake()
mocks.recovery.start()
mocks.operationRepo.start()
mocks.operationRepo.awaitInitialized()

// Then
verify(exactly = 1) {
operationRepo.subscribe(recovery)
recovery.onOperationRepoLoaded()
}
mocks.verifyExpectedLoginOperation()
}

test("ensure it adds missing operation, even if operationRepo is already initialized") {
// Given
val mocks = Mocks()

// When
mocks.operationRepo.start()
// give operation repo some time to fully initialize
delay(200)

mocks.recovery.start()
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }

// Then
mocks.verifyExpectedLoginOperation()
}
})

0 comments on commit 50e4c46

Please sign in to comment.