/
base.py
1612 lines (1305 loc) · 56.9 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Licensed under a 3-clause BSD style license - see PYFITS.rst
import datetime
import os
import sys
import warnings
from contextlib import suppress
from inspect import signature, Parameter
import numpy as np
from .. import conf
from ..file import _File
from ..header import Header, _pad_length
from ..util import (_is_int, _is_pseudo_unsigned, _unsigned_zero,
itersubclasses, decode_ascii, _get_array_mmap, first,
_free_space_check, _extract_number)
from ..verify import _Verify, _ErrList
from ....utils import lazyproperty
from ....utils.exceptions import AstropyUserWarning
from ....utils.decorators import deprecated_renamed_argument
class _Delayed:
    """
    Type of the ``DELAYED`` sentinel.

    ``DELAYED`` is passed as the ``data`` argument when an HDU object is
    constructed before its data has actually been read.
    """


DELAYED = _Delayed()
BITPIX2DTYPE = {
    8: 'uint8',
    16: 'int16',
    32: 'int32',
    64: 'int64',
    -32: 'float32',
    -64: 'float64',
}
"""Maps FITS BITPIX values to Numpy dtype names."""

DTYPE2BITPIX = {
    'uint8': 8,
    'int16': 16,
    'uint16': 16,
    'int32': 32,
    'uint32': 32,
    'int64': 64,
    'uint64': 64,
    'float32': -32,
    'float64': -64,
}
"""
Maps Numpy dtype names to FITS BITPIX values (this includes unsigned
integers, with the assumption that the pseudo-unsigned integer convention
will be used in this case).
"""
class InvalidHDUException(Exception):
    """
    Raised to signal that an HDU cannot possibly be valid.

    It is mainly meant as a signal to ``_BaseHDU.__new__`` that the HDU must
    be assumed to be corrupted.
    """
def _hdu_class_from_header(cls, header):
    """
    Select the HDU class that should represent ``header``.

    Used primarily by ``_BaseHDU.__new__``.  The subclasses of ``cls`` are
    visited in the reverse of the order produced by ``itersubclasses`` and
    the first one whose ``match_header()`` returns a true value wins.

    Parameters
    ----------
    cls : type
        The class whose subclasses are searched; it is also the fallback.
    header : Header or None
        The header to match.  When it is falsy, ``cls`` is returned as is.

    Returns
    -------
    type
        The matching subclass, ``cls`` when nothing matched, or
        ``_CorruptedHDU`` when matching raised an unexpected exception.
    """
    if not header:
        # Nothing to match against
        return cls

    for candidate in reversed(list(itersubclasses(cls))):
        try:
            # HDU classes built into astropy.io.fits are always considered,
            # but extension HDUs must be explicitly registered
            builtin = candidate.__module__.startswith('astropy.io.fits.')
            if not builtin and candidate not in cls._hdu_registry:
                continue

            if candidate.match_header(header):
                return candidate
        except NotImplementedError:
            # Abstract HDU types opt out of matching this way
            continue
        except Exception as exc:
            # Anything else (InvalidHDUException included, since it derives
            # from Exception) ends the search
            warnings.warn(
                'An exception occurred matching an HDU header to the '
                'appropriate HDU type: {0}'.format(exc),
                AstropyUserWarning)
            warnings.warn('The HDU will be treated as corrupted.',
                          AstropyUserWarning)
            return _CorruptedHDU

    # By default, if no subclass matched (or none are defined)
    return cls
class _BaseHDUMeta(type):
    """
    Metaclass that installs a common deleter on every HDU ``data`` property.

    It's unfortunate, but there's otherwise no straightforward way for a
    property to inherit the setters/deleters of the property of the same
    name on base classes.
    """

    def __init__(cls, name, bases, members):
        if 'data' not in members:
            return

        data_prop = members['data']
        if not isinstance(data_prop, (lazyproperty, property)):
            return
        if data_prop.fdel is not None:
            # The class has already explicitly set the deleter for its data
            # property, so leave it alone
            return

        def _delete_data(self):
            # The deleter
            if self._file is None or not self._data_loaded:
                return

            data_refcount = sys.getrefcount(self.data)
            # Manually delete *now* so that FITS_rec.__del__
            # cleanup can happen if applicable
            del self.__dict__['data']
            # Don't even do this unless the *only* reference to the
            # .data array was the one we're deleting by deleting
            # this attribute; if any other references to the array
            # are hanging around (perhaps the user ran ``data =
            # hdu.data``) don't even consider this:
            if data_refcount == 2:
                self._file._maybe_close_mmap()

        setattr(cls, 'data', data_prop.deleter(_delete_data))
