forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
topology.jl
85 lines (71 loc) · 2.29 KB
/
topology.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# This file is a part of Julia. License is MIT: http://julialang.org/license
include("testdefs.jl")
addprocs(4; topology="master_slave")
@test_throws RemoteException remotecall_fetch(2, ()->remotecall_fetch(3,myid))
function test_worker_counts()
# check if the nprocs/nworkers/workers are the same on the remaining workers
np=nprocs()
nw=nworkers()
ws=sort(workers())
for p in workers()
@test (true, true, true) == remotecall_fetch(p, (x,y,z)->(x==nprocs(), y==nworkers(), z==sort(workers())), np, nw, ws)
end
end
function remove_workers_and_test()
while nworkers() > 0
@test :ok == rmprocs(workers()[1]; waitfor=2.0)
test_worker_counts()
if nworkers() == nprocs()
break
end
end
end
remove_workers_and_test()
# connect even pids to other even pids, odd to odd.
type TopoTestManager <: ClusterManager
np::Integer
end
function Base.launch(manager::TopoTestManager, params::Dict, launched::Array, c::Condition)
dir = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]
for i in 1:manager.np
io, pobj = open(detach(
setenv(`$(Base.julia_cmd(exename)) $exeflags --bind-to $(Base.LPROC.bind_addr) --worker`, dir=dir)), "r")
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.ident = i
wconfig.connect_idents = collect(i+2:2:manager.np)
push!(launched, wconfig)
end
notify(c)
end
const map_pid_ident=Dict()
function Base.manage(manager::TopoTestManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :register
map_pid_ident[id] = get(config.ident)
elseif op == :interrupt
kill(get(config.process), 2)
end
end
addprocs(TopoTestManager(8); topology="custom")
while true
if any(x->get(map_pid_ident, x, 0)==0, workers())
yield()
else
break
end
end
for p1 in workers()
for p2 in workers()
i1 = map_pid_ident[p1]
i2 = map_pid_ident[p2]
if (iseven(i1) && iseven(i2)) || (isodd(i1) && isodd(i2))
@test p2 == remotecall_fetch(p1, p->remotecall_fetch(p,myid), p2)
else
@test_throws RemoteException remotecall_fetch(p1, p->remotecall_fetch(p,myid), p2)
end
end
end
remove_workers_and_test()