Skip to content

Commit

Permalink
Aggregation function for Array.Parallel module (#14924)
Browse files Browse the repository at this point in the history
* Aggregation functions for Array.Parallel module
  • Loading branch information
T-Gro committed Apr 20, 2023
1 parent ed1c8a7 commit f69e2f5
Show file tree
Hide file tree
Showing 10 changed files with 576 additions and 34 deletions.
94 changes: 93 additions & 1 deletion src/FSharp.Core/array.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,7 +2081,7 @@ module Array =
let private maxPartitions = Environment.ProcessorCount // The maximum number of partitions to use
let private minChunkSize = 256 // The minimum size of a chunk to be sorted in parallel

let private createPartitionsUpTo maxIdxExclusive (array: 'T[]) =
let private createPartitionsUpToWithMinChunkSize maxIdxExclusive minChunkSize (array: 'T[]) =
[|
let chunkSize =
match maxIdxExclusive with
Expand All @@ -2098,6 +2098,98 @@ module Array =
yield new ArraySegment<'T>(array, offset, maxIdxExclusive - offset)
|]

let private createPartitionsUpTo maxIdxExclusive (array: 'T[]) =
createPartitionsUpToWithMinChunkSize maxIdxExclusive minChunkSize array

(* This function is there also as a support vehicle for other aggregations.
It is public in order to be called from inlined functions, the benefit of inlining call into it is significant *)
[<CompiledName("ReduceBy")>]
let reduceBy (projection: 'T -> 'U) (reduction: 'U -> 'U -> 'U) (array: 'T[]) =
checkNonNull "array" array

if array.Length = 0 then
invalidArg "array" LanguagePrimitives.ErrorStrings.InputArrayEmptyString

let chunks = createPartitionsUpToWithMinChunkSize array.Length 2 array // We need at least 2 elements/chunk for 'reduction'

let chunkResults =
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked chunks.Length

Parallel.For(
0,
chunks.Length,
fun chunkIdx ->
let chunk = chunks[chunkIdx]
let mutable res = projection array[chunk.Offset]
let lastIdx = chunk.Offset + chunk.Count - 1

for i = chunk.Offset + 1 to lastIdx do
let projected = projection array[i]
res <- reduction res projected

chunkResults[chunkIdx] <- res
)
|> ignore

let mutable finalResult = chunkResults[0]

for i = 1 to chunkResults.Length - 1 do
finalResult <- reduction finalResult chunkResults[i]

finalResult

[<CompiledName("Reduce")>]
let inline reduce ([<InlineIfLambda>] reduction) (array: _[]) =
array |> reduceBy id reduction

let inline vFst struct (a, _) =
a

let inline vSnd struct (_, b) =
b

[<CompiledName("MinBy")>]
let inline minBy ([<InlineIfLambda>] projection) (array: _[]) =

array
|> reduceBy (fun x -> struct (projection x, x)) (fun a b -> if vFst a < vFst b then a else b)
|> vSnd

[<CompiledName("Min")>]
let inline min (array: _[]) =
array |> reduce (fun a b -> if a < b then a else b)

[<CompiledName("SumBy")>]
let inline sumBy ([<InlineIfLambda>] projection: 'T -> ^U) (array: 'T[]) : ^U =
if array.Length = 0 then
LanguagePrimitives.GenericZero
else
array |> reduceBy projection Operators.Checked.(+)

[<CompiledName("Sum")>]
let inline sum (array: ^T[]) : ^T =
array |> sumBy id

[<CompiledName("MaxBy")>]
let inline maxBy projection (array: _[]) =

array
|> reduceBy (fun x -> struct (projection x, x)) (fun a b -> if vFst a > vFst b then a else b)
|> vSnd

[<CompiledName("Max")>]
let inline max (array: _[]) =
array |> reduce (fun a b -> if a > b then a else b)

[<CompiledName("AverageBy")>]
let inline averageBy ([<InlineIfLambda>] projection: 'T -> ^U) (array: 'T[]) : ^U =
let sum = array |> reduceBy projection Operators.Checked.(+)
LanguagePrimitives.DivideByInt sum (array.Length)

[<CompiledName("Average")>]
let inline average (array: 'T[]) =
array |> averageBy id

[<CompiledName("Zip")>]
let zip (array1: _[]) (array2: _[]) =
checkNonNull "array1" array1
Expand Down
Loading

0 comments on commit f69e2f5

Please sign in to comment.