forked from royerlab/aydin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
277 lines (213 loc) · 7.28 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import os
from collections import Counter
from copy import deepcopy
from os.path import exists
from pathlib import Path
from typing import List, Tuple
import numpy
import zarr
import dask.array as da
from aydin.util.log.log import lprint
def is_zarr_storage(input_path):
    """Method to check if given file is a zarr storage or not.

    Parameters
    ----------
    input_path : str

    Returns
    -------
    bool
        Result of whether the file in the given path is a zarr storage or not.
    """
    try:
        # BUGFIX: open read-only. zarr.open's default mode is 'a', which can
        # create an empty store on disk for a non-existent path while we are
        # merely *checking* it.
        z = zarr.open(input_path, mode='r')
        # zarr.Array exposes .shape; zarr.Group does not and raises
        # AttributeError here, so (as in the original code) groups and other
        # non-array content are reported as "not a zarr storage".
        len(z.shape)
        lprint(f"This path is a ZARR storage: {input_path}")
        # If we reach this point, the path opened as a zarr array.
        return True
    except Exception:
        return False
def read_zarr_array(input_path):
    """Method that reads a zarr file. If the file is a zarr.Array, this will
    read as an zarr.Array. If the file is a zarr.Group, this method can only
    read it if it is in dexp-zarr group format.

    Parameters
    ----------
    input_path : str

    Returns
    -------
    numpy.typing.ArrayLike
    """
    # BUGFIX: open read-only; mode 'a' (append/create) would create or allow
    # modification of the store when this function only ever reads it.
    g = zarr.open(input_path, mode='r')
    if isinstance(g, zarr.Array):
        # Lazily wrap the on-disk array; no data is loaded here.
        return da.from_zarr(input_path)
    else:
        # dexp-zarr layout: each top-level group key holds an array under the
        # same key; arrays whose name contains "mip" (presumably maximum
        # intensity projections — TODO confirm) are skipped.
        arrays = [
            g[key][key] for key in g.group_keys() if "mip" not in g[key][key].name
        ]
        # Stack the per-group arrays along a new leading axis.
        array = da.stack(arrays, axis=0)
        return array
def get_files_with_most_frequent_extension(path) -> List[str]:
    """Method that looks into the given path and return the list of files with
    the most frequent file extension.

    Parameters
    ----------
    path : str
        Folder to scan (non-recursively).

    Returns
    -------
    List[str]
        File names (not full paths) whose extension is the most frequent one.

    Raises
    ------
    IndexError
        If the folder contains no files at all.
    """
    files_in_folder = os.listdir(path)
    extensions = [Path(file).suffix[1:] for file in files_in_folder]
    # most_common(1) picks a maximal-count extension directly instead of
    # sorting the whole counter. NOTE(review): on ties this picks the
    # first-encountered extension; the tie-break was arbitrary before too.
    most_frequent_extension = Counter(extensions).most_common(1)[0][0]
    # BUGFIX: compare the parsed suffix instead of str.endswith(f".{ext}"),
    # so extension-less files are matched correctly when '' wins the count.
    files = [
        file
        for file in files_in_folder
        if Path(file).suffix[1:] == most_frequent_extension
    ]
    return files
def get_output_image_path(
    path: str, operation_type: str = "denoised", output_folder: str = None
) -> Tuple[str, int]:
    """Method to get correct output path for given input path and operation type.

    Parameters
    ----------
    path : str
    operation_type : str
        Currently supported values: 'denoised', 'hyperstacked'.
    output_folder : str
        Optional folder to redirect the output into (keeps the input filename).

    Returns
    -------
    Tuple
        (Correct output path, counter).
        counter is None when no collision with an existing file occurred.
    """
    if operation_type not in ["denoised", "hyperstacked"]:
        # BUGFIX: fixed typo in the error message ("invalud" -> "invalid").
        raise ValueError(
            f"invalid value for operation_type parameter: {operation_type}"
        )

    if output_folder:
        path = os.path.join(output_folder, Path(path).name)

    # Longer formats must come before their substrings (".zarr.zip" before
    # ".zarr", ".tiff" before ".tif") because matching is done with `in`.
    image_formats = [
        ".zarr.zip",
        ".zarr",
        ".tiff",
        ".png",
        ".tif",
        ".TIF",
        ".czi",
        ".npy",
        ".nd2",
    ]

    for image_format in image_formats:
        if image_format in path:
            output_path = (
                f"{path.split(image_format)[0]}_{operation_type}{image_format}"
            )
            output_image_format = image_format
            break
    else:  # means no break in this context: unrecognized extension
        lprint("Image file format is not supported, will be writing result as tif")
        output_path = f"{path[:path.rfind('.')]}_{operation_type}.tif"
        output_image_format = ".tif"

    counter = 1
    response_counter = None
    while exists(output_path):
        response_counter = counter
        # BUGFIX: previously split on the hardcoded '_denoised', which never
        # matched for 'hyperstacked' outputs and kept appending suffixes.
        base = output_path.split(f"_{operation_type}")[0]
        output_path = f"{base}_{operation_type}{counter}{output_image_format}"
        counter += 1

    return output_path, response_counter
