data_manager.py

import os
import logging
import requests
import ftplib
import concurrent.futures as cf

from parsl.data_provider.scheme import GlobusScheme
from parsl.executors.base import ParslExecutor
from parsl.data_provider.globus import get_globus
from parsl.app.app import python_app

logger = logging.getLogger(__name__)


def _http_stage_in(working_dir, outputs=[]):
    """Download a file over HTTP(S) into working_dir, or the current directory."""
    file = outputs[0]
    if working_dir:
        os.makedirs(working_dir, exist_ok=True)
        file.local_path = os.path.join(working_dir, file.filename)
    else:
        file.local_path = file.filename
    resp = requests.get(file.url, stream=True)
    # Stream the response to disk in 1 KB chunks rather than holding the
    # whole file in memory.
    with open(file.local_path, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)


def _ftp_stage_in(working_dir, outputs=[]):
    """Download a file over anonymous FTP into working_dir, or the current directory."""
    file = outputs[0]
    if working_dir:
        os.makedirs(working_dir, exist_ok=True)
        file.local_path = os.path.join(working_dir, file.filename)
    else:
        file.local_path = file.filename
    with open(file.local_path, 'wb') as f:
        ftp = ftplib.FTP(file.netloc)
        ftp.login()  # anonymous login
        ftp.cwd(os.path.dirname(file.path))
        ftp.retrbinary('RETR {}'.format(file.filename), f.write)
        ftp.quit()
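
# A minimal sketch of how these helpers behave in isolation (hypothetical
# path and URL; `File` is parsl.data_provider.files.File). In normal use
# they are wrapped as Parsl apps and invoked via DataManager.stage_in():
#
#     from parsl.data_provider.files import File
#     remote = File('https://example.com/data.csv')
#     _http_stage_in('/tmp/staging', outputs=[remote])
#     # remote.local_path is now '/tmp/staging/data.csv'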


class DataManager(ParslExecutor):
    """The DataManager is responsible for transferring input and output data.

    It uses the Executor interface, where staging tasks are submitted
    to it, and DataFutures are returned.
    """

    @classmethod
    def get_data_manager(cls):
        """Return the DataManager of the currently loaded DataFlowKernel."""
        from parsl.dataflow.dflow import DataFlowKernelLoader
        dfk = DataFlowKernelLoader.dfk()
        return dfk.executors['data_manager']
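
    # For example, once a DataFlowKernel has been loaded (e.g. via
    # parsl.load(config)), the active DataManager can be fetched with:
    #
    #     dm = DataManager.get_data_manager()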

    def __init__(self, dfk, max_threads=10):
        """Initialize the DataManager.

        Args:
            - dfk (DataFlowKernel): The DataFlowKernel that this DataManager is managing data for.

        Kwargs:
            - max_threads (int): Number of threads in the staging thread pool. Default is 10.
        """
        self._scaling_enabled = False
        self.label = 'data_manager'
        self.dfk = dfk
        self.max_threads = max_threads
        self.files = []
        self.globus = None
        self.managed = True

    def start(self):
        self.executor = cf.ThreadPoolExecutor(max_workers=self.max_threads)

    def submit(self, *args, **kwargs):
        """Submit a staging app. All optimization should be here."""
        return self.executor.submit(*args, **kwargs)

    def scale_in(self, blocks, *args, **kwargs):
        pass

    def scale_out(self, *args, **kwargs):
        pass

    def shutdown(self, block=False):
        """Shutdown the ThreadPool.

        Kwargs:
            - block (bool): Whether to block until pending staging tasks complete.
        """
        x = self.executor.shutdown(wait=block)
        logger.debug("Done with executor shutdown")
        return x

    @property
    def scaling_enabled(self):
        return self._scaling_enabled

    def add_file(self, file):
        if file.scheme == 'globus':
            if not self.globus:
                self.globus = get_globus()
            # keep a list of all remote files for optimization purposes (TODO)
            self.files.append(file)
            self._set_local_path(file)
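
    # Hypothetical example ('<endpoint-uuid>' is a placeholder): registering
    # a Globus file initializes the Globus helper, records the file, and
    # precomputes its local_path under the executor's working_dir:
    #
    #     dm.add_file(File('globus://<endpoint-uuid>/data/input.dat'))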

    def _set_local_path(self, file):
        globus_ep = self._get_globus_endpoint()
        file.local_path = os.path.join(globus_ep['working_dir'], file.filename)

    def _get_globus_endpoint(self, executor_label=None):
        """Find a GlobusScheme on an executor and map its working_dir onto the endpoint path."""
        for executor in self.dfk.executors.values():
            if (executor_label is None or executor.label == executor_label) and hasattr(executor, "storage_access"):
                for scheme in executor.storage_access:
                    if isinstance(scheme, GlobusScheme):
                        if executor.working_dir:
                            working_dir = os.path.normpath(executor.working_dir)
                        else:
                            raise ValueError("executor working_dir must be specified for GlobusScheme")
                        if scheme.endpoint_path and scheme.local_path:
                            endpoint_path = os.path.normpath(scheme.endpoint_path)
                            local_path = os.path.normpath(scheme.local_path)
                            common_path = os.path.commonpath((local_path, working_dir))
                            if local_path != common_path:
                                raise Exception('"local_path" must be equal to "working_dir" or one of its parent directories')
                            relative_path = os.path.relpath(working_dir, common_path)
                            endpoint_path = os.path.join(endpoint_path, relative_path)
                        else:
                            endpoint_path = working_dir
                        return {'endpoint_uuid': scheme.endpoint_uuid,
                                'endpoint_path': endpoint_path,
                                'working_dir': working_dir}
        raise Exception('No executor with a Globus endpoint and working_dir defined')
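
    # Path-mapping sketch with assumed values: if scheme.endpoint_path is
    # '/scratch', scheme.local_path is '/home/user', and executor.working_dir
    # is '/home/user/run1', then common_path is '/home/user', relative_path
    # is 'run1', and the returned endpoint_path is '/scratch/run1'.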

    def stage_in(self, file, executor):
        """Transport the file from its input source to the executor.

        This function returns a DataFuture.

        Args:
            - self
            - file (File) : file to stage in
            - executor (str) : label of the executor the file is going to be
              staged in to. If the executor argument is not specified for a
              file with the 'globus' scheme, the file will be staged in to
              the first executor configured with a GlobusScheme.
        """
        if file.scheme == 'ftp':
            working_dir = self.dfk.executors[executor].working_dir
            stage_in_app = self._ftp_stage_in_app(executor=executor)
            app_fut = stage_in_app(working_dir, outputs=[file])
            return app_fut._outputs[0]
        elif file.scheme == 'http' or file.scheme == 'https':
            working_dir = self.dfk.executors[executor].working_dir
            stage_in_app = self._http_stage_in_app(executor=executor)
            app_fut = stage_in_app(working_dir, outputs=[file])
            return app_fut._outputs[0]
        elif file.scheme == 'globus':
            globus_ep = self._get_globus_endpoint(executor)
            stage_in_app = self._globus_stage_in_app()
            app_fut = stage_in_app(globus_ep, outputs=[file])
            return app_fut._outputs[0]
        else:
            raise Exception('Staging in with unknown file scheme {} is not supported'.format(file.scheme))
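
    # Hypothetical stage-in sketch ('htex_local' is an assumed executor label
    # from the user's config):
    #
    #     fut = dm.stage_in(File('ftp://ftp.example.com/pub/data.bin'), 'htex_local')
    #     fut.result()  # block until the file is staged onto the executor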

    def _ftp_stage_in_app(self, executor):
        return python_app(executors=[executor])(_ftp_stage_in)

    def _http_stage_in_app(self, executor):
        return python_app(executors=[executor])(_http_stage_in)

    def _globus_stage_in_app(self):
        return python_app(executors=['data_manager'])(self._globus_stage_in)

    def _globus_stage_in(self, globus_ep, outputs=[]):
        file = outputs[0]
        file.local_path = os.path.join(
            globus_ep['working_dir'], file.filename)
        dst_path = os.path.join(
            globus_ep['endpoint_path'], file.filename)
        self.globus.transfer_file(
            file.netloc, globus_ep['endpoint_uuid'],
            file.path, dst_path)

    def stage_out(self, file, executor):
        """Transport the file from the local filesystem to the remote Globus endpoint.

        This function returns a DataFuture.

        Args:
            - self
            - file (File) : file to stage out
            - executor (str) : label of the executor the file is going to be
              staged out from. If the executor argument is not specified for
              a file with the 'globus' scheme, the file will be staged out
              from the first executor configured with a GlobusScheme.
        """
        if file.scheme == 'http' or file.scheme == 'https':
            raise Exception('HTTP/HTTPS file staging out is not supported')
        elif file.scheme == 'ftp':
            raise Exception('FTP file staging out is not supported')
        elif file.scheme == 'globus':
            globus_ep = self._get_globus_endpoint(executor)
            stage_out_app = self._globus_stage_out_app()
            return stage_out_app(globus_ep, inputs=[file])
        else:
            raise Exception('Staging out with unknown file scheme {} is not supported'.format(file.scheme))

    def _globus_stage_out_app(self):
        return python_app(executors=['data_manager'])(self._globus_stage_out)

    def _globus_stage_out(self, globus_ep, inputs=[]):
        file = inputs[0]
        src_path = os.path.join(globus_ep['endpoint_path'], file.filename)
        self.globus.transfer_file(
            globus_ep['endpoint_uuid'], file.netloc,
            src_path, file.path
        )
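
# Hypothetical stage-out sketch (executor label and endpoint UUID are
# placeholders):
#
#     out = File('globus://<endpoint-uuid>/results/out.txt')
#     fut = dm.stage_out(out, 'globus_htex')
#     fut.result()  # block until the transfer to the remote endpoint finishes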