# TODO: Come up with a better __repr__ for HDUs (and for HDULists, for that
# matter)
class _BaseHDU(metaclass=_BaseHDUMeta):
"""Base class for all HDU (header data unit) classes."""
# HDU classes added through ``register_hdu``; ``_hdu_class_from_header``
# only considers classes defined outside ``astropy.io.fits`` when they are
# members of this set.
_hdu_registry = set()
# This HDU type is part of the FITS standard
_standard = True
# Byte to use for padding out blocks
_padding_byte = '\x00'
# Value returned by the ``name`` property when the header has no EXTNAME
_default_name = ''
def __new__(cls, data=None, header=None, *args, **kwargs):
    """
    Create an instance of the HDU subclass that matches ``header``.

    The subclasses of _BaseHDU are searched (see `_hdu_class_from_header`)
    and each one's match_header() method is used to determine which
    subclass to instantiate.

    It's important to be aware that the class hierarchy is traversed in a
    depth-last order.  Each match_header() should identify an HDU type as
    uniquely as possible.  Abstract types may choose to simply return False
    or raise NotImplementedError to be skipped.

    If any unexpected exceptions are raised while evaluating
    match_header(), the type is taken to be _CorruptedHDU.
    """
    hdu_class = _hdu_class_from_header(cls, header)
    return super().__new__(hdu_class)
def __init__(self, data=None, header=None, *args, **kwargs):
    """
    Set up the bookkeeping attributes shared by all HDU types.

    Parameters
    ----------
    data : optional
        Not used by this base implementation.
    header : Header, optional
        Header for the HDU; a new empty `Header` is created when omitted.
    """
    self._header = Header() if header is None else header

    # Where the HDU came from (all unset for an HDU created in memory)
    self._file = None
    self._buffer = None
    self._header_offset = None
    self._data_offset = None
    self._data_size = None

    # This internal variable is used to track whether the data attribute
    # still points to the same data array as when the HDU was originally
    # created (this does not track whether the data is actually the same
    # content-wise)
    self._data_replaced = False
    self._data_needs_rescale = False
    self._new = True

    # Derive the checksum output mode from the cards already present
    if 'CHECKSUM' in self._header:
        self._output_checksum = True
    elif 'DATASUM' in self._header:
        self._output_checksum = 'datasum'
    else:
        self._output_checksum = False
@property
def header(self):
    """The header object attached to this HDU."""
    return self._header


@header.setter
def header(self, value):
    # The new header is stored as given
    self._header = value
@property
def name(self):
    """The ``EXTNAME`` value of the header, always as a `str`."""
    # Convert the value to a string to be flexible in some pathological
    # cases (see ticket #96)
    extname = self._header.get('EXTNAME', self._default_name)
    return str(extname)


@name.setter
def name(self, value):
    if not isinstance(value, str):
        raise TypeError("'name' attribute must be a string")

    if not conf.extension_name_case_sensitive:
        value = value.upper()

    header = self._header
    # Only a newly created card gets the descriptive comment
    header['EXTNAME'] = (value if 'EXTNAME' in header
                         else (value, 'extension name'))
@property
def ver(self):
    """The ``EXTVER`` value of the header; 1 when the card is absent."""
    return self._header.get('EXTVER', 1)


@ver.setter
def ver(self, value):
    if not _is_int(value):
        raise TypeError("'ver' attribute must be an integer")

    header = self._header
    # Only a newly created card gets the descriptive comment
    header['EXTVER'] = (value if 'EXTVER' in header
                        else (value, 'extension value'))
@property
def level(self):
    """The ``EXTLEVEL`` value of the header; 1 when the card is absent."""
    return self._header.get('EXTLEVEL', 1)


@level.setter
def level(self, value):
    if not _is_int(value):
        raise TypeError("'level' attribute must be an integer")

    header = self._header
    # Only a newly created card gets the descriptive comment
    header['EXTLEVEL'] = (value if 'EXTLEVEL' in header
                          else (value, 'extension level'))
@property
def is_image(self):
    """
    Whether this HDU holds image data.

    True when the HDU is named ``PRIMARY``, when ``XTENSION`` is ``IMAGE``,
    or when ``XTENSION`` is ``BINTABLE`` and the header has ``ZIMAGE`` set
    to `True`.
    """
    if self.name == 'PRIMARY':
        return True

    if 'XTENSION' not in self._header:
        return False

    if self._header['XTENSION'] == 'IMAGE':
        return True

    return (self._header['XTENSION'] == 'BINTABLE' and
            'ZIMAGE' in self._header and self._header['ZIMAGE'] is True)
@property
def _data_loaded(self):
    """Whether ``data`` is stored on the instance and is not ``DELAYED``."""
    if 'data' not in self.__dict__:
        return False
    return self.data is not DELAYED
@property
def _has_data(self):
    """Whether the data has been loaded and is not `None`."""
    loaded = self._data_loaded
    return loaded and self.data is not None
@classmethod
def register_hdu(cls, hducls):
    """Add ``hducls`` to the set of registered HDU classes."""
    registry = cls._hdu_registry
    registry.add(hducls)
