/
dutils.jl
245 lines (148 loc) · 6.25 KB
/
dutils.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# This file contains utility functions to deal with distributed
# computing
"""
update_best(channel::RemoteChannel, bestx::SharedArray{Float64, 1})
Listen to a `channel` for results found by lmlovo. If there is an
improvement for the objective function, the shared array `bestx` is
updated.
**Attention**: There might be an unstable state if there is a process
reading `bestx` while this function is updating it. This should not
be a problem, since it is used as a starting point.
**Attention 2**: this function is currently out of use.
"""
function update_best(channel::RemoteChannel, bestx::SharedArray{Float64, 1})
@debug("Running updater.")
n = length(bestx)
N::Int = 0
while isopen(channel)
θ = try
take!(channel)
catch e
if isa(e, InvalidStateException)
break
end
@warn("Something wrong when reading from channel. Will skip.", e)
continue
end
@debug("Updater has read values from channel.")
for i = 1:n
bestx[i] = (N * bestx[i] + θ[i]) / (N + 1)
end
N += 1
end
@debug("Update channel closed. Exiting thread.")
end
"""
function consume_tqueue(bqueue::RemoteChannel, tqueue::RemoteChannel,
squeue::RemoteChannel, model::Function, gmodel!::Function,
data::Array{Float64, 2}, n::Int, pliminf::Int,
plimsup::Int, MAXMS::Int, seedMS::MersenneTwister)
This function represents one worker, which runs lmlovo in a multistart
fashion.
It takes a job from the RemoteChannel `tqueue` and runs `lmlovo`
function to it. It might run using a multistart strategy, if
`MAXMS>1`. It sends the best results found for each value obtained in
`tqueue` to channel `squeue`, which will be consumed by the main
process. All the other arguments are the same for [`praff`](@ref)
function.
"""
function consume_tqueue(bqueue::RemoteChannel, tqueue::RemoteChannel,
squeue::RemoteChannel,
model::Function, gmodel!::Function,
data::Array{Float64, 2}, n::Int, pliminf::Int,
plimsup::Int, MAXMS::Int,
seedMS::MersenneTwister, initguess::Vector{Float64})
@debug("Started worker $(myid())")
while isopen(tqueue)
p = try
take!(tqueue)
catch e
if isa(e, InvalidStateException)
break
end
@warn("Something wrong when reading task. Will skip task.", e)
continue
end
@debug("Received task $(p)")
if (p.start < pliminf) || (p.stop > plimsup) ||
(length(p) == 0)
@warn("Invalid value for task: $(p). Will skip task.")
continue
end
for k in p
wbest = RAFFOutput(k)
nf = 0
nj = 0
ni = 0
# Multi-start strategy
for j = 1:MAXMS
# New random starting point
θ = randn(seedMS, n)
θ .= θ .+ initguess
# Call function and store results
rout = lmlovo(model, gmodel!, θ, data, n, k)
nf += rout.nf
nj += rout.nj
ni += rout.iter
(rout.status == 1) && (rout.f < wbest.f) && (wbest = rout)
# This block is related to a strategy of smart
# starting points for the multistart
# process. Currently, it makes no sense to use it.
# if rout.f < wbest.f
# # Send asynchronously the result to channel if success
# if rout.status == 1
# @async try
# put!(bqueue, rout.solution)
# @debug("Added new point to queue.", rout.solution, rout.f)
# catch e
# @warn(string("Problems when saving best point found in queue. ",
# "Will skip this step"), e)
# end
# end
# end
end
@debug("Finished. p = $(k) and f = $(wbest.f).")
try
wbest = RAFFOutput(wbest.status, wbest.solution, ni, wbest.p, wbest.f, nf, nj,
wbest.outliers)
put!(squeue, wbest)
catch e
if isa(e, InvalidStateException)
@warn("Solution queue prematurely closed. Unable to save solution for p=$(k).")
return
end
@warn("Something wrong when sending the solution to queue for p=$(k).", e)
end
end
end
end
"""
check_and_close(bqueue::RemoteChannel, tqueue::RemoteChannel,
squeue::RemoteChannel, futures::Vector{Future};
secs::Float64=0.1)
Check if there is at least one worker process in the vector of
`futures` that has not prematurely finished. If there is no alive
worker, close task, solution and best queues, `tqueue`, `squeue` and
`bqueue`, respectively.
"""
function check_and_close(bqueue::RemoteChannel, tqueue::RemoteChannel,
squeue::RemoteChannel, futures::Vector{Future};
secs::Float64=0.1)
n_alive = length(futures)
@debug("Checking worker status.")
for (i, f) in enumerate(futures)
if timedwait(()->isready(f), secs) == :ok
@warn("Worker $(i) seems to have finished prematurely.",
fetch(f))
n_alive -= 1
end
end
@debug("Workers online: $(n_alive)")
# Only closes all queues if there are tasks to be completed
if n_alive == 0 && isopen(tqueue)
@warn("No live worker found. Will close queues and finish.")
close(bqueue)
close(tqueue)
close(squeue)
end
end