Skip to content

Commit

Permalink
Merge pull request #42 from Infomaniak/conversationPerformanceIssues
Browse files Browse the repository at this point in the history
feat(ParallelWrokerCore): ArrayAccumulator / ParallelTaskMapper + UT
  • Loading branch information
valentinperignon committed May 31, 2023
2 parents 882ed55 + cf97d8c commit f39617b
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 35 deletions.
65 changes: 65 additions & 0 deletions Sources/InfomaniakCore/Asynchronous/ArrayAccumulator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Infomaniak Core - iOS
Copyright (C) 2021 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import Foundation

/// A generic async/await accumulator, order preserving.
///
/// This is a thread safe actor.
/// It is backed by a fix length array, size defined at init.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public actor ArrayAccumulator<T> {
/// Local Error Domain
public enum ErrorDomain: Error {
case outOfBounds
}

/// A buffer array
private var buffer: [T?]

/// Init an ArrayAccumulator
/// - Parameters:
/// - count: The count of items in the accumulator
/// - wrapping: The type of the content wrapped in an array
public init(count: Int, wrapping: T.Type) {
buffer = [T?](repeating: nil, count: count)
}

/// Set an item at a specified index
/// - Parameters:
/// - item: the item to be stored
/// - index: The index where we store the item
public func set(item: T?, atIndex index: Int) throws {
guard index < buffer.count else {
throw ErrorDomain.outOfBounds
}
buffer[index] = item
}

/// The accumulated ordered nullable content at the time of calling
/// - Returns: The ordered nullable content at the time of calling
public var accumulation: [T?] {
return buffer
}

