/
input.py
366 lines (285 loc) · 10.3 KB
/
input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
"""
Provides everything need to handle ALS main inputs : images.
We need to read file and in the future, get images from INDI
"""
import logging
from abc import abstractmethod
from pathlib import Path
import cv2
from PyQt5.QtCore import pyqtSignal, QObject, QT_TRANSLATE_NOOP
from astropy.io import fits
import exifread
from rawpy import imread
from rawpy._rawpy import LibRawNonFatalError, LibRawFatalError
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from als import config
from als.code_utilities import log
from als.messaging import MESSAGE_HUB
from als.model.base import Image
# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)

# Files whose name starts with one of these prefixes are skipped by read_disk_image()
_IGNORED_FILENAME_START_PATTERNS = ['.', '~', 'tmp']

# NOTE(review): not referenced in the visible part of this module ; presumably a delay,
# in seconds, between two file size checks performed elsewhere - TODO confirm
_DEFAULT_SCAN_FILE_SIZE_RETRY_PERIOD_IN_SEC = 0.5

# Key looked up by extract_exifs() in the tags returned by exifread
EXPOSURE_TIME_EXIF_TAG = 'EXIF ExposureTime'

# Scanner type identifier accepted by InputScanner.create_scanner()
SCANNER_TYPE_FILESYSTEM = "FS"
class InputError(Exception):
    """Common ancestor of every exception type defined in this module."""
class ScannerStartError(InputError):
    """Signals that the folder scanner could not be started."""
class InputScanner:
    """
    Abstract base for every component in charge of ALS "image acquisition".

    Concrete scanners are responsible for :

      - replying to start & stop commands
      - reading images from the actual source
      - creating Image objects
      - broadcasting every new image

    NOTE(review): this class does not use an ABC metaclass, so the @abstractmethod
    markers below are declarative only and are not enforced at instantiation time.
    """

    new_image_path_signal = pyqtSignal(str)
    """Qt signal emitted when a new image is detected by scanner"""

    @log
    def broadcast_image_path(self, path: str):
        """
        Emit new_image_path_signal with the path of a newly detected image.

        Nothing is emitted when path is None.

        :param path: the new image path
        :type path: str
        """
        if path is None:
            return

        self.new_image_path_signal.emit(path)

    @abstractmethod
    def start(self):
        """
        Begin watching for new images.

        :raises: ScannerStartError if startup fails
        """

    @abstractmethod
    def stop(self):
        """
        Cease watching for new images.
        """

    @staticmethod
    @log
    def create_scanner(scanner_type: str = SCANNER_TYPE_FILESYSTEM):
        """
        Build the scanner implementation matching the requested type.

        :param scanner_type: the type of scanner to create. Accepted values are :

          - "FS" for a filesystem scanner

        :type scanner_type: str.

        :return: the right scanner implementation
        :rtype: InputScanner subclass

        :raises: ValueError if scanner_type is not supported
        """
        if scanner_type != SCANNER_TYPE_FILESYSTEM:
            raise ValueError(f"Unsupported scanner type : {scanner_type}")

        return FolderScanner()