@classmethod
def unregister_hdu(cls, hducls):
    """
    Remove ``hducls`` from the set of registered HDU classes.

    Nothing happens when the class was never registered.
    """
    cls._hdu_registry.discard(hducls)
@classmethod
def match_header(cls, header):
    """
    Report whether ``header`` describes this HDU type.

    The base implementation always raises `NotImplementedError`, which
    `_hdu_class_from_header` treats as "skip this class".
    """
    raise NotImplementedError
@classmethod
def fromstring(cls, data, checksum=False, ignore_missing_end=False,
               **kwargs):
    """
    Create a new HDU object of the appropriate type from a string
    containing the HDU's entire header and, optionally, its data.

    Note: When creating a new HDU from a string without a backing file
    object, the data of that HDU may be read-only.  It depends on whether
    the underlying string was an immutable Python str/bytes object, or some
    kind of read-write memory buffer such as a `memoryview`.

    Parameters
    ----------
    data : str, bytearray, memoryview, ndarray
        A byte string containing the HDU's header and data.

    checksum : bool, optional
        Check the HDU's checksum and/or datasum.

    ignore_missing_end : bool, optional
        Ignore a missing end card in the header data.  Note that without
        the end card the end of the header may be ambiguous, which can
        result in a corrupt HDU.  In this case the assumption is that the
        first 2880 block that does not begin with valid FITS header data is
        the beginning of the data.

    kwargs : optional
        May consist of additional keyword arguments specific to an HDU
        type--these correspond to keywords recognized by the constructors
        of different HDU classes such as `PrimaryHDU`, `ImageHDU`, or
        `BinTableHDU`.  Any unrecognized keyword arguments are simply
        ignored.
    """
    options = dict(kwargs, checksum=checksum,
                   ignore_missing_end=ignore_missing_end)
    return cls._readfrom_internal(data, **options)
@classmethod
def readfrom(cls, fileobj, checksum=False, ignore_missing_end=False,
             **kwargs):
    """
    Read the HDU from a file.  Normally an HDU should be opened with
    :func:`open` which reads the entire HDU list in a FITS file.  But this
    method is still provided for symmetry with :func:`writeto`.

    Parameters
    ----------
    fileobj : file object or file-like object
        Input FITS file.  The file's seek pointer is assumed to be at the
        beginning of the HDU.

    checksum : bool
        If `True`, verifies that both ``DATASUM`` and ``CHECKSUM`` card
        values (when present in the HDU header) match the header and data
        of all HDU's in the file.

    ignore_missing_end : bool
        Do not issue an exception when opening a file that is missing an
        ``END`` card in the last header.
    """
    # TODO: Figure out a way to make it possible for the _File
    # constructor to be a noop if the argument is already a _File
    source = fileobj if isinstance(fileobj, _File) else _File(fileobj)

    hdu = cls._readfrom_internal(source, checksum=checksum,
                                 ignore_missing_end=ignore_missing_end,
                                 **kwargs)

    # If the checksum had to be checked the data may have already been read
    # from the file, in which case we don't want to seek relative
    end_of_hdu = hdu._data_offset + hdu._data_size
    source.seek(end_of_hdu, os.SEEK_SET)
    return hdu
@deprecated_renamed_argument('clobber', 'overwrite', '2.0')
def writeto(self, name, output_verify='exception', overwrite=False,
            checksum=False):
    """
    Write the HDU to a new file.  This is a convenience method that gives
    the user an easier output interface when only one HDU needs to be
    written to a file.

    Parameters
    ----------
    name : file path, file object or file-like object
        Output FITS file.  If the file object is already opened, it must
        be opened in a writeable mode.

    output_verify : str
        Output verification option.  Must be one of ``"fix"``,
        ``"silentfix"``, ``"ignore"``, ``"warn"``, or
        ``"exception"``.  May also be any combination of ``"fix"`` or
        ``"silentfix"`` with ``"+ignore"``, ``"+warn"``, or
        ``"+exception"`` (e.g. ``"fix+warn"``).  See :ref:`verify` for more
        info.

    overwrite : bool, optional
        If ``True``, overwrite the output file if it exists. Raises an
        ``OSError`` if ``False`` and the output file exists. Default is
        ``False``.

        .. versionchanged:: 1.3
           ``overwrite`` replaces the deprecated ``clobber`` argument.

    checksum : bool
        When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards
        to the header of the HDU when written to the file.
    """
    from .hdulist import HDUList

    # Wrap this single HDU in a list and let HDUList do the writing
    HDUList([self]).writeto(name, output_verify, overwrite=overwrite,
                            checksum=checksum)
