-
-
Notifications
You must be signed in to change notification settings - Fork 155
/
io.py
320 lines (233 loc) · 7.07 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import numpy as np
from aesara.graph.basic import Apply, Constant, Variable
from aesara.graph.op import Op
from aesara.graph.sched import key_to_cmp
from aesara.link.c.type import Generic
from aesara.tensor.type import tensor
class LoadFromDisk(Op):
    """
    An operation to load an array from disk.

    See Also
    --------
    load

    Notes
    -----
    Non-differentiable.

    """

    __props__ = ("dtype", "shape", "mmap_mode")

    def __init__(self, dtype, shape, mmap_mode=None):
        """
        Parameters
        ----------
        dtype : data-type
            The dtype the loaded array is expected to have.
        shape
            Static shape information for the output tensor.
        mmap_mode : None or 'c'
            Memory-mapping mode forwarded to ``np.load``; only ``None``
            (copy into memory) and ``'c'`` (copy-on-write map) are allowed.
        """
        self.dtype = np.dtype(dtype)  # turn "float64" into np.float64
        self.shape = shape
        if mmap_mode not in (None, "c"):
            raise ValueError(
                "The only supported values for mmap_mode "
                "are None and 'c', got %s" % mmap_mode
            )
        self.mmap_mode = mmap_mode

    def make_node(self, path):
        # Wrap plain string paths in a Generic constant so the graph
        # machinery always receives a Variable.
        if isinstance(path, str):
            path = Constant(Generic(), path)
        return Apply(self, [path], [tensor(self.dtype, shape=self.shape)])

    def perform(self, node, inp, out):
        path = inp[0]
        # Only single-array .npy files are supported; reject .npz archives
        # early with a clear message instead of returning an NpzFile object.
        if path.split(".")[-1] == "npz":
            raise ValueError(f"Expected a .npy file, got {path} instead")
        result = np.load(path, mmap_mode=self.mmap_mode)
        if result.dtype != self.dtype:
            raise TypeError(
                f"Expected an array of type {self.dtype}, got {result.dtype} instead"
            )
        out[0][0] = result

    def __str__(self):
        # Fixed label typo: this field is ``mmap_mode``, previously
        # printed as "mmep".
        return "Load{{dtype: {}, shape: {}, mmap_mode: {}}}".format(
            self.dtype,
            self.shape,
            self.mmap_mode,
        )
def load(path, dtype, shape, mmap_mode=None):
    """
    Load an array from an .npy file.

    Parameters
    ----------
    path
        A Generic symbolic variable that will hold the file path string.
    dtype : data-type
        The data type of the array to be read.
    shape
        The static shape information of the loaded array.
    mmap_mode
        How the file will be loaded. ``None`` copies the data into an
        in-memory array; ``'c'`` maps the file into virtual memory so
        only the parts actually needed are read from disk. The other
        modes accepted by ``numpy.load`` ('r', 'r+', 'w+') cannot be
        supported by Aesara.

    Examples
    --------
    >>> from aesara import *
    >>> path = Variable(Generic(), None)
    >>> x = tensor.load(path, 'int64', (None,))
    >>> y = x*2
    >>> fn = function([path], y)
    >>> fn("stored-array.npy")  # doctest: +SKIP
    array([0, 2, 4, 6, 8], dtype=int64)

    """
    load_op = LoadFromDisk(dtype, shape, mmap_mode)
    return load_op(path)
##########################
# MPI
##########################
# mpi4py is an optional dependency: if it is missing we only record that
# fact in ``mpi_enabled`` so the rest of this module can still be imported.
try:
    from mpi4py import MPI
except ImportError:
    mpi_enabled = False
else:
    # Default communicator used by all the MPI ops defined below.
    comm = MPI.COMM_WORLD
    mpi_enabled = True
class MPIRecv(Op):
    """
    An operation that starts an asynchronous MPI receive from a remote host.

    Produces a request handle (to be waited on) and the destination buffer.

    See Also
    --------
    MPIRecv
    MPIWait

    Notes
    -----
    Non-differentiable.

    """

    __props__ = ("source", "tag", "shape", "dtype")

    def __init__(self, source, tag, shape, dtype):
        self.source = source
        self.tag = tag
        self.shape = shape
        self.dtype = np.dtype(dtype)  # normalize e.g. "float64" to np.dtype
        # Runtime shape is fixed, but the graph-level type leaves every
        # dimension unspecified.
        self.static_shape = (None,) * len(shape)

    def make_node(self):
        # Two outputs: the MPI request object and the tensor being filled.
        outputs = [
            Variable(Generic(), None),
            tensor(self.dtype, shape=self.static_shape),
        ]
        return Apply(self, [], outputs)

    def perform(self, node, inp, out):
        # Allocate the receive buffer and kick off the non-blocking receive;
        # MPIRecvWait later blocks until the buffer is actually filled.
        buf = np.zeros(self.shape, dtype=self.dtype)
        out[0][0] = comm.Irecv(buf, self.source, self.tag)
        out[1][0] = buf

    def __str__(self):
        return f"MPIRecv{{source: {int(self.source)}, tag: {int(self.tag)}, shape: {self.shape}, dtype: {self.dtype}}}"

    def infer_shape(self, fgraph, node, shapes):
        # The request output has no shape; the data output's shape is fixed.
        return [None, self.shape]

    def do_constant_folding(self, fgraph, node):
        # Receiving has side effects, so it must never be folded away.
        return False