def get_options_json_path(
    path: str, passed_counter: int = None, output_folder: str = None
) -> str:
    """Return the path of the '_options.json' file that accompanies *path*.

    If *passed_counter* is given, that exact numeric suffix is used; otherwise
    the suffix is incremented until a non-existing path is found.
    """
    if output_folder:
        path = os.path.join(output_folder, Path(path).name)

    options_path = f"{path[:path.rfind('.')]}_options.json"

    if passed_counter is not None:
        # Caller pinned the counter: build the name directly, no probing.
        prefix = options_path.split('_options')[0]
        return f"{prefix}_options{passed_counter}.json"

    suffix_index = 1
    while exists(options_path):
        prefix = options_path.split('_options')[0]
        options_path = f"{prefix}_options{suffix_index}.json"
        suffix_index += 1
    return options_path
def get_save_model_path(
    path: str, passed_counter: int = None, output_folder: str = None
) -> str:
    """Return the '_model' folder path that accompanies *path*.

    If *passed_counter* is given, that exact numeric suffix is used; otherwise
    the suffix is incremented until a non-existing path is found.
    """
    if output_folder:
        path = os.path.join(output_folder, Path(path).name)

    model_path = f"{path[:path.rfind('.')]}_model"

    if passed_counter is not None:
        # Caller pinned the counter: build the name directly, no probing.
        prefix = model_path.split('_model')[0]
        return f"{prefix}_model{passed_counter}"

    suffix_index = 1
    while exists(model_path):
        prefix = model_path.split('_model')[0]
        model_path = f"{prefix}_model{suffix_index}"
        suffix_index += 1
    return model_path
def split_image_channels(image_array, metadata):
    """Method that takes a multichannel image and its metadata and splits
    into single channel images.

    Parameters
    ----------
    image_array : numpy.typing.ArrayLike
    metadata : FileMetadata
        Mutated in place: the channel axis is removed from axes/shape/
        batch_axes/channel_axes and `splitted` is set to True.

    Returns
    -------
    tuple(List[numpy.typing.ArrayLike], List[FileMetadata])
        Tuple of splitted_arrays and metadatas.
        Returns None when there is no channel axis (unchanged behavior).
    """
    channel_axis = metadata.axes.find("C")
    if channel_axis == -1:
        lprint("Array has no channel axis detected")
        return

    # Handle image splitting: one sub-array per channel, channel axis squeezed out.
    splitted_arrays = numpy.split(
        image_array, metadata.shape[channel_axis], axis=channel_axis
    )
    splitted_arrays = [numpy.squeeze(array) for array in splitted_arrays]

    # Handle metadata changes: drop the channel axis from every per-axis field.
    metadata.batch_axes = tuple(
        x for ind, x in enumerate(metadata.batch_axes) if ind != channel_axis
    )
    metadata.channel_axes = tuple(
        x for ind, x in enumerate(metadata.channel_axes) if ind != channel_axis
    )
    metadata.axes = metadata.axes.replace("C", "")
    metadata.shape = tuple(
        x for idx, x in enumerate(metadata.shape) if idx != channel_axis
    )
    metadata.splitted = True

    # BUGFIX: `[metadata] * n` placed the SAME mutable object in every slot,
    # so a later change to one channel's metadata silently changed them all.
    # Give each split its own independent copy.
    metadatas = [deepcopy(metadata) for _ in splitted_arrays]
    return splitted_arrays, metadatas
def hyperstack_arrays(image_arrays, metadatas):
    """Method that takes a list of arrays of same shape and their corresponding
    metadatas, then hyperstacks those into a single image.

    Parameters
    ----------
    image_arrays : List[numpy.typing.ArrayLike]
    metadatas : List[FileMetadata]

    Returns
    -------
    tuple(numpy.typing.ArrayLike, FileMetadata)
        Tuple of hyperstacked image array and its metadata.
        With fewer than two inputs, the inputs are returned unchanged.

    Raises
    ------
    ValueError
        If the metadata shapes do not all match.
    """
    if len(image_arrays) < 2:
        lprint("Need at least two images to hyperstack.")
        return image_arrays, metadatas

    # All images must share the shape of the first one.
    reference_shape = metadatas[0].shape
    if any(metadata.shape != reference_shape for metadata in metadatas[1:]):
        # BUGFIX: raise ValueError instead of bare Exception (still caught by
        # any existing `except Exception` handlers).
        raise ValueError(
            "Images are not same shape, hence cannot hyperstack images."
        )

    # Copy so the caller's metadata object is not mutated; prepend the new
    # batch ("B") axis to every per-axis field.
    metadata = deepcopy(metadatas[-1])
    metadata.axes = "B" + metadata.axes
    metadata.batch_axes = (True,) + metadata.batch_axes
    metadata.channel_axes = (False,) + metadata.channel_axes
    metadata.shape = (len(image_arrays),) + metadata.shape

    image_array = numpy.stack(image_arrays)
    return image_array, metadata