@classmethod
def _readfrom_internal(cls, data, header=None, checksum=False,
                       ignore_missing_end=False, **kwargs):
    """
    Provides the bulk of the internal implementation for readfrom and
    fromstring.

    For some special cases, supports using a header that was already
    created, and just using the input data for the actual array data.

    Parameters
    ----------
    data : _File or buffer object
        Either an open `_File` or an object supporting the buffer
        interface (bytes, memoryview, ndarray, ...).
    header : Header, optional
        A header that was already created.  When omitted, the header is
        parsed from ``data``.
    checksum : bool or str, optional
        When true and not ``'remove'``, the checksum/datasum of a
        `_ValidHDU` is verified after construction.
    ignore_missing_end : bool, optional
        Passed on (inverted) to the header parser as its end-card
        requirement.
    kwargs : optional
        Extra constructor arguments; those not accepted by the selected HDU
        class's ``__init__`` are dropped.

    Raises
    ------
    TypeError
        If ``data`` is neither a `_File` nor a buffer-interface object.
    """
    hdu_buffer = None
    hdu_fileobj = None
    header_offset = 0

    if isinstance(data, _File):
        if header is None:
            header_offset = data.tell()
            header = Header.fromfile(data, endcard=not ignore_missing_end)
        hdu_fileobj = data
        data_offset = data.tell()  # *after* reading the header
    else:
        try:
            # Test that the given object supports the buffer interface by
            # ensuring an ndarray can be created from it
            np.ndarray((), dtype='ubyte', buffer=data)
        except TypeError:
            raise TypeError(
                'The provided object {!r} does not contain an underlying '
                'memory buffer. fromstring() requires an object that '
                'supports the buffer interface such as bytes, buffer, '
                'memoryview, ndarray, etc. This restriction is to ensure '
                'that efficient access to the array/table data is possible.'
                .format(data))

        if header is None:
            def block_iter(nbytes):
                idx = 0
                while idx < len(data):
                    yield data[idx:idx + nbytes]
                    idx += nbytes

            header_str, header = Header._from_blocks(
                block_iter, True, '', not ignore_missing_end, True)

            # Only keep the buffer if something follows the header
            if len(data) > len(header_str):
                hdu_buffer = data
            data_offset = len(header_str)
        else:
            # A header was supplied, so no header text was parsed from the
            # buffer.  Previously this path failed with an unbound
            # ``header_str``.
            # NOTE(review): assumes the buffer then holds only the data
            # area, starting at offset 0 -- confirm against callers.
            # ``len()`` is used rather than truthiness because the truth
            # value of a multi-element ndarray is ambiguous and raises.
            if len(data) > 0:
                hdu_buffer = data
            data_offset = 0

    # Determine the appropriate arguments to pass to the constructor from
    # kwargs.  kwargs contains any number of optional arguments that may or
    # may not be valid depending on the HDU type
    cls = _hdu_class_from_header(cls, header)
    sig = signature(cls.__init__)
    accepts_any = any(param.kind is Parameter.VAR_KEYWORD
                      for param in sig.parameters.values())
    if accepts_any:
        # __init__ accepts arbitrary keyword arguments, so pass them all
        new_kwargs = kwargs.copy()
    else:
        # Otherwise drop any that are invalid
        new_kwargs = {key: value for key, value in kwargs.items()
                      if key in sig.parameters}

    hdu = cls(data=DELAYED, header=header, **new_kwargs)

    # One of these may be None, depending on whether the data came from a
    # file or a string buffer--later this will be further abstracted
    hdu._file = hdu_fileobj
    hdu._buffer = hdu_buffer

    hdu._header_offset = header_offset  # beginning of the header area
    hdu._data_offset = data_offset  # beginning of the data area

    # data area size, including padding
    size = hdu.size
    hdu._data_size = size + _pad_length(size)

    # Checksums are not checked on invalid HDU types
    if checksum and checksum != 'remove' and isinstance(hdu, _ValidHDU):
        hdu._verify_checksum_datasum()

    return hdu
def _get_raw_data(self, shape, code, offset):
    """
    Return raw array from either the HDU's memory buffer or underlying
    file.

    Parameters
    ----------
    shape : int or tuple
        Shape of the array to return.  Any integral scalar (including
        Numpy integer types) is turned into a one-element tuple.
    code : str or dtype
        Data type of the array.
    offset : int
        Offset at which the array starts in the buffer or file.

    Returns
    -------
    ndarray or None
        `None` when the HDU has neither a non-empty buffer nor a file.
    """
    import numbers

    # Numpy integers are not instances of ``int``, so test against the
    # abstract integer type instead
    if isinstance(shape, numbers.Integral):
        shape = (shape,)

    # Use the length rather than truthiness: the truth value of an ndarray
    # buffer with more than one element is ambiguous and raises
    buf = self._buffer
    if buf is not None and len(buf) > 0:
        return np.ndarray(shape, dtype=code, buffer=buf, offset=offset)
    elif self._file:
        return self._file.readarray(offset=offset, dtype=code, shape=shape)
    else:
        return None
