Merge 638d285 into 725a755
OkonSamuel committed Jun 11, 2020
2 parents 725a755 + 638d285 commit bad48fb
Showing 7 changed files with 190 additions and 199 deletions.
11 changes: 7 additions & 4 deletions .travis.yml
@@ -3,18 +3,21 @@ language: julia
os:
- linux
env:
- JULIA_NUM_THREADS=30
- JULIA_NUM_THREADS=3
julia:
- 1.0
- 1.1
- 1.2
- 1.3
#- 1.1
#- 1.2
#- 1.3
- 1.4
- nightly
matrix:
allow_failures:
- julia: nightly
notifications:
email: false
git:
depth: 9999999
after_success:
- julia -e 'import Pkg; Pkg.add("Coverage"); using Coverage; Coveralls.submit(process_folder())'

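The lowered JULIA_NUM_THREADS value above is what Travis exports before starting Julia, so the threaded code paths touched by this commit run with three threads on CI. A quick sanity check one might add to a test script (illustrative only, not part of the diff):

using Base.Threads
# JULIA_NUM_THREADS must be set before Julia starts; at runtime the effective
# value is reported by nthreads().
@info "threading" nthreads() get(ENV, "JULIA_NUM_THREADS", "unset")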
4 changes: 2 additions & 2 deletions Project.toml
@@ -17,8 +17,8 @@ RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
ComputationalResources = "^0.3"
Distributions = "^0.22,^0.23"
MLJBase = "^0.12.2,^0.13"
MLJModelInterface = "^0.2,^0.3"
ProgressMeter = "^1.1"
MLJModelInterface = "^0.3"
ProgressMeter = "^1.3"
RecipesBase = "^0.8,^0.9,^1.0"
julia = "^1"
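These [compat] entries use Pkg's caret specifiers, so dropping "^0.2" narrows MLJModelInterface to the 0.3.x series and "^1.3" requires ProgressMeter 1.3 or later within 1.x. An illustrative check, assuming the internal (non-public) helper Pkg.Types.semver_spec behaves as its name suggests:

import Pkg

spec = Pkg.Types.semver_spec("^0.3")   # parse a [compat]-style bound
v"0.3.4" in spec                       # true: still admissible
v"0.2.9" in spec                       # false: excluded by the tightened bound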

2 changes: 1 addition & 1 deletion src/MLJTuning.jl
@@ -18,7 +18,7 @@ export learning_curve!, learning_curve
import MLJBase
using MLJBase
import MLJBase: Bounded, Unbounded, DoublyUnbounded,
LeftUnbounded, RightUnbounded
LeftUnbounded, RightUnbounded, _process_accel_settings, chunks
using RecipesBase
using Distributed
import Distributions
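The two added imports are internal MLJBase helpers used in src/learning_curves.jl below: `_process_accel_settings` normalises the requested acceleration resource, and `chunks` splits an index range into per-task parts (it is called there as `MLJBase.chunks(1:n_rngs, ntasks)`). A stand-in with the same intent, purely for illustration (this is not MLJBase's implementation):

# Hypothetical helper mirroring how `chunks` is used below: split a range of
# indices into `n` roughly equal consecutive parts.
part_indices(r::AbstractUnitRange, n::Integer) =
    Iterators.partition(r, cld(length(r), n))

collect(part_indices(1:10, 3))   # three parts: 1:4, 5:8, 9:10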
170 changes: 80 additions & 90 deletions src/learning_curves.jl
@@ -25,7 +25,7 @@ the (possibly nested) RNG field, and a vector `rngs` of RNGs, one for
each curve. Alternatively, set `rngs` to the number of curves desired,
in which case RNGs are automatically generated. The individual curve
computations can be distributed across multiple processes or threads using
`acceleration=CPUProcesses()`. See the second example below for a
`acceleration=CPUProcesses()` or `acceleration=CPUThreads()`. See the second example below for a
demonstration.
```julia
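# Hedged usage sketch added here for illustration; it is not part of the
# original docstring. `SomeModel`, `X`, `y` and `:lambda` are placeholders,
# and the keyword names are taken from the surrounding text and the method
# body below.
model = SomeModel(rng=123)   # any supervised model exposing an RNG field
r = range(model, :lambda, lower=0.01, upper=10.0, scale=:log)
curve = learning_curve(model, X, y;
                       range=r,
                       resampling=Holdout(),
                       measure=rms,
                       rng_name=:rng,             # which field to vary
                       rngs=4,                    # draw four curves
                       acceleration=CPUThreads())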
@@ -107,6 +107,7 @@ function learning_curve(model::Supervised, args...;
"`AbstractVector{<:AbstractRNG}`. ")
end
end

if (acceleration isa CPUProcesses &&
acceleration_grid isa CPUProcesses)
message =
@@ -121,13 +122,15 @@ function learning_curve(model::Supervised, args...;
acceleration_grid isa CPUProcesses)
message =
"The combination acceleration=$(acceleration) and"*
" acceleration_grid=$(acceleration_grid) is"*
" not generally optimal. You may want to consider setting"*
" acceleration_grid=$(acceleration_grid) isn't supported. \n"*
"Resetting to"*
" `acceleration = CPUProcesses()` and"*
" `acceleration_grid = CPUThreads()`."
@warn message
acceleration = CPUProcesses()
acceleration_grid = CPUThreads()
end

_acceleration = _process_accel_settings(acceleration)
tuned_model = TunedModel(model=model,
range=range,
tuning=Grid(resolution=resolution,
@@ -141,10 +144,8 @@ function learning_curve(model::Supervised, args...;
acceleration=acceleration_grid)

tuned = machine(tuned_model, args...)
## One tuned_mach per thread
tuned_machs = Dict(1 => tuned)

results = _tuning_results(rngs, acceleration, tuned_machs, rng_name, verbosity)
results = _tuning_results(rngs, _acceleration, tuned, rng_name, verbosity)

parameter_name=results.parameter_names[1]
parameter_scale=results.parameter_scales[1]
@@ -163,12 +164,12 @@ _collate(plotting1, plotting2) =
plotting2.measurements),))

# fallback:
_tuning_results(rngs, acceleration, tuned_machs, rngs_name, verbosity) =
error("acceleration=$acceleration unsupported. ")
#_tuning_results(rngs, acceleration, tuned, rngs_name, verbosity) =
# error("acceleration=$acceleration unsupported. ")

# single curve:
_tuning_results(rngs::Nothing, acceleration, tuned_machs, rngs_name, verbosity) =
_single_curve(tuned_machs[1], verbosity)
_tuning_results(rngs::Nothing, acceleration, tuned, rngs_name, verbosity) =
_single_curve(tuned, verbosity)

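The `_tuning_results` methods above and below select behaviour by dispatch: `rngs === nothing` gives a single curve, a vector of RNGs gives one curve per RNG, and the acceleration argument picks the sequential, distributed, or threaded implementation. A generic illustration of that dispatch pattern (hypothetical names, not package code):

using Random

curve_mode(::Nothing) = "single curve"
curve_mode(rngs::AbstractVector) = "one curve per rng ($(length(rngs)) curves)"

curve_mode(nothing)                              # "single curve"
curve_mode([MersenneTwister(i) for i in 1:3])    # "one curve per rng (3 curves)"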
function _single_curve(tuned, verbosity)
fit!(tuned, verbosity=verbosity, force=true)
@@ -177,9 +178,8 @@ end

# CPU1:
function _tuning_results(rngs::AbstractVector, acceleration::CPU1,
tuned_machs, rng_name, verbosity)
tuned, rng_name, verbosity)
local ret
tuned = tuned_machs[1]
old_rng = recursive_getproperty(tuned.model.model, rng_name)
n_rngs = length(rngs)
verbosity < 1 || begin
@@ -191,7 +191,7 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPU1,
color = :yellow)
update!(p,0)
end
@sync begin

ret = mapreduce(_collate, rngs) do rng
recursive_setproperty!(tuned.model.model, rng_name, rng)
fit!(tuned, verbosity=verbosity-1, force=true)
@@ -202,67 +202,65 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPU1,
end
r
end
end
recursive_setproperty!(tuned.model.model, rng_name, old_rng)

return ret
end

# CPUProcesses:
function _tuning_results(rngs::AbstractVector, acceleration::CPUProcesses,
tuned_machs, rng_name, verbosity)
tuned, rng_name, verbosity)

tuned = tuned_machs[1]
old_rng = recursive_getproperty(tuned.model.model, rng_name)
n_rngs = length(rngs)
channel = RemoteChannel(()->Channel{Bool}(min(1000, n_rngs)), 1)

local ret
@sync begin
verbosity < 1 || (p = Progress(n_rngs,
verbosity < 1 || begin
p = Progress(n_rngs,
dt = 0,
desc = "Evaluating Learning curve with $(n_rngs) rngs: ",
barglyphs = BarGlyphs("[=> ]"),
barlen = 18,
color = :yellow))
# printing the progress bar
verbosity < 1 || @async begin
update!(p,0)
while take!(channel)
color = :yellow)
channel = RemoteChannel(()->Channel{Bool}(min(1000, n_rngs)), 1)
end
# printing the progress bar
verbosity < 1 || @async begin
update!(p,0)
while take!(channel)
p.counter +=1
ProgressMeter.updateProgress!(p)
end
end
@sync begin
end
end

ret = @distributed (_collate) for rng in rngs
recursive_setproperty!(tuned.model.model, rng_name, rng)
fit!(tuned, verbosity=verbosity-1, force=true)
r=tuned.report.plotting
verbosity < 1 || begin
put!(channel, true)
end
r
end
end
recursive_setproperty!(tuned.model.model, rng_name, old_rng)
verbosity < 1 || put!(channel, false)
end

recursive_setproperty!(tuned.model.model, rng_name, rng)
fit!(tuned, verbosity=verbosity-1, force=true)
r=tuned.report.plotting
verbosity < 1 || put!(channel, true)
r
end
recursive_setproperty!(tuned.model.model, rng_name, old_rng)
verbosity < 1 || put!(channel, false)
end
return ret
end
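The CPUProcesses method above pairs a `@distributed` reduction over the RNGs with a `RemoteChannel` that each iteration uses to report progress back to a listener task on the manager. A minimal self-contained sketch of that pattern (illustrative only; the package combines results with `_collate` rather than `+`):

using Distributed

n = 8
channel = RemoteChannel(() -> Channel{Bool}(n + 1), 1)

@sync begin
    # listener on the manager process: count completions until `false` arrives
    @async begin
        done = 0
        while take!(channel)
            done += 1
            println("finished $done of $n")
        end
    end

    # distributed reduction; each iteration signals the listener
    total = @distributed (+) for i in 1:n
        put!(channel, true)
        i^2
    end
    put!(channel, false)   # stop the listener
    @show total            # 204 == sum(i^2 for i in 1:8)
end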

# CPUThreads:
@static if VERSION >= v"1.3.0-DEV.573"
function _tuning_results(rngs::AbstractVector, acceleration::CPUThreads,
tuned_machs, rng_name, verbosity)
tuned, rng_name, verbosity)

n_threads = Threads.nthreads()
if n_threads == 1
return _tuning_results(rngs, CPU1(),
tuned_machs, rng_name, verbosity)
tuned, rng_name, verbosity)
end

old_rng = recursive_getproperty(tuned.model.model, rng_name)
n_rngs = length(rngs)
old_rng = recursive_getproperty(tuned_machs[1].model.model, rng_name)
ntasks = acceleration.settings
partitions = MLJBase.chunks(1:n_rngs, ntasks)
verbosity < 1 || begin
p = Progress(n_rngs,
dt = 0,
@@ -271,58 +269,50 @@ function _tuning_results(rngs::AbstractVector, acceleration::CPUThreads,
barlen = 18,
color = :yellow)
update!(p,0)
ch = Channel{Bool}(length(partitions))
end
loc = ReentrantLock()

results = Vector{Any}(undef, n_rngs) ##since we use Grid for now
partitions = Iterators.partition(1:n_rngs, cld(n_rngs, n_threads))
local ret


@sync begin

@sync for rng_part in partitions
Threads.@spawn begin

foreach(rng_part) do k
id = Threads.threadid()
if !haskey(tuned_machs, id)
## deepcopy of model is because other threads can still change the state
## of tuned_machs[id].model.model
tuned_machs[id] =
machine(TunedModel(model = deepcopy(tuned_machs[1].model.model),
range=tuned_machs[1].model.range,
tuning=tuned_machs[1].model.tuning,
resampling=tuned_machs[1].model.resampling,
operation=tuned_machs[1].model.operation,
measure=tuned_machs[1].model.measure,
train_best=tuned_machs[1].model.train_best,
weights=tuned_machs[1].model.weights,
repeats=tuned_machs[1].model.repeats,
acceleration=tuned_machs[1].model.acceleration),
tuned_machs[1].args...)
end
recursive_setproperty!(tuned_machs[id].model.model, rng_name, rngs[k])
fit!(tuned_machs[id], verbosity=verbosity-1, force=true)
verbosity < 1 || begin
lock(loc)do
p.counter +=1
tasks = Vector{Task}(undef, length(partitions))

@sync begin
verbosity < 1 || @async begin
while take!(ch)
p.counter +=1
ProgressMeter.updateProgress!(p)
end
end

results[k] = tuned_machs[id].report.plotting
end

end
end
# One t_tuned per task
## deepcopy of model is because other threads can still change the state
## of tuned.model.model
tmachs = [tuned, [machine(TunedModel(model = deepcopy(tuned.model.model),
range=tuned.model.range,
tuning=tuned.model.tuning,
resampling=tuned.model.resampling,
operation=tuned.model.operation,
measure=tuned.model.measure,
train_best=tuned.model.train_best,
weights=tuned.model.weights,
repeats=tuned.model.repeats,
acceleration=tuned.model.acceleration),
tuned.args...) for _ in 2:length(partitions)]...]
@sync for (i,rng_part) in enumerate(partitions)
tasks[i] = Threads.@spawn begin
mapreduce(_collate, rng_part) do k
recursive_setproperty!(tmachs[i].model.model, rng_name, rngs[k])
fit!(tmachs[i], verbosity=verbosity-1, force=true)
verbosity < 1 || put!(ch, true)
tmachs[i].report.plotting
end

end
end
ret = reduce(_collate, results)
recursive_setproperty!(tuned_machs[1].model.model, rng_name, old_rng)
end
verbosity < 1 || put!(ch, false)
end

ret = reduce(_collate, fetch.(tasks))
recursive_setproperty!(tuned.model.model, rng_name, old_rng)
return ret


end

end
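The CPUThreads method above chunks the RNG indices, spawns one task per chunk with `Threads.@spawn`, gives each task its own machine (a deepcopy of the model, to avoid data races), and finally reduces the fetched per-task results with `_collate`. A minimal sketch of that chunk/spawn/fetch/reduce shape (illustrative only):

using Base.Threads: @spawn, nthreads

work   = 1:12
nparts = nthreads()
parts  = collect(Iterators.partition(work, cld(length(work), nparts)))

tasks = map(parts) do part
    @spawn begin
        # each task reduces over its own chunk; the package analogously gives
        # each task its own (deep-copied) tuned machine
        mapreduce(i -> i^2, +, part)
    end
end

total = reduce(+, fetch.(tasks))
@show total   # 650 == sum(i^2 for i in 1:12)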