class FolderScanner(FileSystemEventHandler, InputScanner, QObject):
    """
    Watches file changes (creation, move) in a specific filesystem folder.

    The watched directory is retrieved from user config on scanner startup and is
    watched recursively by a polling observer. Only file events are broadcast :
    events concerning directories are ignored.
    """

    @log
    def __init__(self):
        FileSystemEventHandler.__init__(self)
        InputScanner.__init__(self)
        QObject.__init__(self)

        # running observer, or None while the scanner is stopped
        self._observer = None

    @log
    def start(self):
        """
        Starts scanning scan folder for new files

        :raises: ScannerStartError if an OSError occurs while setting up the observer
        """
        try:
            scan_folder_path = config.get_scan_folder_path()
            observer = PollingObserver()
            observer.schedule(self, scan_folder_path, recursive=True)
            observer.start()
        except OSError as os_error:
            # keep the original error as explicit cause so the traceback stays complete
            raise ScannerStartError(os_error) from os_error

        # only remember the observer once it is actually running, so a failed
        # startup never leaves a half-initialised observer behind
        self._observer = observer

    @log
    def stop(self):
        """
        Stops scanning scan folder for new files.

        Does nothing if the scanner is not running.
        """
        if self._observer is not None:
            self._observer.stop()
            self._observer = None

    @log
    def on_moved(self, event):
        """
        Broadcast the destination path of a moved file.

        :param event: the filesystem event received from the observer
        """
        # the watch is recursive : moved folders also end up here, and a folder is not an image
        if event.is_directory:
            return

        if event.event_type == 'moved':
            image_path = event.dest_path
            _LOGGER.debug(f"File move detected : {image_path}")
            self.broadcast_image_path(image_path)

    @log
    def on_created(self, event):
        """
        Broadcast the path of a newly created file.

        :param event: the filesystem event received from the observer
        """
        # the watch is recursive : created folders also end up here, and a folder is not an image
        if event.is_directory:
            return

        if event.event_type == 'created':
            image_path = event.src_path
            _LOGGER.debug(f"File creation detected : {image_path}")
            self.broadcast_image_path(image_path)
@log
def read_disk_image(path: Path):
    """
    Load an image from a file, picking the reader from the file extension.

    Files whose name starts with one of the ignored prefixes are skipped. FITS and
    standard image extensions have dedicated readers ; any other extension is handed
    to the RAW reader.

    :param path: path to the file to load image from
    :type path: pathlib.Path

    :return: the image read from disk or None if image is ignored or an error occurred
    :rtype: Image or None
    """
    if any(path.name.startswith(prefix) for prefix in _IGNORED_FILENAME_START_PATTERNS):
        return None

    extension = path.suffix.lower()

    if extension in ['.fit', '.fits', '.fts']:
        image = _read_fit_image(path)
    elif extension in ['.jpg', '.jpeg', '.png', '.tif', '.tiff']:
        image = _read_standard_image(path)
    else:
        image = _read_raw_image(path)

    if image is None:
        return None

    # remember where the image comes from and report the successful read
    image.origin = f"FILE : {str(path.resolve())}"
    MESSAGE_HUB.dispatch_info(
        __name__,
        QT_TRANSLATE_NOOP("", "Successful image read from {}"),
        [image.origin, ]
    )

    return image
@log
def _read_fit_image(path: Path):
    """
    Load a FIT image from the filesystem.

    Data and header are taken from the first HDU of the file. When present in the
    header, 'BAYERPAT' and 'EXPTIME' feed the image bayer pattern and exposure time.

    :param path: path to image file to load from
    :type path: pathlib.Path

    :return: the loaded image, with data and headers parsed or None if a known error occurred
    :rtype: Image or None
    """
    try:
        with fits.open(str(path.resolve())) as hdu_list:
            # pylint: disable=E1101
            primary_hdu = hdu_list[0]
            data = primary_hdu.data
            header = primary_hdu.header

        image = Image(data)

        if 'BAYERPAT' in header:
            image.bayer_pattern = header['BAYERPAT']

        if 'EXPTIME' in header:
            image.exposure_time = header['EXPTIME']
            _LOGGER.debug(f"*SD-EXP_T* extracted exposure time: {image.exposure_time}")

    except (OSError, TypeError) as error:
        # known failure modes : report them and let the caller deal with None
        _report_fs_error(path, error)
        return None

    return image
@log
def _read_standard_image(path: Path):
    """
    read standard image from filesystem using OpenCV

    :param path: path to image file to load from
    :type path: pathlib.Path

    :return: the loaded image or None if the file could not be read or decoded
    :rtype: Image or None
    """
    data = cv2.imread(str(path.resolve()), cv2.IMREAD_UNCHANGED)

    # cv2.imread does not raise on failure (missing, unreadable or corrupt file) :
    # it returns None. Report it like any other read error instead of crashing
    # on the attribute access below.
    if data is None:
        _report_fs_error(path, OSError("OpenCV could not read or decode image data"))
        return None

    # convert color layers order for color images
    if data.ndim > 2:
        data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)

    return Image(data)
