remove unsafe use of dictionary in multithreading #49

Merged · 18 commits · Jun 10, 2020
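Editor's note on the change this PR makes: the threaded `learning_curve` path previously shared a single `Dict` of tuned machines, keyed by `Threads.threadid()`, across all threads. A `Dict` is not a concurrent data structure, so concurrent insertions can corrupt it, and `threadid()` is not a reliable key for task-local state. The fix gives each spawned task its own deep-copied machine and merges the fetched results. A minimal, hypothetical illustration of the two patterns (not code from this repository):

```julia
using Base.Threads

# UNSAFE: concurrent insertions into a shared Dict are a data race,
# and threadid() is not a stable identity for a task's state.
function unsafe_sum(xs)
    acc = Dict{Int,Float64}()
    @threads for i in eachindex(xs)
        acc[threadid()] = get(acc, threadid(), 0.0) + xs[i]   # data race
    end
    return sum(values(acc))
end

# SAFE: one task per chunk, each with its own local state; results are
# combined via fetch, with no shared mutable container.
function safe_sum(xs; ntasks = nthreads())
    chunks = Iterators.partition(xs, cld(length(xs), ntasks))
    tasks = [Threads.@spawn sum(chunk) for chunk in chunks]
    return sum(fetch.(tasks))
end

xs = rand(1_000_000)
safe_sum(xs) ≈ sum(xs)   # true
```

The same chunk-spawn-fetch shape appears in the rewritten `CPUThreads` method below.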
4 changes: 2 additions & 2 deletions Project.toml
@@ -17,8 +17,8 @@ RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
 ComputationalResources = "^0.3"
 Distributions = "^0.22,^0.23"
 MLJBase = "^0.12.2,^0.13"
-MLJModelInterface = "^0.2,^0.3"
-ProgressMeter = "^1.1"
+MLJModelInterface = "^0.3"
+ProgressMeter = "^1.3"
 RecipesBase = "^0.8,^0.9,^1.0"
 julia = "^1"

2 changes: 1 addition & 1 deletion src/MLJTuning.jl
@@ -18,7 +18,7 @@ export learning_curve!, learning_curve
 import MLJBase
 using MLJBase
 import MLJBase: Bounded, Unbounded, DoublyUnbounded,
-                LeftUnbounded, RightUnbounded
+                LeftUnbounded, RightUnbounded, _process_accel_settings
 using RecipesBase
 using Distributed
 import Distributions
126 changes: 57 additions & 69 deletions src/learning_curves.jl
@@ -25,7 +25,7 @@ the (possibly nested) RNG field, and a vector `rngs` of RNG's, one for
 each curve. Alternatively, set `rngs` to the number of curves desired,
 in which case RNG's are automatically generated. The individual curve
 computations can be distributed across multiple processes using
-`acceleration=CPUProcesses()`. See the second example below for a
+`acceleration=CPUProcesses()` or `acceleration=CPUThreads()`. See the second example below for a
 demonstration.

 ```julia
@@ -107,6 +107,7 @@ function learning_curve(model::Supervised, args...;
              "`AbstractVector{<:AbstractRNG}`. ")
         end
     end
+    _acceleration = _process_accel_settings(acceleration)
     if (acceleration isa CPUProcesses &&
         acceleration_grid isa CPUProcesses)
         message =
@@ -127,7 +127,7 @@
          " `acceleration_grid = CPUThreads()`."
         @warn message
     end

     tuned_model = TunedModel(model=model,
                              range=range,
                              tuning=Grid(resolution=resolution,
@@ -141,10 +142,8 @@
                              acceleration=acceleration_grid)

     tuned = machine(tuned_model, args...)
-    ## One tuned_mach per thread
-    tuned_machs = Dict(1 => tuned)

-    results = _tuning_results(rngs, acceleration, tuned_machs, rng_name, verbosity)
+    results = _tuning_results(rngs, _acceleration, tuned, rng_name, verbosity)

     parameter_name=results.parameter_names[1]
     parameter_scale=results.parameter_scales[1]
@@ -163,12 +162,12 @@ _collate(plotting1, plotting2) =
                            plotting2.measurements),))

 # fallback:
-_tuning_results(rngs, acceleration, tuned_machs, rngs_name, verbosity) =
-    error("acceleration=$acceleration unsupported. ")
+#_tuning_results(rngs, acceleration, tuned, rngs_name, verbosity) =
+#    error("acceleration=$acceleration unsupported. ")

 # single curve:
-_tuning_results(rngs::Nothing, acceleration, tuned_machs, rngs_name, verbosity) =
-    _single_curve(tuned_machs[1], verbosity)
+_tuning_results(rngs::Nothing, acceleration, tuned, rngs_name, verbosity) =
+    _single_curve(tuned, verbosity)

 function _single_curve(tuned, verbosity)
     fit!(tuned, verbosity=verbosity, force=true)
@@ -177,9 +176,8 @@ end

 # CPU1:
 function _tuning_results(rngs::AbstractVector, acceleration::CPU1,
-                         tuned_machs, rng_name, verbosity)
+                         tuned, rng_name, verbosity)
     local ret
-    tuned = tuned_machs[1]
     old_rng = recursive_getproperty(tuned.model.model, rng_name)
     n_rngs = length(rngs)
     verbosity < 1 || begin
@@ -191,7 +189,7 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPU1,
                      color = :yellow)
         update!(p,0)
     end
-    @sync begin
+
     ret = mapreduce(_collate, rngs) do rng
         recursive_setproperty!(tuned.model.model, rng_name, rng)
         fit!(tuned, verbosity=verbosity-1, force=true)
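Editor's note: the serial method reduces one result per RNG with a pairwise combiner, exactly the call shape `mapreduce(_collate, rngs) do rng ... end` used above. A self-contained sketch with stand-ins for the MLJTuning internals (`combine` for `_collate`, whose full definition is truncated in an earlier hunk; `run_one` for the set-rng/refit/read-report step):

```julia
using Random

# merge two partial results column-wise, as _collate does for plotting data
combine(a, b) = (measurements = hcat(a.measurements, b.measurements),)

# stand-in for: set the rng, fit!(tuned, ...), read tuned.report.plotting
run_one(rng) = (measurements = rand(rng, 5, 1),)

rngs = [MersenneTwister(i) for i in 1:4]
curves = mapreduce(combine, rngs) do rng    # same shape as the CPU1 method
    run_one(rng)
end
@assert size(curves.measurements) == (5, 4)  # one column per rng
```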
@@ -202,17 +200,15 @@ end
         end
         r
     end
-    end
     recursive_setproperty!(tuned.model.model, rng_name, old_rng)

     return ret
 end

 # CPUProcesses:
 function _tuning_results(rngs::AbstractVector, acceleration::CPUProcesses,
-                         tuned_machs, rng_name, verbosity)
+                         tuned, rng_name, verbosity)

-    tuned = tuned_machs[1]
     old_rng = recursive_getproperty(tuned.model.model, rng_name)
     n_rngs = length(rngs)
     channel = RemoteChannel(()->Channel{Bool}(min(1000, n_rngs)), 1)
@@ -225,21 +221,20 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPUProcesses,
                          barlen = 18,
                          color = :yellow))
     # printing the progress bar
-    verbosity < 1 || @async begin
+    verbosity < 1 || @async begin
         update!(p,0)
         while take!(channel)
             p.counter +=1
             ProgressMeter.updateProgress!(p)
         end
+        close(channel)
     end
     @sync begin
     ret = @distributed (_collate) for rng in rngs
         recursive_setproperty!(tuned.model.model, rng_name, rng)
         fit!(tuned, verbosity=verbosity-1, force=true)
         r=tuned.report.plotting
-        verbosity < 1 || begin
-            put!(channel, true)
-        end
+        verbosity < 1 || put!(channel, true)
         r
     end
     end
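Editor's note: the distributed method drives its progress bar through a `RemoteChannel`: each worker iteration `put!`s `true`, and an `@async` consumer on the master advances the meter until it takes `false`. A standalone sketch of that pattern, under stated assumptions (two local workers; `(+)` playing the role of `_collate`, `sleep` the role of `fit!`):

```julia
using Distributed
addprocs(2)                 # assumption: two local worker processes
using ProgressMeter

n = 8
channel = RemoteChannel(() -> Channel{Bool}(n), 1)
p = Progress(n, desc = "evaluating: ")

@sync begin
    # master-side consumer: advance the bar on `true`, stop on `false`
    @async begin
        while take!(channel)
            next!(p)
        end
        close(channel)
    end
    # distributed reduction; the loop body stands in for fit! + report
    total = @distributed (+) for i in 1:n
        sleep(0.1)
        put!(channel, true)
        i
    end
    put!(channel, false)    # signal the consumer to finish
    println(total)          # 36
end
```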
@@ -253,16 +248,17 @@ end
 # CPUThreads:
 @static if VERSION >= v"1.3.0-DEV.573"
 function _tuning_results(rngs::AbstractVector, acceleration::CPUThreads,
-                         tuned_machs, rng_name, verbosity)
+                         tuned, rng_name, verbosity)

     n_threads = Threads.nthreads()
     if n_threads == 1
         return _tuning_results(rngs, CPU1(),
-                               tuned_machs, rng_name, verbosity)
+                               tuned, rng_name, verbosity)
     end

     n_rngs = length(rngs)
-    old_rng = recursive_getproperty(tuned_machs[1].model.model, rng_name)
+    ntasks = acceleration.settings
+    partitions = MLJBase.chunks(1:n_rngs, ntasks)
     verbosity < 1 || begin
         p = Progress(n_rngs,
                      dt = 0,
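Editor's note: `acceleration.settings` carries the task count requested via `CPUThreads(ntasks)`, and `MLJBase.chunks` splits the RNG indices into that many contiguous blocks, one per spawned task. Roughly equivalent, for illustration only (`Iterators.partition` may size the final block differently than `MLJBase.chunks`, which balances them):

```julia
ntasks = 3   # e.g. what CPUThreads(3).settings would carry
parts = collect(Iterators.partition(1:10, cld(10, ntasks)))
# parts == [1:4, 5:8, 9:10]: contiguous index blocks, one per task
```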
@@ -271,58 +267,50 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPUThreads,
                      barlen = 18,
                      color = :yellow)
         update!(p,0)
+        ch = Channel{Bool}(length(partitions))
     end
-    loc = ReentrantLock()
-
-    results = Vector{Any}(undef, n_rngs) ##since we use Grid for now
-    partitions = Iterators.partition(1:n_rngs, cld(n_rngs, n_threads))
-    local ret
-
-    @sync begin
-        @sync for rng_part in partitions
-            Threads.@spawn begin
-                foreach(rng_part) do k
-                    id = Threads.threadid()
-                    if !haskey(tuned_machs, id)
-                        ## deepcopy of model is because other threads can still
-                        ## change the state of tuned_machs[id].model.model
-                        tuned_machs[id] =
-                            machine(TunedModel(model = deepcopy(tuned_machs[1].model.model),
-                                               range=tuned_machs[1].model.range,
-                                               tuning=tuned_machs[1].model.tuning,
-                                               resampling=tuned_machs[1].model.resampling,
-                                               operation=tuned_machs[1].model.operation,
-                                               measure=tuned_machs[1].model.measure,
-                                               train_best=tuned_machs[1].model.train_best,
-                                               weights=tuned_machs[1].model.weights,
-                                               repeats=tuned_machs[1].model.repeats,
-                                               acceleration=tuned_machs[1].model.acceleration),
-                                    tuned_machs[1].args...)
-                    end
-                    recursive_setproperty!(tuned_machs[id].model.model, rng_name, rngs[k])
-                    fit!(tuned_machs[id], verbosity=verbosity-1, force=true)
-                    verbosity < 1 || begin
-                        lock(loc) do
-                            p.counter +=1
-                            ProgressMeter.updateProgress!(p)
-                        end
-                    end
-                    results[k] = tuned_machs[id].report.plotting
-                end
-            end
-        end
-        ret = reduce(_collate, results)
-        recursive_setproperty!(tuned_machs[1].model.model, rng_name, old_rng)
-    end
+    tasks = Vector{Task}(undef, length(partitions))
+
+    @sync begin
+        verbosity < 1 || @async begin
+            while take!(ch)
+                p.counter +=1
+                ProgressMeter.updateProgress!(p)
+            end
+            close(ch)
+        end
+
+        @sync for (i,rng_part) in enumerate(partitions)
+            tasks[i] = Threads.@spawn begin
+                # One t_tuned per task
+                ## deepcopy of model is because other threads can still
+                ## change the state of tuned.model.model
+                t_tuned =
+                    machine(TunedModel(model = deepcopy(tuned.model.model),
+                                       range=tuned.model.range,
+                                       tuning=tuned.model.tuning,
+                                       resampling=tuned.model.resampling,
+                                       operation=tuned.model.operation,
+                                       measure=tuned.model.measure,
+                                       train_best=tuned.model.train_best,
+                                       weights=tuned.model.weights,
+                                       repeats=tuned.model.repeats,
+                                       acceleration=tuned.model.acceleration),
+                            tuned.args...)
+                mapreduce(_collate, rng_part) do k
+                    recursive_setproperty!(t_tuned.model.model, rng_name, rngs[k])
+                    fit!(t_tuned, verbosity=verbosity-1, force=true)
+                    verbosity < 1 || put!(ch, true)
+                    t_tuned.report.plotting
+                end
+            end
+        end
+        verbosity < 1 || put!(ch, false)
+    end
+
+    ret = reduce(_collate, fetch.(tasks))
     return ret

 end

end
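Editor's note: for reference, a hedged usage sketch of the feature this PR touches, patterned on the `learning_curve` docstring amended above. Model and data are placeholders, and the exact MLJ model-loading syntax varied across versions of that era:

```julia
using MLJ

atom = @load RidgeRegressor pkg=MultivariateStats   # assumed available
ensemble = EnsembleModel(atom=atom, n=100)          # has `rng` and `bagging_fraction` fields
X, y = @load_boston

r = range(ensemble, :bagging_fraction, lower=0.4, upper=1.0)

# four curves, one per rng, fit on separate threads: the code path
# rewritten in this PR; `rng_name` names the RNG field of `ensemble`
curves = learning_curve(ensemble, X, y,
                        range=r,
                        rngs=4,
                        rng_name=:rng,
                        acceleration=CPUThreads())
```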