Skip to content

Commit

Permalink
fix: fix race condition where an event can be processed before a sess…
Browse files Browse the repository at this point in the history
…ion gets extended (#198)

fix: fix race condition where an event can be processed before a session gets extended
  • Loading branch information
izaaz committed May 30, 2024
1 parent 5c155e2 commit c3627d0
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 13 deletions.
10 changes: 0 additions & 10 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import kotlinx.coroutines.launch
open class Amplitude(
configuration: Configuration
) : Amplitude(configuration) {

internal var inForeground = false
private lateinit var androidContextPlugin: AndroidContextPlugin

val sessionId: Long
Expand Down Expand Up @@ -93,21 +91,13 @@ open class Amplitude(
}

fun onEnterForeground(timestamp: Long) {
inForeground = true

if ((configuration as Configuration).optOut) {
return
}

val dummyEnterForegroundEvent = BaseEvent()
dummyEnterForegroundEvent.eventType = DUMMY_ENTER_FOREGROUND_EVENT
dummyEnterForegroundEvent.timestamp = timestamp
timeline.process(dummyEnterForegroundEvent)
}

fun onExitForeground(timestamp: Long) {
inForeground = false

val dummyExitForegroundEvent = BaseEvent()
dummyExitForegroundEvent.eventType = DUMMY_EXIT_FOREGROUND_EVENT
dummyExitForegroundEvent.timestamp = timestamp
Expand Down
8 changes: 5 additions & 3 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Timeline(
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)

private val _sessionId = AtomicLong(initialSessionId ?: -1L)
private var _foreground = false
val sessionId: Long
get() {
return _sessionId.get()
Expand Down Expand Up @@ -50,7 +51,7 @@ class Timeline(
incomingEvent.timestamp = System.currentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
eventMessageChannel.trySend(EventQueueMessage(incomingEvent))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
Expand All @@ -68,11 +69,13 @@ class Timeline(
} else if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
_foreground = true
} else if (event.eventType == Amplitude.DUMMY_EXIT_FOREGROUND_EVENT) {
skipEvent = true
refreshSessionTime(eventTimestamp)
_foreground = false
} else {
if (!message.inForeground) {
if (!_foreground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
Expand Down Expand Up @@ -181,5 +184,4 @@ class Timeline(

data class EventQueueMessage(
val event: BaseEvent,
val inForeground: Boolean
)
73 changes: 73 additions & 0 deletions android/src/test/java/com/amplitude/android/AmplitudeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import com.amplitude.id.IMIdentityStorageProvider
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkConstructor
import io.mockk.mockkObject
import io.mockk.mockkStatic
import io.mockk.slot
import io.mockk.spyk
import io.mockk.unmockkObject
import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
Expand All @@ -29,6 +31,7 @@ import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import kotlin.concurrent.thread

open class StubPlugin : EventPlugin {
override val type: Plugin.Type = Plugin.Type.Before
Expand Down Expand Up @@ -231,6 +234,76 @@ class AmplitudeTest {
}
}

/**
* Here is what we want to test. In the older version of the SDK, we were unintentionally
* extending a session. Here's how enter foreground was being handled before.
* - Application is entering foreground
* - Amplitude.inForeground flag is set to true.
* - Immediately after this, we expect the dummy foreground event to be processed and create
* a new session if needed.
*
* If another event is fired between 2 and 3 from a different thread, we would unintentionally
* extend the session thinking that the app was in foreground.
*
* The delay between foreground flag being set and the session being initialized was a problem.
*
* We fix this by moving the foreground property inside the timeline. We expect every event
* processed before foreground = true in Timeline to be considered a background fired event.
*
* This test checks for that scenario
*/
@Test
fun amplitude_should_correctly_start_new_session() = runTest {
val testSessionId = 1000L

// Creates a mocked timeline that waits for 500ms before processing the event. This
// is to create an artificial delay
val baseEventParam = slot<BaseEvent>()
val timeline = Timeline(testSessionId)
mockkObject(timeline)
every { timeline.process(incomingEvent = capture(baseEventParam)) } answers {
if (baseEventParam.captured.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
Thread.sleep(500)
}
callOriginal()
}

amplitude = object : Amplitude(createConfiguration(sessionId = testSessionId, minTimeBetweenSessionsMillis = 50)) {
override fun createTimeline(): Timeline {
timeline.amplitude = this
return timeline
}
}
setDispatcher(testScheduler)

if (amplitude?.isBuilt!!.await()) {
// Fire a foreground event. This is fired using the delayed timeline. The event is
// actually processed after 500ms
val thread1 = thread {
amplitude?.onEnterForeground(1120)
}
Thread.sleep(100)
// Un-mock the object so that there's no delay anymore
unmockkObject(timeline)

// Fire a test event that will be added to the queue before the foreground event.
val thread2 = thread {
val event = BaseEvent()
event.eventType = "test_event"
event.timestamp = 1100L
amplitude?.track(event)
}
thread1.join()
thread2.join()

// Wait for all events to have been processed
advanceUntilIdle()

// test_event should have created a new session and not extended an existing session
Assertions.assertEquals(1100, amplitude?.sessionId)
}
}

companion object {
const val instanceName = "testInstance"
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/com/amplitude/core/platform/Timeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ open class Timeline {
lateinit var amplitude: Amplitude

open fun process(incomingEvent: BaseEvent) {
// Note for future reference:
// Checking for opt out within the timeline processing since events can be added to the
// timeline from various sources. For example, the session start and end events are fired
// from within the timeline.
if (amplitude.configuration.optOut) {
return
}

val beforeResult = applyPlugins(Plugin.Type.Before, incomingEvent)
val enrichmentResult = applyPlugins(Plugin.Type.Enrichment, beforeResult)

Expand Down

0 comments on commit c3627d0

Please sign in to comment.