import warnings

from memory_profiler import profile

from astropy import __version__ as astropy_version
from astropy.io.votable import tree, util
from astropy.io.votable.exceptions import (  # noqa
    warn_or_raise, vo_warn, vo_raise, vo_reraise, warn_unknown_attrs,
    W06, W07, W08, W09, W10, W11, W12, W13, W15, W17, W18, W19, W20,
    W21, W22, W26, W27, W28, W29, W32, W33, W35, W36, W37, W38, W40,
    W41, W42, W43, W44, W45, W50, W52, W53, W54, E06, E08, E09, E10,
    E11, E12, E13, E15, E16, E17, E18, E19, E20, E21, E22, E23)
from astropy.io.votable.table import VERIFY_OPTIONS
from astropy.io.votable.tree import (
    Element, _IDProperty, _DescriptionProperty, resolve_id,
    CooSys, TimeSys, Param, Info, Resource, Group, Table,
    _lookup_by_attr_factory, _lookup_by_id_or_name_factory)
from astropy.utils.collections import HomogeneousList
from astropy.utils.exceptions import AstropyDeprecationWarning
from astropy.utils.xml import iterparser
from astropy.utils.xml.writer import XMLWriter


def _version_flags(version):
    """
    Build the ``version_1_N_or_later`` flags for a VOTable version string.

    Parameters
    ----------
    version : str
        VOTable version, e.g. ``'1.3'``.

    Returns
    -------
    flags : dict of str to bool
        One entry per supported minor version (1.1 through 1.4); each
        value is `True` when *version* compares greater than or equal to
        that minor version according to ``util.version_compare``.
    """
    return {
        'version_1_1_or_later': util.version_compare(version, '1.1') >= 0,
        'version_1_2_or_later': util.version_compare(version, '1.2') >= 0,
        'version_1_3_or_later': util.version_compare(version, '1.3') >= 0,
        'version_1_4_or_later': util.version_compare(version, '1.4') >= 0}


