Skip to content

Commit

Permalink
Merge pull request #1938 from DataDog/xgouchet/RUM-3670/backpressure_…
Browse files Browse the repository at this point in the history
…strategy

RUM-3670 backpressure strategy
  • Loading branch information
xgouchet committed Mar 26, 2024
2 parents d9b9645 + 707aa84 commit c9243d1
Show file tree
Hide file tree
Showing 14 changed files with 1,222 additions and 22 deletions.
10 changes: 10 additions & 0 deletions dd-sdk-android-core/api/apiSurface
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ class com.datadog.android.core.SdkReference
constructor(String? = null, (com.datadog.android.api.SdkCore) -> Unit = {})
fun get(): com.datadog.android.api.SdkCore?
fun <T> allowThreadDiskReads(() -> T): T
enum com.datadog.android.core.configuration.BackPressureMitigation
- DROP_OLDEST
- IGNORE_NEWEST
data class com.datadog.android.core.configuration.BackPressureStrategy
constructor(Int, () -> Unit, (Any) -> Unit, BackPressureMitigation)
enum com.datadog.android.core.configuration.BatchProcessingLevel
constructor(Int)
- LOW
Expand All @@ -184,6 +189,7 @@ data class com.datadog.android.core.configuration.Configuration
fun setEncryption(com.datadog.android.security.Encryption): Builder
fun setPersistenceStrategyFactory(com.datadog.android.core.persistence.PersistenceStrategy.Factory?): Builder
fun setCrashReportsEnabled(Boolean): Builder
fun setBackpressureStrategy(BackPressureStrategy): Builder
companion object
class com.datadog.android.core.configuration.HostsSanitizer
fun sanitizeHosts(List<String>, String): List<String>
Expand Down Expand Up @@ -279,6 +285,10 @@ class com.datadog.android.core.sampling.RateBasedSampler : Sampler
interface com.datadog.android.core.sampling.Sampler
fun sample(): Boolean
fun getSampleRate(): Float?
interface com.datadog.android.core.thread.FlushableExecutorService : java.util.concurrent.ExecutorService
fun drainTo(MutableCollection<Runnable>)
interface Factory
fun create(com.datadog.android.api.InternalLogger, com.datadog.android.core.configuration.BackPressureStrategy): FlushableExecutorService
interface com.datadog.android.event.EventMapper<T: Any>
fun map(T): T?
class com.datadog.android.event.MapperSerializer<T: Any> : com.datadog.android.core.persistence.Serializer<T>
Expand Down
33 changes: 33 additions & 0 deletions dd-sdk-android-core/api/dd-sdk-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,30 @@ public final class com/datadog/android/core/StrictModeExtKt {
public static final fun allowThreadDiskReads (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class com/datadog/android/core/configuration/BackPressureMitigation : java/lang/Enum {
public static final field DROP_OLDEST Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static final field IGNORE_NEWEST Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static fun valueOf (Ljava/lang/String;)Lcom/datadog/android/core/configuration/BackPressureMitigation;
public static fun values ()[Lcom/datadog/android/core/configuration/BackPressureMitigation;
}

public final class com/datadog/android/core/configuration/BackPressureStrategy {
public fun <init> (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)V
public final fun component1 ()I
public final fun component2 ()Lkotlin/jvm/functions/Function0;
public final fun component3 ()Lkotlin/jvm/functions/Function1;
public final fun component4 ()Lcom/datadog/android/core/configuration/BackPressureMitigation;
public final fun copy (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)Lcom/datadog/android/core/configuration/BackPressureStrategy;
public static synthetic fun copy$default (Lcom/datadog/android/core/configuration/BackPressureStrategy;ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;ILjava/lang/Object;)Lcom/datadog/android/core/configuration/BackPressureStrategy;
public fun equals (Ljava/lang/Object;)Z
public final fun getBackpressureMitigation ()Lcom/datadog/android/core/configuration/BackPressureMitigation;
public final fun getCapacity ()I
public final fun getOnItemDropped ()Lkotlin/jvm/functions/Function1;
public final fun getOnThresholdReached ()Lkotlin/jvm/functions/Function0;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/datadog/android/core/configuration/BatchProcessingLevel : java/lang/Enum {
public static final field HIGH Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field LOW Lcom/datadog/android/core/configuration/BatchProcessingLevel;
Expand Down Expand Up @@ -492,6 +516,7 @@ public final class com/datadog/android/core/configuration/Configuration$Builder
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun build ()Lcom/datadog/android/core/configuration/Configuration;
public final fun setAdditionalConfiguration (Ljava/util/Map;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBackpressureStrategy (Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchProcessingLevel (Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchSize (Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setCrashReportsEnabled (Z)Lcom/datadog/android/core/configuration/Configuration$Builder;
Expand Down Expand Up @@ -735,6 +760,14 @@ public abstract interface class com/datadog/android/core/sampling/Sampler {
public abstract fun sample ()Z
}

public abstract interface class com/datadog/android/core/thread/FlushableExecutorService : java/util/concurrent/ExecutorService {
public abstract fun drainTo (Ljava/util/Collection;)V
}

public abstract interface class com/datadog/android/core/thread/FlushableExecutorService$Factory {
public abstract fun create (Lcom/datadog/android/api/InternalLogger;Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/thread/FlushableExecutorService;
}

public abstract interface class com/datadog/android/event/EventMapper {
public abstract fun map (Ljava/lang/Object;)Ljava/lang/Object;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
* Defines the mitigation to use when a queue hits the maximum back pressure capacity.
*/
enum class BackPressureMitigation {

/** Drop the oldest items already in the queue to make room for new ones. */
DROP_OLDEST,

/** Ignore newest items that are not yet in the queue. */
IGNORE_NEWEST
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
* @param capacity the maximum size of the queue
* @param onThresholdReached callback called when the queue reaches full capacity
* @param onItemDropped called when an item is dropped because of this backpressure strategy
* @param backpressureMitigation the mitigation to use when reaching the capacity
*/
data class BackPressureStrategy(
val capacity: Int,
val onThresholdReached: () -> Unit,
val onItemDropped: (Any) -> Unit,
val backpressureMitigation: BackPressureMitigation
)
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ internal constructor(
val encryption: Encryption?,
val site: DatadogSite,
val batchProcessingLevel: BatchProcessingLevel,
val persistenceStrategyFactory: PersistenceStrategy.Factory?
val persistenceStrategyFactory: PersistenceStrategy.Factory?,
val backpressureStrategy: BackPressureStrategy
)

// region Builder
Expand Down Expand Up @@ -70,6 +71,7 @@ internal constructor(

private var coreConfig = DEFAULT_CORE_CONFIG
private var crashReportsEnabled: Boolean = true
private var backpressureStrategy: BackPressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY

internal var hostsSanitizer = HostsSanitizer()

Expand Down Expand Up @@ -236,7 +238,7 @@ internal constructor(
}

/**
* Allows to control if JVM crashes are tracked or not. Default value is [true].
* Allows to control if JVM crashes are tracked or not. Default value is `true`.
*
* @param crashReportsEnabled whether crashes are tracked and sent to Datadog
*/
Expand All @@ -245,6 +247,19 @@ internal constructor(
return this
}

/**
* Sets the strategy to handle scalability issues.
* Many operations (data processing, event I/O, …) are queued in background threads.
* This configuration lets one decide how to handle the edge case when the queue starts growing, which can lead
* to a lot of memory usage, delayed processing, and possibly OOM or ANR.
* @param backpressureStrategy the backpressure strategy (default strategy ignores new tasks if a queue reaches
* 1024 items)
*/
fun setBackpressureStrategy(backpressureStrategy: BackPressureStrategy): Builder {
this.backpressureStrategy = backpressureStrategy
return this
}

internal fun allowClearTextHttp(): Builder {
coreConfig = coreConfig.copy(
needsClearTextHttp = true
Expand All @@ -262,6 +277,15 @@ internal constructor(
*/
private const val NO_VARIANT: String = ""

private const val DEFAULT_BACKPRESSURE_THRESHOLD = 1024

internal val DEFAULT_BACKPRESSURE_STRATEGY = BackPressureStrategy(
DEFAULT_BACKPRESSURE_THRESHOLD,
{},
{},
BackPressureMitigation.IGNORE_NEWEST
)

internal val DEFAULT_CORE_CONFIG = Core(
needsClearTextHttp = false,
enableDeveloperModeWhenDebuggable = false,
Expand All @@ -273,7 +297,8 @@ internal constructor(
encryption = null,
site = DatadogSite.US1,
batchProcessingLevel = BatchProcessingLevel.MEDIUM,
persistenceStrategyFactory = null
persistenceStrategyFactory = null,
backpressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY
)

internal const val NETWORK_REQUESTS_TRACKING_FEATURE_NAME = "Network requests"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.thread

import com.datadog.android.api.InternalLogger
import com.datadog.android.core.configuration.BackPressureStrategy
import com.datadog.android.core.thread.FlushableExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

/**
* A single threaded executor service using a BackPressureStrategy.
*/
internal class BackPressureExecutorService(
val logger: InternalLogger,
backpressureStrategy: BackPressureStrategy
) : ThreadPoolExecutor(
CORE_POOL_SIZE,
CORE_POOL_SIZE,
THREAD_POOL_MAX_KEEP_ALIVE_MS,
TimeUnit.MILLISECONDS,
BackPressuredBlockingQueue(logger, backpressureStrategy)
),
FlushableExecutorService {

// region FlushableExecutorService

@Suppress("TooGenericExceptionCaught")
override fun drainTo(destination: MutableCollection<Runnable>) {
try {
queue.drainTo(destination)
} catch (e: IllegalArgumentException) {
onDrainException(e)
} catch (e: NullPointerException) {
onDrainException(e)
} catch (e: UnsupportedOperationException) {
onDrainException(e)
} catch (e: ClassCastException) {
onDrainException(e)
}
}

// endregion

// region ThreadPoolExecutor

override fun afterExecute(r: Runnable?, t: Throwable?) {
super.afterExecute(r, t)
loggingAfterExecute(r, t, logger)
}

// endregion

private fun onDrainException(e: RuntimeException) {
logger.log(
InternalLogger.Level.ERROR,
listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
{ "Unable to drain BackPressureExecutorService queue" },
e
)
}

companion object {
private const val CORE_POOL_SIZE = 1
private val THREAD_POOL_MAX_KEEP_ALIVE_MS = TimeUnit.SECONDS.toMillis(5)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.thread

import com.datadog.android.api.InternalLogger
import com.datadog.android.core.configuration.BackPressureMitigation
import com.datadog.android.core.configuration.BackPressureStrategy
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

internal class BackPressuredBlockingQueue<E : Any>(
private val logger: InternalLogger,
private val backPressureStrategy: BackPressureStrategy
) : LinkedBlockingQueue<E>(
backPressureStrategy.capacity
) {
override fun offer(e: E): Boolean {
return addWithBackPressure(e) {
@Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here
super.offer(it)
}
}

override fun offer(e: E, timeout: Long, unit: TimeUnit?): Boolean {
@Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here
val accepted = super.offer(e, timeout, unit)
if (!accepted) {
return offer(e)
} else {
if (remainingCapacity() == 0) {
onThresholdReached()
}
return true
}
}

private fun addWithBackPressure(
e: E,
operation: (E) -> Boolean
): Boolean {
val remainingCapacity = remainingCapacity()
return if (remainingCapacity == 0) {
when (backPressureStrategy.backpressureMitigation) {
BackPressureMitigation.DROP_OLDEST -> {
val first = take()
onItemDropped(first)
operation(e)
}

BackPressureMitigation.IGNORE_NEWEST -> {
onItemDropped(e)
true
}
}
} else {
if (remainingCapacity == 1) {
onThresholdReached()
}
operation(e)
}
}

private fun onThresholdReached() {
backPressureStrategy.onThresholdReached()
logger.log(
level = InternalLogger.Level.WARN,
targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
messageBuilder = { "BackPressuredBlockingQueue reached capacity:${backPressureStrategy.capacity}" },
throwable = null,
onlyOnce = false,
additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity)
)
}

private fun onItemDropped(item: E) {
backPressureStrategy.onItemDropped(item)

logger.log(
level = InternalLogger.Level.ERROR,
targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
messageBuilder = { "Dropped item in BackPressuredBlockingQueue queue: $item" },
throwable = null,
onlyOnce = false,
additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity)
)
}
}

0 comments on commit c9243d1

Please sign in to comment.