-
Notifications
You must be signed in to change notification settings - Fork 4
/
clusters.py
229 lines (174 loc) · 6.07 KB
/
clusters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from dask.distributed import Client, LocalCluster
from dask_jobqueue import LSFCluster
from dask_jobqueue.lsf import LSFJob
import dask.config
from pathlib import Path
import os
import sys
import time
import yaml
# a class to help exit gracefully on our cluster
class janelia_LSFJob(LSFJob):
    """LSF job class that overrides dask-jobqueue's cancel command.

    Replaces the default ``bkill`` with ``bkill -d``, presumably for a
    cleaner shutdown on the Janelia LSF scheduler.
    NOTE(review): confirm the ``-d`` flag semantics against the site's
    LSF documentation.
    """
    cancel_command = "bkill -d"
class janelia_LSFCluster(LSFCluster):
    """LSFCluster that spawns ``janelia_LSFJob`` jobs, so workers are
    cancelled with the custom ``bkill -d`` command."""
    job_cls = janelia_LSFJob
class _cluster(object):
def __init__(self):
self.client = None
self.cluster = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if not self.persist_yaml:
if os.path.exists(self.yaml_path):
os.remove(self.yaml_path)
self.client.close()
self.cluster.__exit__(exc_type, exc_value, traceback)
def set_client(self, client):
self.client = client
def set_cluster(self, cluster):
self.cluster = cluster
def modify_dask_config(
self, options, yaml_name='ClusterWrap.yaml', persist_yaml=False,
):
dask.config.set(options)
yaml_path = str(Path.home()) + '/.config/dask/' + yaml_name
with open(yaml_path, 'w') as f:
yaml.dump(dask.config.config, f, default_flow_style=False)
self.yaml_path = yaml_path
self.persist_yaml = persist_yaml
def get_dashboard(self):
if self.cluster is not None:
return self.cluster.dashboard_link
# TODO: not needed?
# def adapt_cluster(self, min_workers=None, max_workers=None):
# None
class janelia_lsf_cluster(_cluster):
    """Adaptive dask-jobqueue LSF cluster for the Janelia environment.

    Applies Janelia-specific defaults (scratch local directory, per-run
    log directory, 15GB of RAM per core), starts the cluster, connects a
    client, and enables adaptive scaling between ``min_workers`` and
    ``max_workers`` jobs.
    """

    # dollars per core per hour; used only for the cost estimate printout
    HOURLY_RATE_PER_CORE = 0.07

    def __init__(
        self,
        ncpus=4,
        processes=1,
        threads=None,
        min_workers=1,
        max_workers=4,
        walltime="3:59",
        config=None,
        **kwargs
    ):
        """
        Parameters
        ----------
        ncpus : int
            cores reserved per worker; RAM scales with this (15GB/core)
        processes : int
            dask worker processes per job
        threads : int or None
            dask ``cores`` per worker; defaults to ``ncpus``
        min_workers, max_workers : int
            adaptive scaling bounds, in jobs
        walltime : str
            LSF walltime limit per job
        config : dict or None
            extra dask config options merged over the defaults
            (avoids the mutable-default-argument pitfall of ``{}``)
        **kwargs
            forwarded to LSFCluster (e.g. local_directory, log_directory)
        """
        super().__init__()

        # config defaults; generous comm timeouts are needed when
        # scaling up big clusters
        config_defaults = {
            'distributed.comm.timeouts.connect': '180s',
            'distributed.comm.timeouts.tcp': '360s',
        }
        config = {} if config is None else config
        self.modify_dask_config({**config_defaults, **config})

        # store ncpus per worker and worker limits
        self.adapt = None
        self.ncpus = ncpus
        self.min_workers = min_workers
        self.max_workers = max_workers

        # environment vars: prevent overthreading outside dask
        tpw = 2 * ncpus  # threads per worker
        job_script_prologue = [
            f"export MKL_NUM_THREADS={tpw}",
            f"export NUM_MKL_THREADS={tpw}",
            f"export OPENBLAS_NUM_THREADS={tpw}",
            f"export OPENMP_NUM_THREADS={tpw}",
            f"export OMP_NUM_THREADS={tpw}",
        ]

        # default local (scratch) and log directories
        USER = os.environ["USER"]
        CWD = os.getcwd()
        PID = os.getpid()
        if "local_directory" not in kwargs:
            kwargs["local_directory"] = f"/scratch/{USER}/"
        if "log_directory" not in kwargs:
            log_dir = f"{CWD}/dask_worker_logs_{PID}/"
            Path(log_dir).mkdir(parents=False, exist_ok=True)
            kwargs["log_directory"] = log_dir

        # ncpus/RAM relationship: 15GB per core
        memory = str(15 * ncpus) + 'GB'
        mem = int(15e9 * ncpus)

        # determine nthreads
        if threads is None:
            threads = ncpus

        # create cluster and connect client
        cluster = janelia_LSFCluster(
            ncpus=ncpus,
            processes=processes,
            memory=memory,
            mem=mem,
            walltime=walltime,
            cores=threads,
            job_script_prologue=job_script_prologue,
            **kwargs,
        )
        client = Client(cluster)
        self.set_cluster(cluster)
        self.set_client(client)
        print("Cluster dashboard link: ", cluster.dashboard_link)
        sys.stdout.flush()

        # set adaptive cluster bounds
        self.adapt_cluster(min_workers, max_workers)

    # NOTE: the previous redundant __exit__ override (which only called
    # super().__exit__) has been removed; base-class cleanup still runs.

    def adapt_cluster(self, min_workers=None, max_workers=None):
        """Update the adaptive scaling bounds (in jobs) and print an
        upper-bound hourly cost estimate for the new configuration.

        Parameters left as None keep their current value.
        """
        if min_workers is not None:
            self.min_workers = min_workers
        if max_workers is not None:
            self.max_workers = max_workers
        self.adapt = self.cluster.adapt(
            minimum_jobs=self.min_workers,
            maximum_jobs=self.max_workers,
        )
        # give feedback to user
        mn, mx, nc = self.min_workers, self.max_workers, self.ncpus  # shorthand
        cost = round(mx * nc * self.HOURLY_RATE_PER_CORE, 2)
        print(f"Cluster adapting between {mn} and {mx} workers with {nc} cores per worker")
        print(f"*** This cluster has an upper bound cost of {cost} dollars per hour ***")