class VOTableFile(Element, _IDProperty, _DescriptionProperty):
    """
    VOTABLE_ element: represents an entire file.

    The keyword arguments correspond to setting members of the same
    name, documented below.

    *version* is settable at construction time only, since conformance
    tests for building the rest of the structure depend on it.
    """

    def __init__(self, ID=None, id=None, config=None, pos=None, version="1.4"):
        """
        Create an empty VOTABLE element.

        Raises
        ------
        ValueError
            If *version* (after conversion to `str`) is not one of
            ``'1.0'``, ``'1.1'``, ``'1.2'``, ``'1.3'`` or ``'1.4'``.
        """
        if config is None:
            config = {}
        self._config = config
        self._pos = pos

        Element.__init__(self)
        self.ID = resolve_id(ID, id, config, pos)
        self.description = None

        # Each child collection only accepts instances of one element type.
        self._coordinate_systems = HomogeneousList(CooSys)
        self._time_systems = HomogeneousList(TimeSys)
        self._params = HomogeneousList(Param)
        self._infos = HomogeneousList(Info)
        self._resources = HomogeneousList(Resource)
        self._groups = HomogeneousList(Group)

        version = str(version)
        if version not in ("1.0", "1.1", "1.2", "1.3", "1.4"):
            raise ValueError("'version' should be one of '1.0', '1.1', "
                             "'1.2', '1.3', or '1.4'")

        self._version = version

    def __repr__(self):
        """Return a short summary that includes the number of tables."""
        # Count lazily; there is no need to hold every table in a list.
        n_tables = sum(1 for _ in self.iter_tables())
        return f'... {n_tables} tables ...'

    @property
    def version(self):
        """
        The version of the VOTable specification that the file uses.
        """
        return self._version

    @version.setter
    def version(self, version):
        version = str(version)
        if version not in ('1.1', '1.2', '1.3', '1.4'):
            raise ValueError(
                "astropy.io.votable only supports VOTable versions "
                "1.1, 1.2, 1.3, and 1.4")
        self._version = version

    @property
    def coordinate_systems(self):
        """
        A list of coordinate system descriptions for the file.  Must
        contain only `CooSys` objects.
        """
        return self._coordinate_systems

    @property
    def time_systems(self):
        """
        A list of time system descriptions for the file.  Must
        contain only `TimeSys` objects.
        """
        return self._time_systems

    @property
    def params(self):
        """
        A list of parameters (constant-valued columns) that apply to
        the entire file.  Must contain only `Param` objects.
        """
        return self._params

    @property
    def infos(self):
        """
        A list of informational parameters (key-value pairs) for the
        entire file.  Must only contain `Info` objects.
        """
        return self._infos

    @property
    def resources(self):
        """
        A list of resources, in the order they appear in the file.
        Must only contain `Resource` objects.
        """
        return self._resources

    @property
    def groups(self):
        """
        A list of groups, in the order they appear in the file.  Only
        supported as a child of the VOTABLE element in VOTable 1.2 or
        later.
        """
        return self._groups

    def _add_param(self, iterator, tag, data, config, pos):
        """Create a `Param` from a PARAM start tag, store it and parse it."""
        param = Param(self, config=config, pos=pos, **data)
        self.params.append(param)
        param.parse(iterator, config)

    def _add_resource(self, iterator, tag, data, config, pos):
        """Create a `Resource` from a RESOURCE start tag, store and parse it."""
        resource = Resource(config=config, pos=pos, **data)
        self.resources.append(resource)
        resource.parse(self, iterator, config)

    def _add_coosys(self, iterator, tag, data, config, pos):
        """Create a `CooSys` from a COOSYS start tag, store it and parse it."""
        coosys = CooSys(config=config, pos=pos, **data)
        self.coordinate_systems.append(coosys)
        coosys.parse(iterator, config)

    def _add_timesys(self, iterator, tag, data, config, pos):
        """Create a `TimeSys` from a TIMESYS start tag, store and parse it."""
        timesys = TimeSys(config=config, pos=pos, **data)
        self.time_systems.append(timesys)
        timesys.parse(iterator, config)

    def _add_info(self, iterator, tag, data, config, pos):
        """Create an `Info` from an INFO start tag, store it and parse it."""
        info = Info(config=config, pos=pos, **data)
        self.infos.append(info)
        info.parse(iterator, config)

    def _add_group(self, iterator, tag, data, config, pos):
        """
        Create a `Group` from a GROUP start tag, store it and parse it.

        Reports W26 first when the file's version is earlier than 1.2.
        """
        if not config.get('version_1_2_or_later'):
            warn_or_raise(W26, W26, ('GROUP', 'VOTABLE', '1.2'), config, pos)
        group = Group(self, config=config, pos=pos, **data)
        self.groups.append(group)
        group.parse(iterator, config)

    # NOTE: ``profile`` comes from the third-party ``memory_profiler``
    # package imported at the top of this file.
    @profile
    def parse(self, iterator, config):
        """
        Populate this object from an XML event iterator.

        Parameters
        ----------
        iterator : iterator of (start, tag, data, pos) tuples
            XML events; consumed in two passes (header, then children).
        config : dict
            Parser configuration.  ``version``, the
            ``version_1_N_or_later`` flags and ``_current_table_number``
            are written into it.

        Returns
        -------
        self : `VOTableFile`
        """
        config['_current_table_number'] = 0

        # Pass 1: advance to the opening VOTABLE tag and work out which
        # version of the specification the file claims to follow.
        for start, tag, data, pos in iterator:
            if not start:
                continue

            if tag == 'xml':
                pass
            elif tag == 'VOTABLE':
                if 'version' not in data:
                    warn_or_raise(W20, W20, self.version, config, pos)
                    config['version'] = self.version
                else:
                    config['version'] = self._version = data['version']
                    # Tolerate (but report) a leading "v", e.g. "v1.2".
                    if config['version'].lower().startswith('v'):
                        warn_or_raise(
                            W29, W29, config['version'], config, pos)
                        self._version = config['version'] = \
                            config['version'][1:]  # noqa
                    if config['version'] not in ('1.1', '1.2', '1.3', '1.4'):
                        vo_warn(W21, config['version'], config, pos)

                if 'xmlns' in data:
                    # Starting with VOTable 1.3, namespace URIs stop
                    # incrementing with minor version changes.  See
                    # this IVOA note for more info:
                    # http://www.ivoa.net/documents/Notes/XMLVers/20180529/
                    #
                    # If this policy is in place for major version 2,
                    # then this logic will need tweaking.
                    if config['version'] in ('1.3', '1.4'):
                        ns_version = '1.3'
                    else:
                        ns_version = config['version']
                    correct_ns = (
                        f'http://www.ivoa.net/xml/VOTable/v{ns_version}')
                    if data['xmlns'] != correct_ns:
                        vo_warn(
                            W41, (correct_ns, data['xmlns']), config, pos)
                else:
                    vo_warn(W42, (), config, pos)

                break
            else:
                vo_raise(E19, (), config, pos)

        config.update(_version_flags(config['version']))

        tag_mapping = {
            'PARAM': self._add_param,
            'RESOURCE': self._add_resource,
            'COOSYS': self._add_coosys,
            'TIMESYS': self._add_timesys,
            'INFO': self._add_info,
            'DEFINITIONS': self._add_definitions,
            'DESCRIPTION': self._ignore_add,
            'GROUP': self._add_group}

        # Pass 2: dispatch every child start tag to its handler.  The
        # DESCRIPTION text only becomes available on its end event.
        for start, tag, data, pos in iterator:
            if start:
                handler = tag_mapping.get(tag, self._add_unknown_tag)
                handler(iterator, tag, data, config, pos)
            elif tag == 'DESCRIPTION':
                if self.description is not None:
                    warn_or_raise(W17, W17, 'VOTABLE', config, pos)
                self.description = data or None

        if not len(self.resources) and config['version_1_2_or_later']:
            warn_or_raise(W53, W53, (), config, pos)

        return self

    def to_xml(self, fd, compressed=False, tabledata_format=None,
               _debug_python_based_parser=False, _astropy_version=None):
        """
        Write to an XML file.

        Parameters
        ----------
        fd : str path or writable file-like object
            Where to write the file.

        compressed : bool, optional
            When `True`, write to a gzip-compressed file.  (Default:
            `False`)

        tabledata_format : str, optional
            Override the format of the table(s) data to write.  Must
            be one of ``tabledata`` (text representation), ``binary`` or
            ``binary2``.  By default, use the format that was
            specified in each `Table` object as it was created or read
            in.  See :ref:`votable-serialization`.

        Raises
        ------
        ValueError
            If *tabledata_format* is given and is not one of the
            accepted values (compared case-insensitively).
        """
        if tabledata_format is not None:
            if tabledata_format.lower() not in (
                    'tabledata', 'binary', 'binary2'):
                # Report the offending argument (the previous message
                # interpolated the builtin ``format`` by mistake).
                raise ValueError(
                    f"Unknown format type '{tabledata_format}'")

        kwargs = {
            'version': self.version,
            'tabledata_format': tabledata_format,
            '_debug_python_based_parser': _debug_python_based_parser,
            '_group_number': 1}
        kwargs.update(_version_flags(self.version))

        with util.convert_to_writable_filelike(
                fd, compressed=compressed) as stream:
            w = XMLWriter(stream)
            version = self.version
            if _astropy_version is None:
                lib_version = astropy_version
            else:
                lib_version = _astropy_version

            # XML declaration plus a comment naming the producing library.
            # NOTE(review): assumes ``convert_to_writable_filelike`` encodes
            # the output as UTF-8 -- confirm against its implementation.
            xml_header = (
                '<?xml version="1.0" encoding="utf-8"?>\n'
                '<!-- Produced with astropy.io.votable version '
                f'{lib_version}\n'
                '     http://www.astropy.org/ -->\n')
            w.write(xml_header)

            with w.tag('VOTABLE',
                       {'version': version,
                        'xmlns:xsi':
                        "http://www.w3.org/2001/XMLSchema-instance",
                        'xsi:noNamespaceSchemaLocation':
                        f"http://www.ivoa.net/xml/VOTable/v{version}",
                        'xmlns':
                        f"http://www.ivoa.net/xml/VOTable/v{version}"}):
                if self.description is not None:
                    w.element("DESCRIPTION", self.description, wrap=True)
                element_sets = [self.coordinate_systems,
                                self.time_systems,
                                self.params,
                                self.infos,
                                self.resources]
                # NOTE(review): for version >= 1.2 this *replaces* the
                # coordinate systems with the groups, so top-level COOSYS
                # elements are not written.  Behaviour kept as-is; confirm
                # whether that is intended.
                if kwargs['version_1_2_or_later']:
                    element_sets[0] = self.groups
                for element_set in element_sets:
                    for element in element_set:
                        element.to_xml(w, **kwargs)

    def iter_tables(self):
        """
        Iterates over all tables in the VOTable file in a "flat" way,
        ignoring the nesting of resources etc.
        """
        for resource in self.resources:
            yield from resource.iter_tables()

    def get_first_table(self):
        """
        Often, you know there is only one table in the file, and
        that's all you need.  This method returns that first table.

        Tables for which ``is_empty()`` is true are skipped.

        Raises
        ------
        IndexError
            If the file contains no non-empty table.
        """
        for table in self.iter_tables():
            if not table.is_empty():
                return table
        raise IndexError("No table found in VOTABLE file.")

    get_table_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_tables', 'TABLE',
        """
        Looks up a TABLE_ element by the given ID.  Used by the table
        "ref" attribute.
        """)

    get_tables_by_utype = _lookup_by_attr_factory(
        'utype', False, 'iter_tables', 'TABLE',
        """
        Looks up a TABLE_ element by the given utype, and returns an
        iterator emitting all matches.
        """)

    def get_table_by_index(self, idx):
        """
        Get a table by its ordinal position in the file.

        Raises
        ------
        IndexError
            If no table is found at position *idx*.
        """
        for i, table in enumerate(self.iter_tables()):
            if i == idx:
                return table
        raise IndexError(
            f"No table at index {idx:d} found in VOTABLE file.")

    def iter_fields_and_params(self):
        """
        Recursively iterate over all FIELD_ and PARAM_ elements in the
        VOTABLE_ file.
        """
        for resource in self.resources:
            yield from resource.iter_fields_and_params()

    get_field_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_fields_and_params', 'FIELD',
        """
        Looks up a FIELD_ element by the given ID_.  Used by the field's
        "ref" attribute.
        """)

    get_fields_by_utype = _lookup_by_attr_factory(
        'utype', False, 'iter_fields_and_params', 'FIELD',
        """
        Looks up a FIELD_ element by the given utype and returns an
        iterator emitting all matches.
        """)

    get_field_by_id_or_name = _lookup_by_id_or_name_factory(
        'iter_fields_and_params', 'FIELD',
        """
        Looks up a FIELD_ element by the given ID_ or name.
        """)

    def iter_values(self):
        """
        Recursively iterate over all VALUES_ elements in the VOTABLE_
        file.
        """
        for field in self.iter_fields_and_params():
            yield field.values

    get_values_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_values', 'VALUES',
        """
        Looks up a VALUES_ element by the given ID.  Used by the values
        "ref" attribute.
        """)

    def iter_groups(self):
        """
        Recursively iterate over all GROUP_ elements in the VOTABLE_
        file.
        """
        for table in self.iter_tables():
            yield from table.iter_groups()

    get_group_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_groups', 'GROUP',
        """
        Looks up a GROUP_ element by the given ID.  Used by the group's
        "ref" attribute
        """)

    get_groups_by_utype = _lookup_by_attr_factory(
        'utype', False, 'iter_groups', 'GROUP',
        """
        Looks up a GROUP_ element by the given utype and returns an
        iterator emitting all matches.
        """)

    def iter_coosys(self):
        """
        Recursively iterate over all COOSYS_ elements in the VOTABLE_
        file.
        """
        yield from self.coordinate_systems
        for resource in self.resources:
            yield from resource.iter_coosys()

    get_coosys_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_coosys', 'COOSYS',
        """Looks up a COOSYS_ element by the given ID.""")

    def iter_timesys(self):
        """
        Recursively iterate over all TIMESYS_ elements in the VOTABLE_
        file.
        """
        yield from self.time_systems
        for resource in self.resources:
            yield from resource.iter_timesys()

    get_timesys_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_timesys', 'TIMESYS',
        """Looks up a TIMESYS_ element by the given ID.""")

    def iter_info(self):
        """
        Recursively iterate over all INFO_ elements in the VOTABLE_
        file.
        """
        yield from self.infos
        for resource in self.resources:
            yield from resource.iter_info()

    get_info_by_id = _lookup_by_attr_factory(
        'ID', True, 'iter_info', 'INFO',
        """Looks up a INFO element by the given ID.""")

    def set_all_tables_format(self, format):
        """
        Set the output storage format of all tables in the file.
        """
        for table in self.iter_tables():
            table.format = format

    @classmethod
    def from_table(cls, table, table_id=None):
        """
        Create a `VOTableFile` instance from a given
        `astropy.table.Table` instance.

        Parameters
        ----------
        table_id : str, optional
            Set the given ID attribute on the returned Table instance.
        """
        votable_file = cls()
        resource = Resource()
        votable = Table.from_table(votable_file, table)
        if table_id is not None:
            votable.ID = table_id
        resource.tables.append(votable)
        votable_file.resources.append(resource)
        return votable_file