@log
def _read_raw_image(path: Path):
    """
    Load a RAW DSLR image from a file.

    The bayer pattern reported by RawPy is converted to the form it would take
    in a FITS header, then exposure time is looked up in the file's exifs.

    :param path: path to the file to read from
    :type path: pathlib.Path

    :return: the image or None if a known error occurred
    :rtype: Image or None
    """
    try:
        with imread(str(path.resolve())) as raw_image:

            # We store the bayer pattern as it would be advertised if the image was a FITS image.
            #
            # Let's assume the image comes from a DSLR sensor with the most common bayer pattern.
            # The actual/physical bayer pattern would look like a repetition of :
            #
            # +---+---+
            # | R | G |
            # +---+---+
            # | G | B |
            # +---+---+
            #
            # RawPy reports the bayer pattern description as 2 discrete values :
            #
            # 1) raw_image.raw_pattern : a 2x2 numpy array holding the indices used to express
            #    the bayer pattern. In our example, its value is :
            #
            #    +---+---+
            #    | 0 | 1 |
            #    +---+---+
            #    | 3 | 2 |
            #    +---+---+
            #
            #    and its flattened version is : [0, 1, 3, 2]
            #
            # 2) raw_image.color_desc : a bytes literal made of the color of each pixel of the
            #    bayer pattern, in ascending index order from raw_image.raw_pattern.
            #    In our example, its value is : b'RGBG'
            #
            # We want the pattern expressed the way a FITS header would describe it, i.e. as if
            # raw_image.raw_pattern was :
            #
            # +---+---+
            # | 0 | 1 |
            # +---+---+
            # | 2 | 3 |
            # +---+---+
            #
            # so each index found in raw_pattern is replaced by the color it designates.
            bayer_pattern_indices = raw_image.raw_pattern.flatten()
            bayer_pattern_desc = raw_image.color_desc.decode()

            _LOGGER.debug(f"Bayer pattern indices = {bayer_pattern_indices}")
            _LOGGER.debug(f"Bayer pattern description = {bayer_pattern_desc}")

            index_count = len(bayer_pattern_indices)
            assert index_count == len(bayer_pattern_desc)

            colors = []
            for color_index in bayer_pattern_indices:
                assert color_index < index_count
                colors.append(bayer_pattern_desc[color_index])
            bayer_pattern = "".join(colors)

            _LOGGER.debug(f"Computed, FITS-compatible bayer pattern = {bayer_pattern}")

            # copy the visible pixels so the data is independent from the RawPy object
            loaded_image = Image(raw_image.raw_image_visible.copy())
            loaded_image.bayer_pattern = bayer_pattern
            extract_exifs(loaded_image, path)

            return loaded_image

    except (LibRawNonFatalError, LibRawFatalError) as libraw_error:
        # both LibRaw error families are handled the same way : report and return None
        _report_fs_error(path, libraw_error)
        return None
@log
def extract_exifs(image, image_path):
    """
    Look for useful exifs in the image file and store them in the image.

    Only exposure time is looked for at the moment. The image is modified in place
    and is left untouched when the exposure time tag is missing.

    :param image: the image to feed
    :type image: Image

    :param image_path: path to original file
    :type image_path: Path

    :return: None
    """
    with open(image_path, 'rb') as raw:
        tags = exifread.process_file(raw)

    # nothing to do without the exposure time tag
    if not tags or EXPOSURE_TIME_EXIF_TAG not in tags.keys():
        return

    exposure_time = float(tags[EXPOSURE_TIME_EXIF_TAG].values[0])
    _LOGGER.debug(f"*SD-EXP_T* extracted exposure time: {exposure_time}")
    image.exposure_time = exposure_time
@log
def _report_fs_error(path: Path, error: Exception):
    """
    Dispatch an error message about a failed file read through MESSAGE_HUB.

    :param path: path to the file that could not be read
    :type path: pathlib.Path

    :param error: the error that occurred while reading the file
    :type error: Exception
    """
    message_values = [str(path.resolve()), str(error)]

    MESSAGE_HUB.dispatch_error(
        __name__,
        QT_TRANSLATE_NOOP("", "Error reading from file {} : {}"),
        message_values)