# TODO: Rework checksum handling so that it's not necessary to add a
# checksum argument here
# TODO: The BaseHDU class shouldn't even handle checksums since they're
# only implemented on _ValidHDU...
def _prewriteto(self, checksum=False, inplace=False):
    """
    Bring the header up to date before the HDU is written.

    Adds the scale keywords needed for unsigned integer data and then
    handles the checksum.  ``inplace`` is not used by this base
    implementation.
    """
    self._update_uint_scale_keywords()

    # Handle checksum
    self._update_checksum(checksum)
def _update_uint_scale_keywords(self):
    """
    If the data is unsigned int 16, 32, or 64 add BSCALE/BZERO cards to
    header.

    Nothing is done for HDUs without data or for non-standard HDU types.
    """
    if not (self._has_data and self._standard and
            _is_pseudo_unsigned(self.data.dtype)):
        return

    header = self._header

    # CompImageHDUs need TFIELDS immediately after GCOUNT,
    # so BSCALE has to go after TFIELDS if it exists.
    for anchor in ('TFIELDS', 'GCOUNT'):
        if anchor in header:
            header.set('BSCALE', 1, after=anchor)
            break
    else:
        header.set('BSCALE', 1)

    header.set('BZERO', _unsigned_zero(self.data.dtype), after='BSCALE')
def _update_checksum(self, checksum, checksum_keyword='CHECKSUM',
                     datasum_keyword='DATASUM'):
    """
    Update the 'CHECKSUM' and 'DATASUM' keywords in the header (or
    keywords with equivalent semantics given by the ``checksum_keyword``
    and ``datasum_keyword`` arguments--see for example ``CompImageHDU``
    for an example of why this might need to be overridden).

    Parameters
    ----------
    checksum : bool or str
        ``'remove'`` deletes both keywords; ``'datasum'`` updates only the
        datasum; any other true value updates both.  A false value leaves
        the header alone.
    checksum_keyword : str, optional
        Keyword holding the checksum.
    datasum_keyword : str, optional
        Keyword holding the datasum.
    """
    # If the data is loaded it isn't necessarily 'modified', but we have no
    # way of knowing for sure
    modified = self._header._modified or self._data_loaded

    if checksum == 'remove':
        for keyword in (checksum_keyword, datasum_keyword):
            if keyword in self._header:
                del self._header[keyword]
        return

    # The presence test uses the keywords this call was asked to manage, in
    # line with the removal above and the updates below (it used to always
    # look for the literal 'CHECKSUM'/'DATASUM' cards)
    needs_update = (modified or self._new or
                    (checksum and (checksum_keyword not in self._header or
                                   datasum_keyword not in self._header or
                                   not self._checksum_valid or
                                   not self._datasum_valid)))
    if not needs_update:
        return

    if checksum == 'datasum':
        self.add_datasum(datasum_keyword=datasum_keyword)
    elif checksum:
        self.add_checksum(checksum_keyword=checksum_keyword,
                          datasum_keyword=datasum_keyword)
def _postwriteto(self):
    """
    If data is unsigned integer 16, 32 or 64, remove the BSCALE/BZERO
    cards from the header; a missing card is not an error.
    """
    if not (self._has_data and self._standard and
            _is_pseudo_unsigned(self.data.dtype)):
        return

    for keyword in ('BSCALE', 'BZERO'):
        try:
            del self._header[keyword]
        except KeyError:
            pass
def _writeheader(self, fileobj):
    """
    Write the header to ``fileobj``.

    Returns
    -------
    offset, size : tuple of int
        Position at which the header was written (0 when it cannot be
        determined or when only simulating) and the size of the header.
    """
    if fileobj.simulateonly:
        # Nothing is written; report the size the header would have
        return 0, len(str(self._header))

    try:
        offset = fileobj.tell()
    except (AttributeError, OSError):
        offset = 0

    self._header.tofile(fileobj)

    try:
        size = fileobj.tell() - offset
    except (AttributeError, OSError):
        # Position unavailable; fall back to the header's string length
        size = len(str(self._header))

    return offset, size
def _writedata(self, fileobj):
    """
    Write the data area of the HDU to ``fileobj``.

    Returns
    -------
    offset, size : tuple of int
        Both the location and the size of the data area; the size includes
        padding.
    """
    # TODO: A lot of the simulateonly stuff should be moved back into the
    # _File class--basically it should turn write and flush into a noop
    offset = 0

    if not fileobj.simulateonly:
        fileobj.flush()
        try:
            offset = fileobj.tell()
        except OSError:
            offset = 0

    if not (self._data_loaded or self._data_needs_rescale):
        # The data has not been modified and does not need to be
        # rescaled, so it can be copied, unmodified, directly from an
        # existing file or buffer
        size = self._writedata_direct_copy(fileobj)
    else:
        size = 0
        if self.data is not None:
            size = self._writedata_internal(fileobj)

        # pad the FITS data block
        if size > 0:
            padding = _pad_length(size) * self._padding_byte
            # TODO: Not that this is ever likely, but if for some odd
            # reason _padding_byte is > 0x80 this will fail; but really if
            # somebody's custom fits format is doing that, they're doing it
            # wrong and should be reprimanded harshly.
            fileobj.write(padding.encode('ascii'))
            size += len(padding)

    # flush, to make sure the content is written
    if not fileobj.simulateonly:
        fileobj.flush()

    return offset, size