/// The accumulated ordered result at the time of calling. Nil values are removed.
/// - Returns: The ordered result at the time of calling. Nil values are removed.
public var compactAccumulation: [T] {
return buffer.compactMap { $0 }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Infomaniak kDrive - iOS App
Infomaniak Core - iOS
Copyright (C) 2021 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
Expand Down
74 changes: 74 additions & 0 deletions Sources/InfomaniakCore/Asynchronous/ParallelTaskMapper.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Infomaniak Core - iOS
Copyright (C) 2021 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import Foundation

/// A concurrent way to map some computation with a closure to a collection of generic items.
///
/// Use default settings for optimised queue depth
///
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct ParallelTaskMapper {
/// internal processing TaskQueue
let taskQueue: TaskQueue

/// Init function
/// - Parameter concurrency: execution depth, keep default for optimized threading.
public init(concurrency: Int = max(4, ProcessInfo.processInfo.activeProcessorCount) /* parallel by default */ ) {
assert(concurrency > 0, "zero concurrency locks execution")
taskQueue = TaskQueue(concurrency: concurrency)
}

/// Map a task to a collection of items
///
/// With this, you can easily _parallelize_ *async/await* code.
///
/// This is using an underlying `TaskQueue` (with an optimized queue depth)
/// Using it to apply work to each item of a given collection.
/// - Parameters:
/// - collection: The input collection of items to be processed
/// - toOperation: The operation to be applied to the `collection` of items
/// - Returns: An ordered processed collection of the desired type
public func map<Input, Output>(
collection: [Input],
toOperation operation: @escaping @Sendable (_ item: Input) async throws -> Output?
) async throws -> [Output?] {
// Using an ArrayAccumulator to preserve the order of results
let accumulator = ArrayAccumulator(count: collection.count, wrapping: Output.self)

// Using a TaskGroup to track completion
_ = try await withThrowingTaskGroup(of: Void.self, returning: Void.self) { taskGroup in
for (index, item) in collection.enumerated() {
taskGroup.addTask {
let result = try await self.taskQueue.enqueue {
try await operation(item)
}

try? await accumulator.set(item: result, atIndex: index)
}
}

// await completion of all tasks
try await taskGroup.waitForAll()
}

// Get the accumulated results
let accumulated = await accumulator.accumulation
return accumulated
}
}
29 changes: 29 additions & 0 deletions Sources/InfomaniakCore/Asynchronous/Task+Finish.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Infomaniak Core - iOS
Copyright (C) 2023 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import Foundation

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Task {
@discardableResult
/// Await the end of a Task. Result can be discarded.
/// - Returns: A discardable result
func finish() async -> Result<Success, Failure> {
await result
}
}
93 changes: 93 additions & 0 deletions Tests/InfomaniakCoreTests/UTArrayAccumulator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Infomaniak Core - iOS
Copyright (C) 2023 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import InfomaniakCore
import XCTest

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class UTArrayAccumulator: XCTestCase {
func testAllGood() async {
// GIVEN
let expectedCount = 5
let accumulator = ArrayAccumulator(count: expectedCount, wrapping: Int.self)

// WHEN
try? await accumulator.set(item: 3, atIndex: 1)
try? await accumulator.set(item: 1, atIndex: 0)

// THEN
let accumulated = await accumulator.accumulation
XCTAssertEqual(accumulated.count, expectedCount)

let compactAccumulated = await accumulator.compactAccumulation
XCTAssertEqual(compactAccumulated.count, 2)

XCTAssertEqual(compactAccumulated[0], 1)
XCTAssertEqual(compactAccumulated[1], 3)
}

func testOutOfBounds() async {
// GIVEN
let expectedCount = 5
let accumulator = ArrayAccumulator(count: expectedCount, wrapping: String.self)

// WHEN
do {
try await accumulator.set(item: "aa", atIndex: expectedCount)

// THEN
XCTFail("Unexpected")
} catch {
XCTAssertEqual(error as! ArrayAccumulator<String>.ErrorDomain, ArrayAccumulator<String>.ErrorDomain.outOfBounds)
}
}

func testBoundZero() async {
// GIVEN
let expectedCount = 5
let accumulator = ArrayAccumulator(count: expectedCount, wrapping: String.self)

// WHEN
do {
try await accumulator.set(item: "aa", atIndex: 0)

// THEN
let resultCount = await accumulator.compactAccumulation.count
XCTAssertEqual(resultCount, 1)
} catch {
XCTFail("Unexpected")
}
}

func testBoundLast() async {
// GIVEN
let expectedCount = 5
let accumulator = ArrayAccumulator(count: expectedCount, wrapping: String.self)

// WHEN
do {
try await accumulator.set(item: "aa", atIndex: expectedCount - 1)

// THEN
let resultCount = await accumulator.compactAccumulation.count
XCTAssertEqual(resultCount, 1)
} catch {
XCTFail("Unexpected")
}
}
}
44 changes: 44 additions & 0 deletions Tests/InfomaniakCoreTests/UTCollectionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Infomaniak Core - iOS
Copyright (C) 2023 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import InfomaniakCore
import XCTest

final class UTCollectionTests: XCTestCase {
func testSafeIndexSuccess() {
// GIVEN
let shortArray = [1]

// WHEN
let fetched = shortArray[safe: 0]

// THEN
XCTAssertEqual(fetched, 1)
}

func testSafeIndexNil() {
// GIVEN
let shortArray = [1]

// WHEN
let fetched = shortArray[safe: 1]

// THEN
XCTAssertNil(fetched)
}
}
56 changes: 56 additions & 0 deletions Tests/InfomaniakCoreTests/UTParallelTaskMapper.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Infomaniak Core - iOS
Copyright (C) 2023 Infomaniak Network SA
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import InfomaniakCore
import XCTest

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class UTParallelTaskMapper: XCTestCase {
func testAsyncMap() async {
// GIVEN
let taskMapper = ParallelTaskMapper()
let collectionToProcess = Array(0 ... 50)

// WHEN
do {
let result = try await taskMapper.map(collection: collectionToProcess) { item in
// Make the process take some short arbitrary time to complete
let randomShortTime = UInt64.random(in: 1 ... 100)
try await Task.sleep(nanoseconds: randomShortTime)

return item * 10
}

// THEN
// We check order is preserved
_ = result.reduce(-1) { partialResult, item in
guard let item = item else {
fatalError("Unexpected")
}
XCTAssertGreaterThan(item, partialResult)
return item
}

XCTAssertEqual(result.count, collectionToProcess.count)

} catch {
XCTFail("Unexpected")
return
}
}
}
Loading

0 comments on commit f39617b

Please sign in to comment.