-
-
Notifications
You must be signed in to change notification settings - Fork 74
/
elastic.jl
156 lines (126 loc) · 4.68 KB
/
elastic.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# The master process listens on a well-known port.
# Launched workers connect to the master and redirect their STDOUT to that same connection.
# Workers can join and leave the cluster on demand.
# Names made available to users of the package: the manager type and the
# worker-side entry point.
export ElasticManager, elastic_worker
# Cookie header length, taken from Distributed. It is the number of cookie bytes a
# worker writes right after connecting and the number the manager reads back.
const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN
# Cluster manager to which workers attach themselves over TCP.
#
# Fields:
#   active          - configs of registered workers, keyed by worker id
#   pending         - sockets that passed the cookie check and still have to be
#                     added as workers
#   terminated      - ids of workers that have been deregistered
#   topology        - topology handed to `addprocs` whenever workers are added
#   sockname        - result of `getsockname` on the listening socket
#   printing_kwargs - keyword arguments `show` forwards to `get_connect_cmd`
struct ElasticManager <: ClusterManager
    active::Dict{Int, WorkerConfig}
    pending::Channel{TCPSocket}
    terminated::Set{Int}
    topology::Symbol
    sockname
    printing_kwargs

    # Start listening on `addr`/`port` and spawn the two background tasks that
    # accept connections and turn them into workers. Pass `addr=:auto` to have
    # the host's IPv4 address looked up; pass `cookie` to set the cluster cookie.
    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
        Distributed.init_multi()
        if cookie !== nothing
            cluster_cookie(cookie)
        end

        # Automatically check for the IP address of the local machine
        if addr == :auto
            try
                addr = Sockets.getipaddr(IPv4)
            catch
                error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
            end
        end

        listener = listen(addr, port)
        manager = new(
            Dict{Int, WorkerConfig}(),
            Channel{TCPSocket}(typemax(Int)),
            Set{Int}(),
            topology,
            getsockname(listener),
            printing_kwargs
        )

        # Accept loop: each incoming connection is validated in a task of its own.
        @async while true
            let conn = accept(listener)
                @async process_worker_conn(manager, conn)
            end
        end

        # Consumer task: adds validated connections to the cluster as workers.
        @async process_pending_connections(manager)

        return manager
    end
end
# Positional shorthands. Each one forwards its arguments to the keyword
# constructor and leaves every other option at its default.
function ElasticManager(port)
    return ElasticManager(;port=port)
end

function ElasticManager(addr, port)
    return ElasticManager(;addr=addr, port=port)
end

function ElasticManager(addr, port, cookie)
    return ElasticManager(;addr=addr, port=port, cookie=cookie)
end
# Validate a freshly accepted connection and queue it for addition to the cluster.
#
# Reads HDR_COOKIE_LEN bytes from `s` and compares them with the local cluster
# cookie. On success the socket is put on `mgr.pending`. On failure the socket is
# closed (so a rejected peer does not leak a file descriptor) and an error is
# raised:
#   - "Cookie read failed..." when fewer than HDR_COOKIE_LEN bytes arrived
#   - "Invalid cookie..."     when any byte differs from the local cookie
#
# The cookie contents are deliberately never printed: the cookie is the shared
# secret that guards access to the cluster.
function process_worker_conn(mgr::ElasticManager, s::TCPSocket)
    # Validate cookie
    cookie = read(s, HDR_COOKIE_LEN)
    if length(cookie) < HDR_COOKIE_LEN
        close(s)
        error("Cookie read failed. Connection closed by peer.")
    end

    self_cookie = cluster_cookie()
    for i in 1:HDR_COOKIE_LEN
        if UInt8(self_cookie[i]) != cookie[i]
            close(s)
            error("Invalid cookie sent by remote worker.")
        end
    end

    # The socket doubles as the worker's STDOUT; `launch` wraps it in a
    # WorkerConfig once it is taken off the queue.
    put!(mgr.pending, s)