def _writedata_internal(self, fileobj):
    """
    Write the data array itself.

    The beginning and end of most _writedata() implementations are the
    same, but the details of writing the data array itself can vary between
    HDU types, so that should be implemented in this method.

    Should return the size in bytes of the data written.
    """
    array = self.data
    if not fileobj.simulateonly:
        fileobj.writearray(array)
    return array.size * array.itemsize
def _writedata_direct_copy(self, fileobj):
    """
    Copies the data directly from one file/buffer to the new file.

    For now this is handled by loading the raw data from the existing data
    (including any padding) via a memory map or from an already in-memory
    buffer and using Numpy's existing file-writing facilities to write to
    the new file.

    If this proves too slow a more direct approach may be used.

    Returns the number of bytes copied, or 0 when no raw data is available.
    """
    raw = self._get_raw_data(self._data_size, 'ubyte', self._data_offset)
    if raw is None:
        return 0

    fileobj.writearray(raw)
    return raw.nbytes
# TODO: This is the start of moving HDU writing out of the _File class.
# Right now this is an internal private method (though still used by
# HDUList); eventually the plan is to have this be moved into writeto()
# somehow...
def _writeto(self, fileobj, inplace=False, copy=False):
    """
    Write this HDU to ``fileobj`` inside a free-disk-space check.

    Parameters
    ----------
    fileobj : _File
        The output file.
    inplace : bool, optional
        Passed through to `_writeto_internal`.
    copy : bool, optional
        Passed through to `_writeto_internal`.
    """
    try:
        dirname = os.path.dirname(fileobj._file.name)
    except (AttributeError, TypeError):
        # AttributeError: there is no underlying file or it has no name.
        # TypeError: the name is not path-like (e.g. an integer file
        # descriptor), which os.path.dirname rejects.
        dirname = None

    with _free_space_check(self, dirname):
        self._writeto_internal(fileobj, inplace, copy)
def _writeto_internal(self, fileobj, inplace, copy):
    """
    Write the header and data of this HDU and record where they ended up.

    Parameters
    ----------
    fileobj : _File
        The output file (for now it is assumed to be a _File object).
    inplace : bool
        When false, or when the HDU is new, header and data are simply
        written out in full.
    copy : bool
        Whether an unmodified header/data should be copied over from the
        HDU's original file.
    """
    if not inplace or self._new:
        new_header_offset, _ = self._writeheader(fileobj)
        new_data_offset, new_data_size = self._writedata(fileobj)

        # Set the various data location attributes on newly-written HDUs
        if self._new:
            self._header_offset = new_header_offset
            self._data_offset = new_data_offset
            self._data_size = new_data_size
        return

    header_loc = self._header_offset
    header_size = self._data_offset - self._header_offset
    data_loc = self._data_offset
    data_size = self._data_size

    if self._header._modified:
        # Seek to the original header location in the file
        self._file.seek(header_loc)
        # This should update header_loc with the header location in the new
        # file
        header_loc, header_size = self._writeheader(fileobj)

        # If the data is to be written below with self._writedata, that
        # will also properly update the data location; but it should be
        # updated here too
        data_loc = header_loc + header_size
    elif copy:
        # Seek to the original header location in the file
        self._file.seek(header_loc)
        # Before writing, update header_loc with the current file position,
        # which is the header location for the new file
        header_loc = fileobj.tell()
        fileobj.write(self._file.read(header_size))
        # The header size is unchanged, but the data location may be
        # different from before depending on if previous HDUs were resized
        data_loc = fileobj.tell()

    if self._data_loaded:
        if self.data is not None:
            # Seek through the array's bases for an memmap'd array; we
            # can't rely on the _File object to give us this info since
            # the user may have replaced the previous mmap'd array
            if copy or self._data_replaced:
                # Of course, if we're copying the data to a new file
                # we don't care about flushing the original mmap;
                # instead just read it into the new file
                array_mmap = None
            else:
                array_mmap = _get_array_mmap(self.data)

            if array_mmap is not None:
                array_mmap.flush()
            else:
                self._file.seek(self._data_offset)
                data_loc, data_size = self._writedata(fileobj)
    elif copy:
        data_size = self._writedata_direct_copy(fileobj)

    self._header_offset = header_loc
    self._data_offset = data_loc
    self._data_size = data_size
    self._data_replaced = False
