Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUM-3670 backpressure strategy #1938

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions dd-sdk-android-core/api/apiSurface
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ class com.datadog.android.core.SdkReference
constructor(String? = null, (com.datadog.android.api.SdkCore) -> Unit = {})
fun get(): com.datadog.android.api.SdkCore?
fun <T> allowThreadDiskReads(() -> T): T
enum com.datadog.android.core.configuration.BackPressureMitigation
- DROP_OLDEST
- IGNORE_NEWEST
data class com.datadog.android.core.configuration.BackPressureStrategy
constructor(Int, () -> Unit, (Any) -> Unit, BackPressureMitigation)
enum com.datadog.android.core.configuration.BatchProcessingLevel
constructor(Int)
- LOW
Expand All @@ -184,6 +189,7 @@ data class com.datadog.android.core.configuration.Configuration
fun setEncryption(com.datadog.android.security.Encryption): Builder
fun setPersistenceStrategyFactory(com.datadog.android.core.persistence.PersistenceStrategy.Factory?): Builder
fun setCrashReportsEnabled(Boolean): Builder
fun setBackpressureStrategy(BackPressureStrategy): Builder
companion object
class com.datadog.android.core.configuration.HostsSanitizer
fun sanitizeHosts(List<String>, String): List<String>
Expand Down Expand Up @@ -279,6 +285,10 @@ class com.datadog.android.core.sampling.RateBasedSampler : Sampler
interface com.datadog.android.core.sampling.Sampler
fun sample(): Boolean
fun getSampleRate(): Float?
interface com.datadog.android.core.thread.FlushableExecutorService : java.util.concurrent.ExecutorService
fun drainTo(MutableCollection<Runnable>)
interface Factory
fun create(com.datadog.android.api.InternalLogger, com.datadog.android.core.configuration.BackPressureStrategy): FlushableExecutorService
interface com.datadog.android.event.EventMapper<T: Any>
fun map(T): T?
class com.datadog.android.event.MapperSerializer<T: Any> : com.datadog.android.core.persistence.Serializer<T>
Expand Down
33 changes: 33 additions & 0 deletions dd-sdk-android-core/api/dd-sdk-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,30 @@ public final class com/datadog/android/core/StrictModeExtKt {
public static final fun allowThreadDiskReads (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class com/datadog/android/core/configuration/BackPressureMitigation : java/lang/Enum {
public static final field DROP_OLDEST Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static final field IGNORE_NEWEST Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static fun valueOf (Ljava/lang/String;)Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static fun values ()[Lcom/datadog/android/core/configuration/BackPressureMitigation;
}

public final class com/datadog/android/core/configuration/BackPressureStrategy {
public fun <init> (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)V
public final fun component1 ()I
public final fun component2 ()Lkotlin/jvm/functions/Function0;
public final fun component3 ()Lkotlin/jvm/functions/Function1;
public final fun component4 ()Lcom/datadog/android/core/configuration/BackPressureMitigation;
public final fun copy (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)Lcom/datadog/android/core/configuration/BackPressureStrategy;
public static synthetic fun copy$default (Lcom/datadog/android/core/configuration/BackPressureStrategy;ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;ILjava/lang/Object;)Lcom/datadog/android/core/configuration/BackPressureStrategy;
public fun equals (Ljava/lang/Object;)Z
public final fun getBackpressureMitigation ()Lcom/datadog/android/core/configuration/BackPressureMitigation;
public final fun getCapacity ()I
public final fun getOnItemDropped ()Lkotlin/jvm/functions/Function1;
public final fun getOnThresholdReached ()Lkotlin/jvm/functions/Function0;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/datadog/android/core/configuration/BatchProcessingLevel : java/lang/Enum {
public static final field HIGH Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field LOW Lcom/datadog/android/core/configuration/BatchProcessingLevel;
Expand Down Expand Up @@ -492,6 +516,7 @@ public final class com/datadog/android/core/configuration/Configuration$Builder
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun build ()Lcom/datadog/android/core/configuration/Configuration;
public final fun setAdditionalConfiguration (Ljava/util/Map;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBackpressureStrategy (Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchProcessingLevel (Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchSize (Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setCrashReportsEnabled (Z)Lcom/datadog/android/core/configuration/Configuration$Builder;
Expand Down Expand Up @@ -735,6 +760,14 @@ public abstract interface class com/datadog/android/core/sampling/Sampler {
public abstract fun sample ()Z
}

public abstract interface class com/datadog/android/core/thread/FlushableExecutorService : java/util/concurrent/ExecutorService {
public abstract fun drainTo (Ljava/util/Collection;)V
}

public abstract interface class com/datadog/android/core/thread/FlushableExecutorService$Factory {
public abstract fun create (Lcom/datadog/android/api/InternalLogger;Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/thread/FlushableExecutorService;
}

public abstract interface class com/datadog/android/event/EventMapper {
public abstract fun map (Ljava/lang/Object;)Ljava/lang/Object;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
 * Defines the mitigation to use when a queue hits the maximum back pressure capacity.
 *
 * The mitigation decides which item is sacrificed once the queue is full: one that is
 * already waiting, or the one that is being submitted.
 */
enum class BackPressureMitigation {

    /** Drop the oldest items already in the queue to make room for new ones. */
    DROP_OLDEST,

    /** Ignore newest items that are not yet in the queue. */
    IGNORE_NEWEST
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
 * Describes how a bounded queue should behave when it fills up: how many items it may hold,
 * which callbacks to notify, and which [BackPressureMitigation] to apply.
 *
 * @param capacity the maximum size of the queue
 * @param onThresholdReached callback called when the queue reaches full capacity
 * @param onItemDropped called when an item is dropped because of this backpressure strategy;
 * the dropped item is passed as the argument
 * @param backpressureMitigation the mitigation to use when reaching the capacity
 */
data class BackPressureStrategy(
val capacity: Int,
val onThresholdReached: () -> Unit,
val onItemDropped: (Any) -> Unit,
val backpressureMitigation: BackPressureMitigation
)
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ internal constructor(
val encryption: Encryption?,
val site: DatadogSite,
val batchProcessingLevel: BatchProcessingLevel,
val persistenceStrategyFactory: PersistenceStrategy.Factory?
val persistenceStrategyFactory: PersistenceStrategy.Factory?,
val backpressureStrategy: BackPressureStrategy
)

// region Builder
Expand Down Expand Up @@ -70,6 +71,7 @@ internal constructor(

private var coreConfig = DEFAULT_CORE_CONFIG
private var crashReportsEnabled: Boolean = true
private var backpressureStrategy: BackPressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY

internal var hostsSanitizer = HostsSanitizer()

Expand Down Expand Up @@ -236,7 +238,7 @@ internal constructor(
}

/**
* Allows to control if JVM crashes are tracked or not. Default value is [true].
* Allows to control if JVM crashes are tracked or not. Default value is `true`.
*
* @param crashReportsEnabled whether crashes are tracked and sent to Datadog
*/
Expand All @@ -245,6 +247,19 @@ internal constructor(
return this
}

/**
 * Sets the strategy to handle scalability issues.
 * Many operations (data processing, event I/O, …) are queued in background threads.
 * This configuration lets one decide how to handle the edge case when the queue starts growing, which can lead
 * to a lot of memory usage, delayed processing, and possibly OOM or ANR.
 * @param backpressureStrategy the backpressure strategy (default strategy ignores new tasks if a queue reaches
 * 1024 items)
 * @return this [Builder], so that calls can be chained
 */
fun setBackpressureStrategy(backpressureStrategy: BackPressureStrategy): Builder {
    this.backpressureStrategy = backpressureStrategy
    return this
}

internal fun allowClearTextHttp(): Builder {
coreConfig = coreConfig.copy(
needsClearTextHttp = true
Expand All @@ -262,6 +277,15 @@ internal constructor(
*/
private const val NO_VARIANT: String = ""

// Queue capacity used by the default backpressure strategy.
private const val DEFAULT_BACKPRESSURE_THRESHOLD = 1024

// Strategy applied when none is configured: both callbacks are no-ops and
// newly submitted items are ignored once the capacity is reached.
internal val DEFAULT_BACKPRESSURE_STRATEGY = BackPressureStrategy(
    capacity = DEFAULT_BACKPRESSURE_THRESHOLD,
    onThresholdReached = {},
    onItemDropped = {},
    backpressureMitigation = BackPressureMitigation.IGNORE_NEWEST
)

internal val DEFAULT_CORE_CONFIG = Core(
needsClearTextHttp = false,
enableDeveloperModeWhenDebuggable = false,
Expand All @@ -273,7 +297,8 @@ internal constructor(
encryption = null,
site = DatadogSite.US1,
batchProcessingLevel = BatchProcessingLevel.MEDIUM,
persistenceStrategyFactory = null
persistenceStrategyFactory = null,
backpressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY
)

internal const val NETWORK_REQUESTS_TRACKING_FEATURE_NAME = "Network requests"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.thread

import com.datadog.android.api.InternalLogger
import com.datadog.android.core.configuration.BackPressureStrategy
import com.datadog.android.core.thread.FlushableExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

/**
 * A [ThreadPoolExecutor] with a single thread (core and maximum pool size are both
 * [CORE_POOL_SIZE]) whose work queue is a [BackPressuredBlockingQueue] built from the
 * provided [BackPressureStrategy].
 *
 * NOTE(review): the keep-alive is passed to the parent constructor, but with identical core
 * and maximum pool sizes it only takes effect if core thread timeout is enabled elsewhere —
 * confirm whether that is intended.
 *
 * @param logger the logger used to report drain failures; it is also handed to the work queue
 * and to the after-execution logging helper
 * @param backpressureStrategy the strategy used to configure the work queue
 */
internal class BackPressureExecutorService(
    val logger: InternalLogger,
    backpressureStrategy: BackPressureStrategy
) : ThreadPoolExecutor(
    CORE_POOL_SIZE,
    CORE_POOL_SIZE,
    THREAD_POOL_MAX_KEEP_ALIVE_MS,
    TimeUnit.MILLISECONDS,
    BackPressuredBlockingQueue(logger, backpressureStrategy)
),
    FlushableExecutorService {

    // region FlushableExecutorService

    /**
     * Moves the tasks currently waiting in the work queue into [destination].
     *
     * The runtime exceptions caught below are logged through [logger] and are not rethrown.
     *
     * @param destination the collection receiving the pending tasks
     */
    @Suppress("TooGenericExceptionCaught")
    override fun drainTo(destination: MutableCollection<Runnable>) {
        try {
            queue.drainTo(destination)
        } catch (e: IllegalArgumentException) {
            onDrainException(e)
        } catch (e: NullPointerException) {
            onDrainException(e)
        } catch (e: UnsupportedOperationException) {
            onDrainException(e)
        } catch (e: ClassCastException) {
            onDrainException(e)
        }
    }

    // endregion

    // region ThreadPoolExecutor

    /**
     * Runs the parent hook, then forwards the task and its failure (if any) to
     * `loggingAfterExecute` together with [logger].
     */
    override fun afterExecute(r: Runnable?, t: Throwable?) {
        super.afterExecute(r, t)
        loggingAfterExecute(r, t, logger)
    }

    // endregion

    /** Reports a failed drain to the maintainer and telemetry targets at ERROR level. */
    private fun onDrainException(e: RuntimeException) {
        logger.log(
            InternalLogger.Level.ERROR,
            listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
            { "Unable to drain BackPressureExecutorService queue" },
            e
        )
    }

    companion object {
        private const val CORE_POOL_SIZE = 1
        private val THREAD_POOL_MAX_KEEP_ALIVE_MS = TimeUnit.SECONDS.toMillis(5)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.thread

import com.datadog.android.api.InternalLogger
import com.datadog.android.core.configuration.BackPressureMitigation
import com.datadog.android.core.configuration.BackPressureStrategy
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

/**
 * A [LinkedBlockingQueue] bounded to [BackPressureStrategy.capacity] which applies the
 * strategy's [BackPressureMitigation] when an item is offered while the queue is full.
 *
 * Both the strategy callbacks and the [logger] are notified when the queue fills up and
 * when an item is dropped.
 *
 * @param E the type of items held in the queue
 * @param logger the logger receiving the capacity and dropped-item reports
 * @param backPressureStrategy the capacity, callbacks and mitigation to apply
 */
internal class BackPressuredBlockingQueue<E : Any>(
    private val logger: InternalLogger,
    private val backPressureStrategy: BackPressureStrategy
) : LinkedBlockingQueue<E>(
    backPressureStrategy.capacity
) {
    /**
     * Offers [e] to the queue, applying the back pressure mitigation if the queue is full.
     *
     * @return the result of the underlying insertion, or `true` when the item was ignored
     * by the [BackPressureMitigation.IGNORE_NEWEST] mitigation
     */
    override fun offer(e: E): Boolean {
        return addWithBackPressure(e) {
            @Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here
            super.offer(it)
        }
    }

    /**
     * Offers [e] to the queue, waiting up to the given timeout for space to become available.
     * If the timed insertion fails, falls back to the non-blocking [offer], which applies
     * the back pressure mitigation.
     */
    override fun offer(e: E, timeout: Long, unit: TimeUnit?): Boolean {
        @Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here
        val accepted = super.offer(e, timeout, unit)
        if (!accepted) {
            return offer(e)
        } else {
            // the item we just inserted filled the queue
            if (remainingCapacity() == 0) {
                onThresholdReached()
            }
            return true
        }
    }

    /**
     * Inserts [e] through [operation], first applying the configured mitigation when the
     * queue has no remaining capacity.
     */
    private fun addWithBackPressure(
        e: E,
        operation: (E) -> Boolean
    ): Boolean {
        val remainingCapacity = remainingCapacity()
        return if (remainingCapacity == 0) {
            when (backPressureStrategy.backpressureMitigation) {
                BackPressureMitigation.DROP_OLDEST -> {
                    // poll() never blocks: if a consumer emptied the queue since the capacity
                    // check above, there is nothing to drop and the caller must not wait.
                    val first = poll()
                    if (first != null) {
                        onItemDropped(first)
                    }
                    operation(e)
                }

                BackPressureMitigation.IGNORE_NEWEST -> {
                    onItemDropped(e)
                    // NOTE(review): reports success although the item was discarded, presumably
                    // so that callers do not treat it as a rejection — confirm.
                    true
                }
            }
        } else {
            // inserting this item uses the last free slot
            if (remainingCapacity == 1) {
                onThresholdReached()
            }
            operation(e)
        }
    }

    /** Notifies the strategy callback, then logs a warning with the configured capacity. */
    private fun onThresholdReached() {
        backPressureStrategy.onThresholdReached()
        logger.log(
            level = InternalLogger.Level.WARN,
            targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
            messageBuilder = { "BackPressuredBlockingQueue reached capacity:${backPressureStrategy.capacity}" },
            throwable = null,
            onlyOnce = false,
            additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity)
        )
    }

    /** Notifies the strategy callback with the dropped [item], then logs an error. */
    private fun onItemDropped(item: E) {
        backPressureStrategy.onItemDropped(item)

        logger.log(
            level = InternalLogger.Level.ERROR,
            targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
            messageBuilder = { "Dropped item in BackPressuredBlockingQueue queue: $item" },
            throwable = null,
            onlyOnce = false,
            additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity)
        )
    }
}