class local_cluster(_cluster):
    """Dask LocalCluster wrapper running on the current machine."""

    def __init__(
        self,
        config=None,
        memory_limit=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        config : dict or None
            dask config options applied before starting the cluster
            (None avoids the mutable-default-argument pitfall of ``{}``)
        memory_limit : optional
            per-worker memory limit forwarded to LocalCluster
        **kwargs
            forwarded to LocalCluster
        """
        super().__init__()

        # set config defaults
        config_defaults = {}
        config = {} if config is None else config
        self.modify_dask_config({**config_defaults, **config})

        # set LocalCluster defaults
        if "host" not in kwargs:
            kwargs["host"] = ""
        if memory_limit is not None:
            kwargs["memory_limit"] = memory_limit

        # set up cluster, connect scheduler/client
        cluster = LocalCluster(**kwargs)
        client = Client(cluster)
        self.set_cluster(cluster)
        self.set_client(client)
class remote_cluster(_cluster):
    """Connect a client to an externally managed dask cluster."""

    def __init__(
        self,
        cluster,
        config=None,
    ):
        """
        Parameters
        ----------
        cluster
            an existing dask cluster object or scheduler address,
            anything accepted by ``dask.distributed.Client``
        config : dict or None
            dask config options applied before connecting
            (None avoids the mutable-default-argument pitfall of ``{}``)
        """
        super().__init__()

        # set config defaults
        config_defaults = {}
        config = {} if config is None else config
        self.modify_dask_config({**config_defaults, **config})

        # setup client
        client = Client(cluster)
        self.set_cluster(cluster)
        self.set_client(client)