class MPIRecvWait(Op):
    """
    An operation that blocks until a previously started MPI receive completes.

    See Also
    --------
    MPIRecv

    Notes
    -----
    Non-differentiable.

    """

    __props__ = ("tag",)

    # The single output is a view of the ``data`` input (index 1), not a copy.
    view_map = {0: [1]}

    def __init__(self, tag):
        self.tag = tag

    def make_node(self, request, data):
        inputs = [request, data]
        outputs = [tensor(data.dtype, shape=data.type.shape)]
        return Apply(self, inputs, outputs)

    def perform(self, node, inp, out):
        request, data = inp
        # Block until the asynchronous receive has filled ``data``.
        request.wait()
        out[0][0] = data

    def infer_shape(self, fgraph, node, shapes):
        # Output shape equals the shape of the data input.
        return [shapes[1]]
class MPISend(Op):
    """
    An operation that starts an asynchronous MPI send to a remote host.

    Produces a request handle (to be waited on) and passes the data through.

    See Also
    --------
    MPIRecv
    MPISendWait

    Notes
    -----
    Non-differentiable.

    """

    __props__ = ("dest", "tag")

    # Output 1 is a view of input 0: the data flows straight through.
    view_map = {1: [0]}

    def __init__(self, dest, tag):
        self.dest = dest
        self.tag = tag

    def make_node(self, data):
        outputs = [Variable(Generic(), None), data.type()]
        return Apply(self, [data], outputs)

    def perform(self, node, inp, out):
        (buf,) = inp
        # Start the non-blocking send; MPISendWait later blocks on it.
        out[0][0] = comm.Isend(buf, self.dest, self.tag)
        out[1][0] = buf

    def __str__(self):
        return f"MPISend{{dest: {int(self.dest)}, tag: {int(self.tag)}}}"
class MPISendWait(Op):
    """
    An operation that blocks until a previously started MPI send completes.

    See Also
    --------
    MPISend

    Notes
    -----
    Non-differentiable.

    """

    __props__ = ("tag",)

    def __init__(self, tag):
        self.tag = tag

    def make_node(self, request, data):
        return Apply(self, [request, data], [Variable(Generic(), None)])

    def perform(self, node, inp, out):
        # Block until the asynchronous send has been delivered, then
        # signal completion with a sentinel True.
        inp[0].wait()
        out[0][0] = True
def isend(var, dest, tag):
    """
    Non blocking send.

    Returns the (request, data) outputs of an ``MPISend`` applied to ``var``.
    """
    send_op = MPISend(dest, tag)
    return send_op(var)
def send(var, dest, tag):
    """
    Blocking send.

    Starts an asynchronous send and immediately waits on it.
    """
    request, data = isend(var, dest, tag)
    return MPISendWait(tag)(request, data)
def irecv(shape, dtype, source, tag):
    """
    Non-blocking receive.

    Returns the (request, data) outputs of an ``MPIRecv``.
    """
    recv_op = MPIRecv(source, tag, shape, dtype)
    return recv_op()
def recv(shape, dtype, source, tag):
    """
    Blocking receive.

    Starts an asynchronous receive and immediately waits on it.
    """
    request, data = irecv(shape, dtype, source, tag)
    return MPIRecvWait(tag)(request, data)
# Ordering keys for scheduling
def mpi_send_wait_key(a):
    """Wait as long as possible on Waits, Start Send/Recvs early."""
    op = a.op
    if isinstance(op, (MPIRecvWait, MPISendWait)):
        return 1
    # Schedule the asynchronous starts before everything else.
    return -1 if isinstance(op, (MPIRecv, MPISend)) else 0
def mpi_tag_key(a):
    """Break MPI ties by using the variable tag - prefer lower tags first."""
    op = a.op
    if not isinstance(op, (MPISend, MPIRecv, MPIRecvWait, MPISendWait)):
        return 0
    return op.tag
# Comparator versions of the ordering keys, for sort-based schedulers.
mpi_send_wait_cmp = key_to_cmp(mpi_send_wait_key)
mpi_tag_cmp = key_to_cmp(mpi_tag_key)
# Bundled key/comparator tuples, applied in order (send/wait first, tag tiebreak).
mpi_keys = (mpi_send_wait_key, mpi_tag_key)
mpi_cmps = (mpi_send_wait_cmp, mpi_tag_cmp)
# Public API of this module.
__all__ = ["load"]