Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] RecoverFromDroppedLoginBug not running in very rare cases #2084

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface IOperationRepo {
*/
fun <T : Operation> containsInstanceOf(type: KClass<T>): Boolean

fun addOperationLoadedListener(handler: IOperationRepoLoadedListener)
suspend fun awaitInitialized()
}

// Extension function so the syntax containsInstanceOf<Operation>() can be used over
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package com.onesignal.core.internal.operations.impl

import com.onesignal.common.events.EventProducer
import com.onesignal.common.events.IEventNotifier
import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.ExecutionResult
import com.onesignal.core.internal.operations.GroupComparisonType
import com.onesignal.core.internal.operations.IOperationExecutor
import com.onesignal.core.internal.operations.IOperationRepo
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.Operation
import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.core.internal.time.ITime
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
Expand All @@ -30,7 +28,7 @@ internal class OperationRepo(
private val _configModelStore: ConfigModelStore,
private val _time: ITime,
private val _newRecordState: NewRecordsState,
) : IOperationRepo, IStartableService, IEventNotifier<IOperationRepoLoadedListener> {
) : IOperationRepo, IStartableService {
internal class OperationQueueItem(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
Expand All @@ -52,17 +50,10 @@ internal class OperationRepo(
private val waiter = WaiterWithValue<LoopWaiterMessage>()
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))
private val loadedSubscription: EventProducer<IOperationRepoLoadedListener> = EventProducer()
private val initialized = CompletableDeferred<Unit>()

override val hasSubscribers: Boolean
get() = loadedSubscription.hasSubscribers

override fun unsubscribe(handler: IOperationRepoLoadedListener) {
loadedSubscription.unsubscribe(handler)
}

override fun subscribe(handler: IOperationRepoLoadedListener) {
loadedSubscription.subscribe(handler)
override suspend fun awaitInitialized() {
initialized.await()
}

/** *** Buckets ***
Expand Down Expand Up @@ -101,10 +92,6 @@ internal class OperationRepo(
}
}

override fun addOperationLoadedListener(handler: IOperationRepoLoadedListener) {
subscribe(handler)
}

override fun start() {
paused = false
coroutineScope.launch {
Expand Down Expand Up @@ -431,6 +418,6 @@ internal class OperationRepo(
)
if (successful) successfulIndex++
}
loadedSubscription.fire { it.onOperationRepoLoaded() }
initialized.complete(Unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package com.onesignal.user.internal.migrations
import com.onesignal.common.IDManager
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.IOperationRepo
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.containsInstanceOf
import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.operations.LoginUserOperation
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

/**
* Purpose: Automatically recovers a stalled User in the OperationRepo due
Expand All @@ -31,21 +33,20 @@ class RecoverFromDroppedLoginBug(
private val _operationRepo: IOperationRepo,
private val _identityModelStore: IdentityModelStore,
private val _configModelStore: ConfigModelStore,
) : IStartableService, IOperationRepoLoadedListener {
) : IStartableService {
override fun start() {
_operationRepo.addOperationLoadedListener(this)
}

override fun onOperationRepoLoaded() {
if (isInBadState()) {
Logging.warn(
"User with externalId:" +
"${_identityModelStore.model.externalId} " +
"was in a bad state, causing it to not update on OneSignal's " +
"backend! We are recovering and replaying all unsent " +
"operations now.",
)
recoverByAddingBackDroppedLoginOperation()
GlobalScope.launch(Dispatchers.IO) {
_operationRepo.awaitInitialized()
if (isInBadState()) {
Logging.warn(
"User with externalId:" +
"${_identityModelStore.model.externalId} " +
"was in a bad state, causing it to not update on OneSignal's " +
"backend! We are recovering and replaying all unsent " +
"operations now.",
)
recoverByAddingBackDroppedLoginOperation()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.withTimeout
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield
import java.util.UUID
Expand Down Expand Up @@ -599,24 +601,17 @@ class OperationRepoTests : FunSpec({
result shouldBe null
}

test("ensure onOperationRepoLoaded is called once loading is completed") {
test("ensure awaitInitialized() unsuspends") {
// Given
val mocks = Mocks()
val spyListener = spyk<IOperationRepoLoadedListener>()

// When
mocks.operationRepo.addOperationLoadedListener(spyListener)
mocks.operationRepo.start()
// enqueueAndWait used to know we are fully loaded.
mocks.operationRepo.enqueueAndWait(mockOperation())

// Then
mocks.operationRepo.hasSubscribers shouldBe true
coVerifyOrder {
mocks.operationRepo.subscribe(any())
mocks.operationModelStore.loadOperations()
spyListener.onOperationRepoLoaded()
}
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }
}

test("ensure loadSavedOperations doesn't duplicate existing OperationItems") {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,111 @@
package com.onesignal.user.internal.migrations

import com.onesignal.common.threading.Waiter
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
import com.onesignal.core.internal.operations.impl.OperationModelStore
import com.onesignal.core.internal.operations.impl.OperationRepo
import com.onesignal.core.internal.time.impl.Time
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.mocks.MockHelper
import com.onesignal.user.internal.operations.ExecutorMocks
import com.onesignal.user.internal.operations.LoginUserOperation
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeout

private class Mocks {
val operationModelStore: OperationModelStore =
run {
val mockOperationModelStore = mockk<OperationModelStore>()
every { mockOperationModelStore.loadOperations() } just runs
every { mockOperationModelStore.list() } returns listOf()
every { mockOperationModelStore.add(any()) } just runs
every { mockOperationModelStore.remove(any()) } just runs
mockOperationModelStore
}
val configModelStore = MockHelper.configModelStore()
val operationRepo =
spyk(
OperationRepo(
listOf(),
operationModelStore,
configModelStore,
Time(),
ExecutorMocks.getNewRecordState(configModelStore),
),
)

var oneSignalId = "local-id"
val identityModelStore by lazy {
MockHelper.identityModelStore {
it.onesignalId = oneSignalId
it.externalId = "myExtId"
}
}
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, identityModelStore, configModelStore))

val expectedOperation by lazy {
LoginUserOperation(
configModelStore.model.appId,
identityModelStore.model.onesignalId,
identityModelStore.model.externalId,
null,
)
}

fun verifyExpectedLoginOperation(expectedOp: LoginUserOperation = expectedOperation) {
verify(exactly = 1) {
operationRepo.enqueue(
withArg {
(it is LoginUserOperation) shouldBe true
val op = it as LoginUserOperation
op.appId shouldBe expectedOp.appId
op.externalId shouldBe expectedOp.externalId
op.existingOnesignalId shouldBe expectedOp.existingOnesignalId
op.onesignalId shouldBe expectedOp.onesignalId
},
)
}
}
}

class RecoverFromDroppedLoginBugTests : FunSpec({
test("ensure RecoverFromDroppedLoginBug receive onOperationRepoLoaded callback from operationRepo") {
beforeAny {
Logging.logLevel = LogLevel.NONE
}

test("ensure it adds missing operation") {
// Given
val mockOperationModelStore = mockk<OperationModelStore>()
val mockConfigModelStore = mockk<ConfigModelStore>()
val operationRepo =
spyk(
OperationRepo(
listOf(),
mockOperationModelStore,
mockConfigModelStore,
Time(),
ExecutorMocks.getNewRecordState(mockConfigModelStore),
),
)
every { mockOperationModelStore.loadOperations() } just runs
every { mockOperationModelStore.list() } returns listOf()
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, MockHelper.identityModelStore(), mockConfigModelStore))
val mocks = Mocks()

// When
recovery.start()
val waiter = Waiter()
operationRepo.addOperationLoadedListener(
object : IOperationRepoLoadedListener {
override fun onOperationRepoLoaded() {
waiter.wake()
}
},
)
operationRepo.start()
// Waiting here ensures recovery.onOperationRepoLoaded() is called consistently
waiter.waitForWake()
mocks.recovery.start()
mocks.operationRepo.start()
mocks.operationRepo.awaitInitialized()

// Then
verify(exactly = 1) {
operationRepo.subscribe(recovery)
recovery.onOperationRepoLoaded()
}
mocks.verifyExpectedLoginOperation()
}

test("ensure it adds missing operation, even if operationRepo is already initialized") {
// Given
val mocks = Mocks()

// When
mocks.operationRepo.start()
// give operation repo some time to fully initialize
delay(200)

mocks.recovery.start()
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }

// Then
mocks.verifyExpectedLoginOperation()
}
})
Loading