RUM-1837 Update logic to send N batches sequentially in each cycle #1531

Merged · 30 commits · Nov 17, 2023
Commits
bb690f8
RUM-1837 Update logic to send 10 batches sequentially in each cycle
maciejburda Oct 25, 2023
f021603
RUM-1837 Add batch processing level
maciejburda Oct 26, 2023
26148b8
RUM-1837 Lint fix
maciejburda Oct 26, 2023
9d264ad
RUM-1837 Improvements
maciejburda Oct 26, 2023
9ea2899
RUM-1837 Add tests and update docs
maciejburda Oct 30, 2023
186c1fb
RUM-1837 Additional tests
maciejburda Oct 30, 2023
e309489
RUM-1837 Minor improvements
maciejburda Oct 30, 2023
dcd836a
RUM-1837 Update CHANGELOG.md
maciejburda Oct 30, 2023
86a2231
RUM-1837 Fix failing tests
maciejburda Oct 30, 2023
830d982
RUM-1837 Improve implementation
maciejburda Oct 30, 2023
303f534
Update CHANGELOG.md
maciejburda Oct 31, 2023
2954268
RUM-1837 PR fixes
maciejburda Oct 31, 2023
c677e69
RUM-1837 Modify increase/decrease delay logic for consistency
maciejburda Oct 31, 2023
c2d4975
RUM-1837 Update benchmark tests
maciejburda Oct 31, 2023
550bd2b
RUM-1837 Add configuration telemetry
maciejburda Nov 3, 2023
d674613
RUM-1837 Fix bad merge
maciejburda Nov 6, 2023
52e8bcf
RUM-1837 Update telemetry configuration property name
maciejburda Nov 7, 2023
523906d
RUM-1837 Update schema model
maciejburda Nov 8, 2023
e0a9f81
RUM-1837 Update tests
maciejburda Nov 8, 2023
86028aa
RUM-1837 Fix linter issues
maciejburda Nov 8, 2023
ea08527
RUM-1837 PR fixes
maciejburda Nov 10, 2023
f780451
RUM-1837 Refactor
maciejburda Nov 10, 2023
0ce388a
RUM-1837 Fix test
maciejburda Nov 10, 2023
3a57c70
RUM-1837 Correct extension function signature
maciejburda Nov 10, 2023
f12ab64
RUM-1837 Refactor to multiple DispatchWorkItems
maciejburda Nov 13, 2023
daa7235
RUM-1837 PR Fixes
maciejburda Nov 13, 2023
af5ac06
RUM-1837 Update to single upload work
maciejburda Nov 14, 2023
e4cc41b
RUM-1837 PR fixes
maciejburda Nov 15, 2023
760e44e
RUM-1837 Fix linter issues
maciejburda Nov 15, 2023
bcd8ccf
RUM-1837 Minor refactor
maciejburda Nov 16, 2023
@@ -71,7 +71,7 @@ class LoggingStorageBenchmarkTests: XCTestCase {

         measureMetrics([.wallClockTime], automaticallyStartMeasuring: false) {
             self.startMeasuring()
-            let batch = reader.readNextBatch()
+            let batch = reader.readNextBatches(1).first
             self.stopMeasuring()

             XCTAssertNotNil(batch, "Not enough batch files were created for this benchmark.")

8 changes: 7 additions & 1 deletion BenchmarkTests/DataStorage/RUMStorageBenchmarkTests.swift
@@ -71,7 +71,7 @@ class RUMStorageBenchmarkTests: XCTestCase {

         measureMetrics([.wallClockTime], automaticallyStartMeasuring: false) {
             self.startMeasuring()
-            let batch = reader.readNextBatch()
+            let batch = reader.readNextBatches(1).first
             self.stopMeasuring()

             XCTAssertNotNil(batch, "Not enough batch files were created for this benchmark.")
@@ -82,3 +82,9 @@ class RUMStorageBenchmarkTests: XCTestCase {
         }
     }
 }
+
+extension Reader {
+    func readNextBatches(_ limit: Int = .max) -> [Batch] {
+        return readFiles(limit: limit).compactMap { readBatch(from: $0) }
+    }
+}
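
Note: `readNextBatches(1).first` keeps the benchmarks' old single-batch semantics, and the `Reader` extension above rebuilds them from the new two-step API. A hedged usage sketch, assuming `reader` is any `Reader` implementation:

// Equivalent to the removed `readNextBatch()`:
let oldestBatch = reader.readNextBatches(1).first
// Drains every batch currently old enough to read:
let allBatches = reader.readNextBatches()
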
@@ -71,7 +71,7 @@ class TracingStorageBenchmarkTests: XCTestCase {

         measureMetrics([.wallClockTime], automaticallyStartMeasuring: false) {
             self.startMeasuring()
-            let batch = reader.readNextBatch()
+            let batch = reader.readNextBatches(1).first
             self.stopMeasuring()

             XCTAssertNotNil(batch, "Not enough batch files were created for this benchmark.")

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
 - [FEATURE] Change default tracing headers for first party hosts to use both Datadog headers and W3C `tracecontext` headers. See [#1529][]
 - [FEATURE] Add tracestate headers when using W3C tracecontext. See [#1536][]
 - [BUGFIX] Fix RUM ViewController leaks. See [#1533][]
+- [FEATURE] Add `BatchProcessingLevel` configuration allowing to process more batches within single read/upload cycle. See [#1531][]

 # 2.4.0 / 18-10-2023

@@ -554,6 +555,7 @@ Release `2.0` introduces breaking changes. Follow the [Migration Guide](MIGRATIO
 [#1529]: https://github.com/DataDog/dd-sdk-ios/pull/1529
 [#1533]: https://github.com/DataDog/dd-sdk-ios/pull/1533
 [#1536]: https://github.com/DataDog/dd-sdk-ios/pull/1536
+[#1531]: https://github.com/DataDog/dd-sdk-ios/pull/1531
 [@00fa9a]: https://github.com/00FA9A
 [@britton-earnin]: https://github.com/Britton-Earnin
 [@hengyu]: https://github.com/Hengyu
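
For reference, the new option is exposed through the SDK configuration. A minimal initialization sketch — the `batchProcessingLevel` property name and `.high` case follow the naming used in this PR's telemetry, and the token/env values are placeholders, not real credentials:

import DatadogCore

// Sketch: opt into a higher batch processing level at SDK start-up.
var configuration = Datadog.Configuration(
    clientToken: "<client-token>", // placeholder
    env: "staging"                 // placeholder
)
configuration.batchProcessingLevel = .high // assumed API surface for this feature
Datadog.initialize(with: configuration, trackingConsent: .granted)
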
7 changes: 7 additions & 0 deletions DatadogCore/Sources/Core/DatadogCore.swift
@@ -65,8 +65,12 @@ internal final class DatadogCore {
     /// The core context provider.
     internal let contextProvider: DatadogContextProvider

     /// Flag defining if background tasks are enabled.
     internal let backgroundTasksEnabled: Bool

+    /// Maximum number of batches per upload.
+    internal let maxBatchesPerUpload: Int
+
     /// Creates a core instance.
     ///
     /// - Parameters:
@@ -87,6 +91,7 @@ internal final class DatadogCore {
         encryption: DataEncryption?,
         contextProvider: DatadogContextProvider,
         applicationVersion: String,
+        maxBatchesPerUpload: Int,
         backgroundTasksEnabled: Bool
     ) {
         self.directory = directory
@@ -95,6 +100,7 @@ internal final class DatadogCore {
         self.httpClient = httpClient
         self.encryption = encryption
         self.contextProvider = contextProvider
+        self.maxBatchesPerUpload = maxBatchesPerUpload
         self.backgroundTasksEnabled = backgroundTasksEnabled
         self.applicationVersionPublisher = ApplicationVersionPublisher(version: applicationVersion)
         self.consentPublisher = TrackingConsentPublisher(consent: initialConsent)
@@ -246,6 +252,7 @@ extension DatadogCore: DatadogCoreProtocol {
             httpClient: httpClient,
             performance: performancePreset,
             backgroundTasksEnabled: backgroundTasksEnabled,
+            maxBatchesPerUpload: maxBatchesPerUpload,
             telemetry: telemetry
         )

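
A plausible mapping from the public `BatchProcessingLevel` to the internal `maxBatchesPerUpload` limit wired through `DatadogCore` above — the concrete counts are inferred from the commit history ("send 10 batches sequentially"), not taken from this diff:

// Hypothetical sketch of how a processing level could resolve to a batch count.
enum BatchProcessingLevel {
    case low, medium, high

    var maxBatchesPerUpload: Int {
        switch self {
        case .low: return 1      // previous behavior: one batch per cycle
        case .medium: return 10  // matches "send 10 batches sequentially" in the commits
        case .high: return 100   // assumed upper tier
        }
    }
}
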
31 changes: 15 additions & 16 deletions DatadogCore/Sources/Core/Storage/FilesOrchestrator.swift
@@ -12,7 +12,7 @@ internal protocol FilesOrchestratorType: AnyObject {

     func getNewWritableFile(writeSize: UInt64) throws -> WritableFile
     func getWritableFile(writeSize: UInt64) throws -> WritableFile
-    func getReadableFile(excludingFilesNamed excludedFileNames: Set<String>) -> ReadableFile?
+    func getReadableFiles(excludingFilesNamed excludedFileNames: Set<String>, limit: Int) -> [ReadableFile]
     func delete(readableFile: ReadableFile, deletionReason: BatchDeletedMetric.RemovalReason)

     var ignoreFilesAgeWhenReading: Bool { get set }
@@ -150,33 +150,32 @@ internal class FilesOrchestrator: FilesOrchestratorType {

     // MARK: - `ReadableFile` orchestration

-    func getReadableFile(excludingFilesNamed excludedFileNames: Set<String> = []) -> ReadableFile? {
+    func getReadableFiles(excludingFilesNamed excludedFileNames: Set<String> = [], limit: Int = .max) -> [ReadableFile] {
         do {
-            let filesWithCreationDate = try directory.files()
+            let filesFromOldest = try directory.files()
                 .map { (file: $0, creationDate: fileCreationDateFrom(fileName: $0.name)) }
                 .compactMap { try deleteFileIfItsObsolete(file: $0.file, fileCreationDate: $0.creationDate) }

-            guard let (oldestFile, creationDate) = filesWithCreationDate
-                .filter({ excludedFileNames.contains($0.file.name) == false })
-                .sorted(by: { $0.creationDate < $1.creationDate })
-                .first
-            else {
-                return nil
-            }
-
             #if DD_SDK_COMPILED_FOR_TESTING
             if ignoreFilesAgeWhenReading {
-                return oldestFile
+                return filesFromOldest
+                    .prefix(limit)
+                    .map { $0.file }
             }
             #endif

-            let oldestFileAge = dateProvider.now.timeIntervalSince(creationDate)
-            let fileIsOldEnough = oldestFileAge >= performance.minFileAgeForRead
-
-            return fileIsOldEnough ? oldestFile : nil
+            let filtered = filesFromOldest
+                .filter {
+                    let fileAge = dateProvider.now.timeIntervalSince($0.creationDate)
+                    return excludedFileNames.contains($0.file.name) == false && fileAge >= performance.minFileAgeForRead
+                }
+            return filtered
+                .prefix(limit)
+                .map { $0.file }
         } catch {
             telemetry.error("Failed to obtain readable file", error: error)
-            return nil
+            return []
         }
     }

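
The selection rule implemented above: keep oldest-first order, drop files already read, drop files younger than `minFileAgeForRead`, and cap the result at `limit`. A self-contained sketch of that rule over simplified types (the names here are illustrative, not the SDK's):

import Foundation

struct StoredFile {
    let name: String
    let creationDate: Date
}

// Standalone restatement of `getReadableFiles(excludingFilesNamed:limit:)`.
func readableFiles(
    among files: [StoredFile],
    excluding excludedNames: Set<String>,
    minFileAgeForRead: TimeInterval,
    limit: Int,
    now: Date = Date()
) -> [StoredFile] {
    let eligible = files
        .sorted { $0.creationDate < $1.creationDate }  // oldest first
        .filter { !excludedNames.contains($0.name) }   // skip files already read
        .filter { now.timeIntervalSince($0.creationDate) >= minFileAgeForRead } // old enough to read
    return Array(eligible.prefix(limit))
}
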
10 changes: 8 additions & 2 deletions DatadogCore/Sources/Core/Storage/Reading/DataReader.swift
@@ -17,9 +17,15 @@ internal final class DataReader: Reader {
         self.fileReader = fileReader
     }

-    func readNextBatch() -> Batch? {
+    func readFiles(limit: Int) -> [ReadableFile] {
         queue.sync {
-            self.fileReader.readNextBatch()
+            self.fileReader.readFiles(limit: limit)
+        }
+    }
+
+    func readBatch(from file: ReadableFile) -> Batch? {
+        queue.sync {
+            self.fileReader.readBatch(from: file)
         }
     }

8 changes: 4 additions & 4 deletions DatadogCore/Sources/Core/Storage/Reading/FileReader.swift
@@ -30,11 +30,11 @@ internal final class FileReader: Reader {

     // MARK: - Reading batches

-    func readNextBatch() -> Batch? {
-        guard let file = orchestrator.getReadableFile(excludingFilesNamed: filesRead) else {
-            return nil
-        }
+    func readFiles(limit: Int) -> [ReadableFile] {
+        return orchestrator.getReadableFiles(excludingFilesNamed: filesRead, limit: limit)
+    }

+    func readBatch(from file: ReadableFile) -> Batch? {
         do {
             let dataBlocks = try decode(stream: file.stream())
             return Batch(dataBlocks: dataBlocks, file: file)

10 changes: 9 additions & 1 deletion DatadogCore/Sources/Core/Storage/Reading/Reader.swift
@@ -24,6 +24,14 @@ extension Batch {

 /// A type, reading batched data.
 internal protocol Reader {
-    func readNextBatch() -> Batch?
+    /// Reads files from the storage.
+    /// - Parameter limit: maximum number of files to read.
+    func readFiles(limit: Int) -> [ReadableFile]
+    /// Reads batch from given file.
+    /// - Parameter file: file to read batch from.
+    func readBatch(from file: ReadableFile) -> Batch?
+    /// Marks given batch as read.
+    /// - Parameter batch: batch to mark as read.
+    /// - Parameter reason: reason for removing the batch.
     func markBatchAsRead(_ batch: Batch, reason: BatchDeletedMetric.RemovalReason)
 }
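
Splitting the old `readNextBatch()` into `readFiles(limit:)` and `readBatch(from:)` lets a caller claim several files up front and decode them one at a time. A hedged sketch of a `Reader` consumer — the `performUpload` closure and the fixed `202` response code are placeholders, not the SDK's actual upload path:

// Sketch: how the three protocol requirements compose in a consumer.
func uploadPendingBatches(
    reader: Reader,
    limit: Int,
    performUpload: (Batch) -> Void // placeholder for the real `DataUploader`
) {
    for file in reader.readFiles(limit: limit) {
        guard let batch = reader.readBatch(from: file) else {
            continue // undecodable file; the real worker reports telemetry instead
        }
        performUpload(batch)
        reader.markBatchAsRead(batch, reason: .intakeCode(responseCode: 202)) // placeholder code
    }
}
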
121 changes: 74 additions & 47 deletions DatadogCore/Sources/Core/Upload/DataUploadWorker.swift
@@ -28,9 +28,16 @@ internal class DataUploadWorker: DataUploadWorkerType {
     private let contextProvider: DatadogContextProvider
     /// Delay used to schedule consecutive uploads.
     private let delay: DataUploadDelay
+    /// Maximum number of batches to upload in one request.
+    private let maxBatchesPerUpload: Int

-    /// Upload work scheduled by this worker.
+    /// Batch upload work scheduled by this worker.
     @ReadWriteLock
     private var uploadWork: DispatchWorkItem?
+    /// Indicates if the worker was cancelled.
+    @ReadWriteLock
+    private var cancelled = false

     /// Telemetry interface.
     private let telemetry: Telemetry
@@ -46,6 +53,7 @@ internal class DataUploadWorker: DataUploadWorkerType {
         delay: DataUploadDelay,
         featureName: String,
         telemetry: Telemetry,
+        maxBatchesPerUpload: Int,
         backgroundTaskCoordinator: BackgroundTaskCoordinator? = nil
     ) {
         self.queue = queue
@@ -55,95 +63,113 @@ internal class DataUploadWorker: DataUploadWorkerType {
         self.contextProvider = contextProvider
         self.backgroundTaskCoordinator = backgroundTaskCoordinator
         self.delay = delay
+        self.maxBatchesPerUpload = maxBatchesPerUpload
         self.featureName = featureName
         self.telemetry = telemetry

-        let uploadWork = DispatchWorkItem { [weak self] in
-            guard let self = self else {
+        scheduleNextCycle()
+    }
+
+    private func scheduleNextCycle() {
+        queue.asyncAfter(deadline: .now() + delay.current) { [weak self] in
+            guard let self = self, !self.cancelled else {
                 return
             }

             let context = contextProvider.read()
-            let blockersForUpload = self.uploadConditions.blockersForUpload(with: context)
+            let blockersForUpload = uploadConditions.blockersForUpload(with: context)
             let isSystemReady = blockersForUpload.isEmpty
-            let nextBatch = isSystemReady ? self.fileReader.readNextBatch() : nil
-            if let batch = nextBatch {
+            let files = isSystemReady ? fileReader.readFiles(limit: maxBatchesPerUpload) : nil
+            if let files = files, !files.isEmpty {
+                DD.logger.debug("⏳ (\(self.featureName)) Uploading batches...")
                 self.backgroundTaskCoordinator?.beginBackgroundTask()
-                DD.logger.debug("⏳ (\(self.featureName)) Uploading batch...")
+                self.uploadFile(from: files.reversed(), context: context)
+            } else {
+                let batchLabel = files?.isEmpty == false ? "YES" : (isSystemReady ? "NO" : "NOT CHECKED")
+                DD.logger.debug("💡 (\(self.featureName)) No upload. Batch to upload: \(batchLabel), System conditions: \(blockersForUpload.description)")
+                self.delay.increase()
+                self.backgroundTaskCoordinator?.endBackgroundTask()
+                self.scheduleNextCycle()
+            }
+        }
+    }

+    private func uploadFile(from files: [ReadableFile], context: DatadogContext) {
+        let uploadWork = DispatchWorkItem { [weak self] in
+            guard let self = self else {
+                return
+            }
+            var files = files
+            guard let file = files.popLast() else {
+                scheduleNextCycle()
+                return
+            }
+            if let batch = self.fileReader.readBatch(from: file) {
                 do {
                     // Upload batch
                     let uploadStatus = try self.dataUploader.upload(
                         events: batch.events,
                         context: context
                     )

                     // Delete or keep batch depending on the upload status
                     if uploadStatus.needsRetry {
-                        self.delay.increase()
-
                         DD.logger.debug("   → (\(self.featureName)) not delivered, will be retransmitted: \(uploadStatus.userDebugDescription)")
+                        self.delay.increase()
+                        self.scheduleNextCycle()
+                        return
                     } else {
-                        self.fileReader.markBatchAsRead(batch, reason: .intakeCode(responseCode: uploadStatus.responseCode ?? -1)) // -1 is unexpected here
-                        self.delay.decrease()
-
                         DD.logger.debug("   → (\(self.featureName)) accepted, won't be retransmitted: \(uploadStatus.userDebugDescription)")
+                        if files.isEmpty {
+                            self.delay.decrease()
+                        }
+                        self.fileReader.markBatchAsRead(
+                            batch,
+                            reason: .intakeCode(responseCode: uploadStatus.responseCode)
+                        )
                     }

-                    switch uploadStatus.error {
-                    case .unauthorized:
-                        DD.logger.error("⚠️ Make sure that the provided token still exists and you're targeting the relevant Datadog site.")
-                    case let .httpError(statusCode: statusCode):
-                        telemetry.error("Data upload finished with status code: \(statusCode)")
-                    case let .networkError(error: error):
-                        telemetry.error("Data upload finished with error", error: error)
-                    case .none: break
+                    if let error = uploadStatus.error {
+                        switch error {
+                        case .unauthorized:
+                            DD.logger.error("⚠️ Make sure that the provided token still exists and you're targeting the relevant Datadog site.")
+                        case let .httpError(statusCode: statusCode):
+                            telemetry.error("Data upload finished with status code: \(statusCode)")
+                        case let .networkError(error: error):
+                            telemetry.error("Data upload finished with error", error: error)
+                        }
                     }
                 } catch let error {
                     // If upload can't be initiated do not retry, so drop the batch:
                     self.fileReader.markBatchAsRead(batch, reason: .invalid)
                     telemetry.error("Failed to initiate '\(self.featureName)' data upload", error: error)
                 }
             }
+            if files.isEmpty {
+                self.scheduleNextCycle()
             } else {
-                let batchLabel = nextBatch != nil ? "YES" : (isSystemReady ? "NO" : "NOT CHECKED")
-                DD.logger.debug("💡 (\(self.featureName)) No upload. Batch to upload: \(batchLabel), System conditions: \(blockersForUpload.description)")
-
-                self.delay.increase()
-                self.backgroundTaskCoordinator?.endBackgroundTask()
+                self.uploadFile(from: files, context: context)
             }
-
-            self.scheduleNextUpload(after: self.delay.current)
         }

         self.uploadWork = uploadWork
-
-        scheduleNextUpload(after: self.delay.current)
-    }
-
-    private func scheduleNextUpload(after delay: TimeInterval) {
-        guard let work = uploadWork else {
-            return
-        }
-
-        queue.asyncAfter(deadline: .now() + delay, execute: work)
+        queue.async(execute: uploadWork)
     }

     /// Sends all unsent data synchronously.
     /// - It performs arbitrary upload (without checking upload condition and without re-transmitting failed uploads).
     internal func flushSynchronously() {
-        queue.sync {
-            while let nextBatch = self.fileReader.readNextBatch() {
+        queue.sync { [fileReader, dataUploader, contextProvider] in
+            for file in fileReader.readFiles(limit: .max) {
+                guard let nextBatch = fileReader.readBatch(from: file) else {
+                    continue
+                }
                 defer {
                     // RUMM-3459 Delete the underlying batch with `.flushed` reason that will be ignored in reported
                     // metrics or telemetry. This is legitimate as long as `flush()` routine is only available for testing
                     // purposes and never run in production apps.
-                    self.fileReader.markBatchAsRead(nextBatch, reason: .flushed)
+                    fileReader.markBatchAsRead(nextBatch, reason: .flushed)
                 }
                 do {
                     // Try uploading the batch and do one more retry on failure.
-                    _ = try self.dataUploader.upload(events: nextBatch.events, context: contextProvider.read())
+                    _ = try dataUploader.upload(events: nextBatch.events, context: contextProvider.read())
                 } catch {
-                    _ = try? self.dataUploader.upload(events: nextBatch.events, context: contextProvider.read())
+                    _ = try? dataUploader.upload(events: nextBatch.events, context: contextProvider.read())
                 }
             }
         }
@@ -159,6 +185,7 @@ internal class DataUploadWorker: DataUploadWorkerType {
         // fully executed, it will schedule another upload by calling `nextScheduledWork(after:)` at the end.
         self.uploadWork?.cancel()
         self.uploadWork = nil
+        self.cancelled = true
     }
 }
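
The worker now alternates between two pieces: `scheduleNextCycle()` claims up to `maxBatchesPerUpload` files after the current delay, and `uploadFile(from:context:)` chains one `DispatchWorkItem` per batch, re-enqueuing itself via `popLast()` (the array is `reversed()` first so the oldest file is always popped next). A toy sketch of that self-chaining pattern, with strings standing in for batch files:

import Dispatch

// Toy model of the chained-work-item upload loop above.
final class ChainedUploader {
    private let queue = DispatchQueue(label: "uploader.example") // placeholder label

    /// Pass items oldest-last (mirroring `files.reversed()`), so `popLast()`
    /// always yields the oldest remaining item.
    func uploadNext(from items: [String]) {
        let work = DispatchWorkItem { [weak self] in
            guard let self = self else { return }
            var items = items
            guard let item = items.popLast() else {
                return // backlog drained; the real worker schedules the next read cycle here
            }
            print("uploading \(item)") // stand-in for the network call
            self.uploadNext(from: items) // chain a new work item for the rest
        }
        queue.async(execute: work)
    }
}

// ChainedUploader().uploadNext(from: ["newest", "middle", "oldest"])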