def _close(self, closed=True):
    """
    Drop the ``data`` attribute when it is backed by an mmap.

    If the data was mmap'd, close the underlying mmap (this will prevent
    any future access to the .data attribute if there are not other
    references to it; if there are other references then it is up to the
    user to clean those up).  Nothing is done when ``closed`` is false or
    the data has not been loaded.
    """
    if not closed or not self._data_loaded:
        return

    if _get_array_mmap(self.data) is not None:
        del self.data
# For backwards-compatibility, though nobody should have
# been using this directly:
_AllHDU = _BaseHDU
# For convenience, the registry classmethods of _BaseHDU are also exposed as
# module-level names.
# TODO: register_hdu could be made into a class decorator which would be pretty
# cool. (This note used to be conditional on dropping Python 2.6 support.)
register_hdu = _BaseHDU.register_hdu
unregister_hdu = _BaseHDU.unregister_hdu
class _CorruptedHDU(_BaseHDU):
    """
    A Corrupted HDU class.

    This class is used when one or more mandatory `Card`s are
    corrupted (unparsable), such as the ``BITPIX``, ``NAXIS``, or
    ``END`` cards.  A corrupted HDU usually means that the data size
    cannot be calculated or the ``END`` card is not found.  In the case
    of a missing ``END`` card, the `Header` may also contain the binary
    data.

    .. note::
       In future, it may be possible to decipher where the last block
       of the `Header` ends, but this task may be difficult when the
       extension is a `TableHDU` containing ASCII data.
    """

    @property
    def size(self):
        """
        Returns the size (in bytes) of the HDU's data part.

        This is everything in the buffer (or file) from the data offset on.
        """
        # Note: On compressed files this might report a negative size; but
        # the file is corrupt anyways so I'm not too worried about it.
        if self._buffer is not None:
            total = len(self._buffer)
        else:
            total = self._file.size
        return total - self._data_offset

    def _summary(self):
        """Return the (name, version, type label) summary of this HDU."""
        summary = (self.name, self.ver, 'CorruptedHDU')
        return summary

    def verify(self):
        """Do nothing: no verification is performed on a corrupted HDU."""
        return None
class _NonstandardHDU(_BaseHDU, _Verify):
    """
    A Non-standard HDU class.

    This class is used for a Primary HDU when the ``SIMPLE`` Card has
    a value of `False`.  A non-standard HDU comes from a file that
    resembles a FITS file but departs from the standards in some
    significant way.  One example would be files where the numbers are
    in the DEC VAX internal storage format rather than the standard
    FITS most significant byte first.  The header for this HDU should
    be valid.  The data for this HDU is read from the file as a byte
    stream that begins at the first byte after the header ``END`` card
    and continues until the end of the file.
    """

    _standard = False

    @classmethod
    def match_header(cls, header):
        """
        Matches any HDU that has the 'SIMPLE' keyword but is not a standard
        Primary or Groups HDU.

        Raises `InvalidHDUException` when the first card is 'SIMPLE' but
        the header has a 'GROUPS' keyword or the value is not `False`.
        """
        # The SIMPLE keyword must be in the first card
        first_card = header.cards[0]
        if first_card.keyword != 'SIMPLE':
            return False

        # The check that 'GROUPS' is missing is a bit redundant, since the
        # match_header for GroupsHDU will always be called before this one.
        if 'GROUPS' not in header and first_card.value is False:
            return True

        raise InvalidHDUException

    @property
    def size(self):
        """
        Returns the size (in bytes) of the HDU's data part.

        This is everything in the buffer (or file) from the data offset on.
        """
        if self._buffer is not None:
            total = len(self._buffer)
        else:
            total = self._file.size
        return total - self._data_offset

    def _writedata(self, fileobj):
        """
        Differs from the base class ``_writedata`` in that it doesn't
        automatically add padding, and treats the data as a string of raw
        bytes instead of an array.

        Returns both the location and the size of the data area.
        """
        offset = 0
        size = 0
        simulate = fileobj.simulateonly

        if not simulate:
            fileobj.flush()
            try:
                offset = fileobj.tell()
            except OSError:
                offset = 0

        if self.data is not None:
            if not simulate:
                fileobj.write(self.data)
                # flush, to make sure the content is written
                fileobj.flush()
            size = len(self.data)

        return offset, size

    def _summary(self):
        """Return the (name, version, type label, card count) summary."""
        ncards = len(self._header)
        return (self.name, self.ver, 'NonstandardHDU', ncards)

    @lazyproperty
    def data(self):
        """
        Return the file data.
        """
        raw = self._get_raw_data(self.size, 'ubyte', self._data_offset)
        return raw

    def _verify(self, option='warn'):
        """Verify each card of the header and collect the results."""
        results = _ErrList([], unit='Card')

        # verify each card
        for card in self._header.cards:
            results.append(card._verify(option))

        return results
