-
Notifications
You must be signed in to change notification settings - Fork 1
/
parallel_map.gleam
104 lines (94 loc) · 3.18 KB
/
parallel_map.gleam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import gleam/erlang/atom
import gleam/erlang/process
import gleam/iterator.{type Iterator}
import gleam/list
import parallel_map/internal/task_repeater
/// Specifies how many workers should be spawned for a parallel map
pub type WorkerAmount {
/// Spawn exactly `value` workers
/// (iterator_pmap panics when the resulting number is not greater than 0)
WorkerAmount(value: Int)
/// Spawn as many workers as there are schedulers currently online,
/// as reported by Erlang's `system_info` for `schedulers_online`
MatchSchedulersOnline
}
/// Resolves a WorkerAmount into the concrete number of workers to spawn
///
/// An explicit WorkerAmount is returned unchanged (no validation happens here),
/// while MatchSchedulersOnline asks the Erlang runtime for its
/// `schedulers_online` value
fn worker_amount_to_int(worker_amount: WorkerAmount) -> Int {
  case worker_amount {
    MatchSchedulersOnline -> {
      let schedulers_online = atom.create_from_string("schedulers_online")
      do_erlang_system_info(schedulers_online)
    }
    WorkerAmount(value: requested) -> requested
  }
}
/// Binding to Erlang's `erlang:system_info/1`
///
/// The result is typed as Int; within this file it is only called with the
/// `schedulers_online` atom
/// NOTE(review): other system_info items may not return an integer - confirm
/// before reusing this binding with a different atom
@external(erlang, "erlang", "system_info")
fn do_erlang_system_info(item: atom.Atom) -> Int
/// Parallel counterpart of gleam_stdlib's iterator.map
///
/// Builds a new iterator whose elements are the results of applying
/// `mapping_func` to the elements of `input`, each wrapped in a Result
///
/// When collecting a result takes longer than `timeout_milliseconds`,
/// the corresponding element is Error(Nil) instead
///
/// Unlike iterator.map, the work starts immediately: the workers are spawned
/// and every element of `input` is handed to a worker before this function
/// returns, so `input` is traversed in full right away
/// Only the collection of the results is deferred until the returned
/// iterator is run
///
/// Panics if the resolved number of workers is not greater than 0,
/// or if a worker cannot be started
///
/// NOTE(review): a result that arrives after its receive has timed out
/// presumably stays queued on that worker's subject and could then be returned
/// for a later element handled by the same worker - confirm against task_repeater
pub fn iterator_pmap(
  input: Iterator(a),
  mapping_func: fn(a) -> b,
  num_workers: WorkerAmount,
  timeout_milliseconds: Int,
) -> Iterator(Result(b, Nil)) {
  let pool_size = worker_amount_to_int(num_workers)
  case pool_size > 0 {
    True -> Nil
    False -> panic as "number of workers must be greater than 0"
  }

  // Starts a single worker and pairs it with the subject
  // its results are later received from
  let start_worker = fn() {
    let reply_subject = process.new_subject()
    let assert Ok(worker) = task_repeater.new(reply_subject, mapping_func)
    #(worker, reply_subject)
  }

  let #(workers, reply_subjects) =
    list.unzip(
      iterator.to_list(iterator.take(
        iterator.repeatedly(start_worker),
        pool_size,
      )),
    )

  // Hand the inputs to the workers in round-robin order,
  // counting how many were dispatched along the way
  let dispatched =
    iterator.zip(input, iterator.cycle(iterator.from_list(workers)))
    |> iterator.fold(0, fn(sent, pair) {
      let #(item, worker) = pair
      let _ = task_repeater.call(worker, item)
      sent + 1
    })

  // Visit the subjects in the same round-robin order,
  // one receive per dispatched input
  reply_subjects
  |> iterator.from_list
  |> iterator.cycle
  |> iterator.take(dispatched)
  |> iterator.map(fn(subject) {
    process.receive(subject, timeout_milliseconds)
  })
}
/// Parallel counterpart of gleam_stdlib's list.map
///
/// Returns a new list holding the result of applying `mapping_func` to each
/// element of `input`, with every result wrapped in a Result
///
/// When collecting a result takes longer than `timeout_milliseconds`,
/// the corresponding value is Error(Nil) instead
///
/// The work is delegated to iterator_pmap, so the same worker handling applies
pub fn list_pmap(
  input: List(a),
  mapping_func: fn(a) -> b,
  num_workers: WorkerAmount,
  timeout_milliseconds: Int,
) -> List(Result(b, Nil)) {
  let results =
    iterator_pmap(
      iterator.from_list(input),
      mapping_func,
      num_workers,
      timeout_milliseconds,
    )
  iterator.to_list(results)
}