end
# Never returns. Blocks until `mgr.pending` holds at least one connection, then
# calls `addprocs` with the manager's topology. A failure in `addprocs` is
# reported on stderr (message followed by backtrace) and the loop carries on.
function process_pending_connections(mgr::ElasticManager)
    while true
        wait(mgr.pending)
        try
            addprocs(mgr; topology=mgr.topology)
        catch err
            # Grab the backtrace first, then report; the loop must survive errors.
            trace = Base.catch_backtrace()
            showerror(stderr, err)
            Base.show_backtrace(stderr, trace)
        end
    end
end
# Launch hook for ElasticManager. The workers have already been started, so this
# only drains the sockets currently queued on `mgr.pending`, wraps each in a
# WorkerConfig (as its `io`), appends it to `launched` and finally notifies `c`.
function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
    queue = mgr.pending
    while isready(queue)
        cfg = WorkerConfig()
        cfg.io = take!(queue)
        push!(launched, cfg)
    end
    return notify(c)
end
# Bookkeeping hook for worker lifecycle events.
#   :register   - remember `config` under `id` in `mgr.active`
#   :deregister - drop `id` from `mgr.active` and record it in `mgr.terminated`
# Any other `op` is ignored.
function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op === :register
        mgr.active[id] = config
        return config
    end
    op === :deregister || return nothing
    delete!(mgr.active, id)
    return push!(mgr.terminated, id)
end
# Print a summary of the manager: active worker ids, the number of connections
# waiting to be added, terminated worker ids, and the command a new worker can
# run to connect. The text is assembled in a scratch buffer and written to `io`
# in one go.
function Base.show(io::IO, mgr::ElasticManager)
    buf = IOBuffer()

    # Writes "id," for every id in ascending order, then steps back one byte so
    # that the closing "]" overwrites the last comma (or, when there are no ids,
    # the space following "[").
    function write_ids(ids)
        foreach(id -> print(buf, id, ","), sort(collect(ids)))
        seek(buf, position(buf) - 1)
        println(buf, "]")
    end

    println(buf, "ElasticManager:")

    print(buf, " Active workers : [ ")
    write_ids(keys(mgr.active))

    println(buf, " Number of workers to be added : ", Base.n_avail(mgr.pending))

    print(buf, " Terminated workers : [ ")
    write_ids(mgr.terminated)

    println(buf, " Worker connect command : ")
    print(buf, " ", get_connect_cmd(mgr; mgr.printing_kwargs...))

    print(io, String(take!(buf)))
end
# Worker-side entry point. Connects to the ElasticManager listening on
# `addr`/`port` of the master process, sends the cookie and starts the worker.
# With `stdout_to_master=true` this process's stdout is redirected into the
# connection.
#
# Does not return. If executing from a REPL try
#     @async elastic_worker(.....)
function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
    conn = connect(addr, port)

    # The manager reads HDR_COOKIE_LEN bytes, so pad or cut the cookie to that length.
    header = rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN]
    write(conn, header)

    if stdout_to_master
        redirect_stdout(conn)
    end
    start_worker(conn, cookie)
end
# Build the shell command line that starts a worker connecting to `em`.
#
#   absolute_exename - use the full path of the running julia binary rather than
#                      plain "julia"
#   same_project     - add a `--project=` flag pointing at the project file of
#                      Pkg's current context
#
# Returns the command as a single space-separated string.
function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)
    ip = string(em.sockname[1])
    port = convert(Int, em.sockname[2])
    cookie = cluster_cookie()

    parts = String[]
    if absolute_exename
        push!(parts, joinpath(Sys.BINDIR, Base.julia_exename()))
    else
        push!(parts, "julia")
    end
    if same_project
        push!(parts, "--project=$(Pkg.API.Context().env.project_file)")
    end
    push!(parts, "-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port)'")

    return join(parts, " ")
end