# NOTE: ``profile`` comes from the third-party ``memory_profiler`` package
# imported at the top of this file.
@profile
def parse(source, columns=None, invalid='exception', verify=None,
          chunk_size=tree.DEFAULT_CHUNK_SIZE, table_number=None,
          table_id=None, filename=None, unit_format=None,
          datatype_mapping=None, _debug_python_based_parser=False):
    """
    Parses a VOTABLE_ xml file (or file-like object), and returns a
    `~astropy.io.votable.tree.VOTableFile` object.

    Parameters
    ----------
    source : str or readable file-like object
        Path or file object containing a VOTABLE_ xml file.

    columns : sequence of str, optional
        List of field names to include in the output.  The default is
        to include all fields.

    invalid : str, optional
        One of the following values:

            - 'exception': throw an exception when an invalid value is
              encountered (default)

            - 'mask': mask out invalid values

    verify : {'ignore', 'warn', 'exception'}, optional
        When ``'exception'``, raise an error when the file violates the spec,
        otherwise either issue a warning (``'warn'``) or silently continue
        (``'ignore'``). Warnings may be controlled using the standard Python
        mechanisms.  See the `warnings` module in the Python standard library
        for more information. When not provided, uses the configuration setting
        ``astropy.io.votable.verify``, which defaults to 'ignore'.

        .. versionchanged:: 4.0
           ``verify`` replaces the ``pedantic`` argument, which will be
           deprecated in future.

    chunk_size : int, optional
        The number of rows to read before converting to an array.
        Higher numbers are likely to be faster, but will consume more
        memory.

    table_number : int, optional
        The number of table in the file to read in.  If `None`, all
        tables will be read.  If a number, 0 refers to the first table
        in the file, and only that numbered table will be parsed and
        read in.  Should not be used with ``table_id``.

    table_id : str, optional
        The ID of the table in the file to read in.  Should not be
        used with ``table_number``.

    filename : str, optional
        A filename, URL or other identifier to use in error messages.
        If *filename* is None and *source* is a string (i.e. a path),
        then *source* will be used as a filename for error messages.
        Therefore, *filename* is only required when source is a
        file-like object.

    unit_format : str, astropy.units.format.Base instance or None, optional
        The unit format to use when parsing unit attributes.  If a
        string, must be the name of a unit formatter. The built-in
        formats include ``generic``, ``fits``, ``cds``, and
        ``vounit``.  A custom formatter may be provided by passing a
        `~astropy.units.UnitBase` instance.  If `None` (default),
        the unit format to use will be the one specified by the
        VOTable specification (which is ``cds`` up to version 1.2 of
        VOTable, and (probably) ``vounit`` in future versions of the
        spec).

    datatype_mapping : dict of str to str, optional
        A mapping of datatype names to valid VOTable datatype names.
        For example, if the file being read contains the datatype
        "unsignedInt" (an invalid datatype in VOTable), include the
        mapping ``{"unsignedInt": "long"}``.

    Returns
    -------
    votable : `~astropy.io.votable.tree.VOTableFile` object

    Raises
    ------
    ValueError
        If *invalid* or *verify* is not one of the accepted values.

    See also
    --------
    astropy.io.votable.exceptions : The exceptions this function may raise.
    """
    from astropy.io.votable import conf

    invalid = invalid.lower()
    if invalid not in ('exception', 'mask'):
        raise ValueError("accepted values of ``invalid`` are: "
                         "``'exception'`` or ``'mask'``.")

    if verify is None:

        # NOTE: since the pedantic argument isn't fully deprecated yet, we need
        # to catch the deprecation warning that occurs when accessing the
        # configuration item, but only if it is for the pedantic option in the
        # [io.votable] section.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                r"Config parameter \'pedantic\' in section \[io.votable\]",
                AstropyDeprecationWarning)
            conf_verify_lowercase = conf.verify.lower()

        # We need to allow verify to be booleans as strings since the
        # configuration framework doesn't make it easy/possible to have mixed
        # types.
        if conf_verify_lowercase in ['false', 'true']:
            verify = conf_verify_lowercase == 'true'
        else:
            verify = conf_verify_lowercase

    # Booleans are the legacy "pedantic" spelling: True -> 'exception',
    # False -> 'warn'.
    if isinstance(verify, bool):
        verify = 'exception' if verify else 'warn'
    elif verify not in VERIFY_OPTIONS:
        raise ValueError('verify should be one of {}'.format(
            '/'.join(VERIFY_OPTIONS)))

    if datatype_mapping is None:
        datatype_mapping = {}

    config = {
        'columns': columns,
        'invalid': invalid,
        'verify': verify,
        'chunk_size': chunk_size,
        'table_number': table_number,
        # Forward the documented ``table_id`` selector; it was previously
        # accepted but dropped.  NOTE(review): presumably read by the table
        # parser via ``config.get('table_id')`` -- confirm in tree.Table.
        'table_id': table_id,
        'filename': filename,
        'unit_format': unit_format,
        'datatype_mapping': datatype_mapping
    }

    # A string source doubles as the name used in error messages.
    if filename is None and isinstance(source, str):
        config['filename'] = source

    with iterparser.get_xml_iterator(
            source,
            _debug_python_based_parser=_debug_python_based_parser) as iterator:
        return VOTableFile(
            config=config, pos=(1, 1)).parse(iterator, config)


if __name__ == '__main__':
    columns = ['phot_g_mean_mag', 'parallax']
    table = parse("async_20190630210155.vot", columns=columns,
                  table_number=0)