class _ValidHDU(_BaseHDU, _Verify):
"""
Base class for all HDUs which are not corrupted.
"""
def __init__(self, data=None, header=None, name=None, ver=None, **kwargs):
    """
    Parameters
    ----------
    data : optional
        Passed to the base class initializer.
    header : Header, optional
        Passed to the base class initializer.
    name : str, optional
        When given, assigned to the ``name`` attribute.
    ver : int, optional
        When given, assigned to the ``ver`` attribute.
    kwargs : optional
        Accepted but not used here.
    """
    super().__init__(data=data, header=header)

    # NOTE: private data members _checksum and _datasum are used by the
    # utility script "fitscheck" to detect missing checksums.
    self._checksum = None
    self._checksum_valid = None
    self._datasum = None
    self._datasum_valid = None

    # Explicit name/version arguments go through the property setters
    if name is not None:
        self.name = name
    if ver is not None:
        self.ver = ver
@classmethod
def match_header(cls, header):
    """
    Matches any HDU that is not recognized as having either the SIMPLE or
    XTENSION keyword in its header's first card, but is nonetheless not
    corrupted.

    TODO: Maybe it would make more sense to use _NonstandardHDU in this
    case?  Not sure...
    """
    first_keyword = first(header.keys())
    return first_keyword not in ('SIMPLE', 'XTENSION')
@property
def size(self):
    """
    Size (in bytes) of the data portion of the HDU.

    Computed from the ``NAXIS``/``NAXISn``, ``BITPIX``, ``GCOUNT`` and
    ``PCOUNT`` cards; 0 when ``NAXIS`` is missing or not positive.
    """
    naxis = self._header.get('NAXIS', 0)
    if not naxis > 0:
        return 0

    # Product of all the axis lengths
    npix = 1
    for axis in range(1, naxis + 1):
        npix = npix * self._header['NAXIS' + str(axis)]

    bitpix = self._header['BITPIX']
    gcount = self._header.get('GCOUNT', 1)
    pcount = self._header.get('PCOUNT', 0)
    return abs(bitpix) * gcount * (pcount + npix) // 8
def filebytes(self):
    """
    Calculates and returns the number of bytes that this HDU will write to
    a file.
    """
    scratch = _File()
    # TODO: Fix this once new HDU writing API is settled on
    header_size = self._writeheader(scratch)[1]
    data_size = self._writedata(scratch)[1]
    return header_size + data_size
def fileinfo(self):
    """
    Returns a dictionary detailing information about the locations
    of this HDU within any associated file.  The values are only
    valid after a read or write of the associated file with no
    intervening changes to the `HDUList`.

    Returns
    -------
    dict or None
        The dictionary details information about the locations of
        this HDU within an associated file.  Returns `None` when
        the HDU is not associated with a file.

        Dictionary contents:

        ========== ================================================
        Key        Value
        ========== ================================================
        file       File object associated with the HDU
        filemode   Mode in which the file was opened (readonly, copyonwrite,
                   update, append, ostream)
        hdrLoc     Starting byte location of header in file
        datLoc     Starting byte location of data block in file
        datSpan    Data size including padding
        ========== ================================================
    """
    backing_file = getattr(self, '_file', None)
    if not backing_file:
        return None

    return {
        'file': backing_file,
        'filemode': backing_file.mode,
        'hdrLoc': self._header_offset,
        'datLoc': self._data_offset,
        'datSpan': self._data_size,
    }
def copy(self):
    """
    Make a copy of the HDU, both header and data are copied.

    The copy is built by calling this HDU's own class with the copied data
    (or `None` when there is no data) and the copied header.
    """
    data = None if self.data is None else self.data.copy()
    header = self._header.copy()
    return self.__class__(data=data, header=header)
def _verify(self, option='warn'):
errs = _ErrList([], unit='Card')
is_valid = BITPIX2DTYPE.__contains__
# Verify location and value of mandatory keywords.
# Do the first card here, instead of in the respective HDU classes, so
# the checking is in order, in case of required cards in wrong order.
if isinstance(self, ExtensionHDU):
firstkey = 'XTENSION'
firstval = self._extension
else:
firstkey = 'SIMPLE'
firstval = True
self.req_cards(firstkey, 0, None, firstval, option, errs)
self.req_cards('BITPIX', 1, lambda v: (_is_int(v) and is_valid(v)), 8,
option, errs)
self.req_cards('NAXIS', 2,
lambda v: (_is_int(v) and 0 <= v <= 999), 0,
option, errs)
naxis = self._header.get('NAXIS', 0)
if naxis < 1000:
for ax in range(3, naxis + 3):
key = 'NAXIS' + str(ax - 2)
self.req_cards(key, ax,
lambda v: (_is_int(v) and v >= 0),
_extract_number(self._header[key], default=1),
option, errs)