diff --git a/db/stationlite.db.empty b/db/stationlite.db.empty index 801ad9a..ac98b77 100644 Binary files a/db/stationlite.db.empty and b/db/stationlite.db.empty differ diff --git a/eidangservices/settings.py b/eidangservices/settings.py index 83d6586..446ef24 100644 --- a/eidangservices/settings.py +++ b/eidangservices/settings.py @@ -206,9 +206,9 @@ 'uri_path_config_vnet': '/eidaws/routing/1/localconfig', 'static_file': ''}, 'wfcatalog': { - 'url': 'http://eida.bgr.de/eidaws/wfcatalog/alpha/query', + 'url': 'http://eida.bgr.de/eidaws/wfcatalog/1/query', 'server': 'http://eida.bgr.de', - 'uri_path_query': "/eidaws/wfcatalog/alpha/query"} + 'uri_path_query': "/eidaws/wfcatalog/1/query"} } }, 'testquerysncls': { diff --git a/eidangservices/stationlite/engine/db.py b/eidangservices/stationlite/engine/db.py index 9937c04..c291a4b 100644 --- a/eidangservices/stationlite/engine/db.py +++ b/eidangservices/stationlite/engine/db.py @@ -32,31 +32,14 @@ from builtins import * # noqa -import collections import logging from contextlib import contextmanager from sqlalchemy.orm import scoped_session, sessionmaker -from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound - -from eidangservices import settings from eidangservices.stationlite.engine import orm from eidangservices.utils.error import Error, ErrorWithTraceback -from eidangservices.utils.sncl import (none_as_max, max_as_none, - StreamEpochs, StreamEpochsHandler) - - -# TODO(damb): Find a more elegant solution for CACHED_SERVICES workaround. -CACHED_SERVICES = ('station', 'dataselect', 'wfcatalog') - -CACHED_SERVICES_FDSN = ('station', 'dataselect') -CACHED_SERVICES_EIDA = ('wfcatalog',) -CACHED_SERVICES = { - 'fdsn': CACHED_SERVICES_FDSN, - 'eida': CACHED_SERVICES_EIDA -} logger = logging.getLogger(__name__) @@ -128,102 +111,8 @@ def session_guard(session): finally: session.close() +# session_guard () -def get_cached_services(): - retval = [] - for k, v in CACHED_SERVICES.items(): - retval.extend(v) - return retval - - -def init(session): - """initialize database tables""" - - def _lookup_service(services, name, std): - return next( - (x for x in services if x.name == name and x.standard == std), - None) - - # _lookup_service () - - # populate db tables from the mediatorws configuration (i.e. currently - # settings.py) - logger.debug('Collecting mappings ...') - _services = [] - for service_std, services in CACHED_SERVICES.items(): - for s_name in services: - _services.append(orm.Service(name=s_name, - standard=service_std)) - - _nodes = [] - _endpoints = [] - - logger.debug('Services available: %s' % _services) - - for node_name, node_par in settings.EIDA_NODES.items(): - - try: - n = orm.Node(name=node_name, - description=node_par['name']) - - # add services to node - # NOTE(damb): Only such services are added which are both - # cached and have configured parameters in settings.py - - # fdsn services - for s, v in node_par['services']['fdsn'].items(): - _service = _lookup_service(_services, s, 'fdsn') - if v and _service: - logger.debug("Adding service '{}' to '{}'".format(_service, n)) - n.services.append(_service) - - # eida services - for s, v in node_par['services']['eida'].items(): - _service = _lookup_service(_services, s, 'eida') - if v and _service: - logger.debug("Adding service '{}' to '{}'".format(_service, n)) - n.services.append(_service) - - except KeyError as err: - raise MissingNodeConfigParam(err) - - _nodes.append(n) - - # create endpoints and add service - try: - # fdsn services - for s, v in node_par['services']['fdsn'].items(): - if v and _lookup_service(_services, s, 'fdsn'): - e = orm.Endpoint( - url='{}/fdsnws/{}/1/query'.format( - node_par['services']['fdsn']['server'], s), - service=_lookup_service(_services, s, 'fdsn')) - - logger.debug('Created endpoint %r' % e) - _endpoints.append(e) - - # eida services - for s, v in node_par['services']['eida'].items(): - if (v['server'] and - _lookup_service(_services, s, 'eida')): - e = orm.Endpoint( - url='{}{}'.format(v['server'], - v['uri_path_query']), - service=_lookup_service(_services, s, 'eida')) - - logger.debug('Created endpoint %r' % e) - _endpoints.append(e) - - except KeyError as err: - raise InvalidEndpointConfig(err) - - payload = _nodes - payload.extend(_endpoints) - - with session_guard(session) as s: - s.add_all(payload) - -# init () def clean(session, timestamp): """ @@ -261,9 +150,8 @@ def clean(session, timestamp): vnets_active = set( session.query(orm.StreamEpochGroup).\ - filter(orm.StreamEpochGroup.oid.in_(vnets_active)).\ - all()) - + filter(orm.StreamEpochGroup.oid.in_(vnets_active)).\ + all()) vnets_all = set(session.query(orm.StreamEpochGroup).all()) diff --git a/eidangservices/stationlite/engine/orm.py b/eidangservices/stationlite/engine/orm.py index 31dd804..aeb961d 100644 --- a/eidangservices/stationlite/engine/orm.py +++ b/eidangservices/stationlite/engine/orm.py @@ -34,7 +34,7 @@ import datetime from sqlalchemy import (Column, Integer, Float, String, Unicode, DateTime, - ForeignKey, Table) + ForeignKey) from sqlalchemy.ext.declarative import declared_attr, declarative_base from sqlalchemy.orm import relationship @@ -56,11 +56,12 @@ def __tablename__(cls): # class Base + class EpochMixin(object): @declared_attr def starttime(cls): - return Column(DateTime, nullable=False, index=True) + return Column(DateTime, nullable=False, index=True) @declared_attr def endtime(cls): @@ -82,48 +83,12 @@ def lastseen(cls): # ----------------------------------------------------------------------------- ORMBase = declarative_base(cls=Base) -node_service_relation = Table( - 'node_service_relation', ORMBase.metadata, - Column('node_ref', Integer, ForeignKey('service.oid')), - Column('service_ref', Integer, ForeignKey('node.oid'))) - - -class Node(ORMBase): - - oid = Column(Integer, primary_key=True) - name = Column(String(LENGTH_STD_CODE), nullable=False, unique=True) - description = Column(Unicode(LENGTH_DESCRIPTION)) - - networks = relationship('NodeNetworkInventory', back_populates='node') - services = relationship('Service', - secondary=node_service_relation, - back_populates='nodes') - - def __repr__(self): - return '' % (self.name, - self.description) - -# class Node - - -class NodeNetworkInventory(LastSeenMixin, ORMBase): - - # association object pattern - node_ref = Column(Integer, ForeignKey('node.oid')) - network_ref = Column(Integer, ForeignKey('network.oid')) - - node = relationship('Node', back_populates='networks') - network = relationship('Network', back_populates='nodes') - -# class NodeNetworkInventory - class Network(ORMBase): name = Column(String(LENGTH_STD_CODE), nullable=False, index=True) network_epochs = relationship('NetworkEpoch', back_populates='network') - nodes = relationship('NodeNetworkInventory', back_populates='network') channel_epochs = relationship('ChannelEpoch', back_populates='network') stream_epochs = relationship('StreamEpoch', back_populates='network') @@ -136,7 +101,8 @@ def __repr__(self): class NetworkEpoch(EpochMixin, LastSeenMixin, ORMBase): - network_ref = Column(Integer, ForeignKey('network.oid')) + network_ref = Column(Integer, ForeignKey('network.oid'), + index=True) description = Column(Unicode(LENGTH_DESCRIPTION)) network = relationship('Network', back_populates='network_epochs') @@ -146,8 +112,10 @@ class NetworkEpoch(EpochMixin, LastSeenMixin, ORMBase): class ChannelEpoch(EpochMixin, LastSeenMixin, ORMBase): - network_ref = Column(Integer, ForeignKey('network.oid')) - station_ref = Column(Integer, ForeignKey('station.oid')) + network_ref = Column(Integer, ForeignKey('network.oid'), + index=True) + station_ref = Column(Integer, ForeignKey('station.oid'), + index=True) channel = Column(String(LENGTH_CHANNEL_CODE), nullable=False, index=True) locationcode = Column(String(LENGTH_LOCATION_CODE), nullable=False, @@ -187,7 +155,8 @@ def __repr__(self): class StationEpoch(EpochMixin, LastSeenMixin, ORMBase): - station_ref = Column(Integer, ForeignKey('station.oid')) + station_ref = Column(Integer, ForeignKey('station.oid'), + index=True) description = Column(Unicode(LENGTH_DESCRIPTION)) longitude = Column(Float, nullable=False, index=True) latitude = Column(Float, nullable=False, index=True) @@ -199,8 +168,10 @@ class StationEpoch(EpochMixin, LastSeenMixin, ORMBase): class Routing(EpochMixin, LastSeenMixin, ORMBase): - channel_epoch_ref = Column(Integer, ForeignKey('channelepoch.oid')) - endpoint_ref = Column(Integer, ForeignKey('endpoint.oid')) + channel_epoch_ref = Column(Integer, ForeignKey('channelepoch.oid'), + index=True) + endpoint_ref = Column(Integer, ForeignKey('endpoint.oid'), + index=True) channel_epoch = relationship('ChannelEpoch', back_populates='endpoints') endpoint = relationship('Endpoint', back_populates='channel_epochs') @@ -214,7 +185,8 @@ def __repr__(self): class Endpoint(ORMBase): - service_ref = Column(Integer, ForeignKey('service.oid')) + service_ref = Column(Integer, ForeignKey('service.oid'), + index=True) url = Column(String(LENGTH_URL), nullable=False) # many to many ChannelEpoch<->Endpoint @@ -231,16 +203,11 @@ def __repr__(self): class Service(ORMBase): name = Column(String(LENGTH_STD_CODE), nullable=False, unique=True) - standard = Column(String(LENGTH_STD_CODE), nullable=False) endpoints = relationship('Endpoint', back_populates='service') - nodes = relationship('Node', - secondary=node_service_relation, - back_populates='services') - def __repr__(self): - return '' % (self.name, self.standard) + return '' % self.name # class Service @@ -261,10 +228,13 @@ def __repr__(self): # elegantly class StreamEpoch(EpochMixin, LastSeenMixin, ORMBase): - network_ref = Column(Integer, ForeignKey('network.oid')) - station_ref = Column(Integer, ForeignKey('station.oid')) + network_ref = Column(Integer, ForeignKey('network.oid'), + index=True) + station_ref = Column(Integer, ForeignKey('station.oid'), + index=True) stream_epoch_group_ref = Column(Integer, - ForeignKey('streamepochgroup.oid')) + ForeignKey('streamepochgroup.oid'), + index=True) channel = Column(String(LENGTH_CHANNEL_CODE), nullable=False, index=True) location = Column(String(LENGTH_LOCATION_CODE), nullable=False, diff --git a/eidangservices/stationlite/example/db/stationlite_2018-03-21.db b/eidangservices/stationlite/example/db/stationlite_2018-05-01.db similarity index 69% rename from eidangservices/stationlite/example/db/stationlite_2018-03-21.db rename to eidangservices/stationlite/example/db/stationlite_2018-05-01.db index 863e76a..1d6ed89 100644 Binary files a/eidangservices/stationlite/example/db/stationlite_2018-03-21.db and b/eidangservices/stationlite/example/db/stationlite_2018-05-01.db differ diff --git a/eidangservices/stationlite/harvest/harvest.py b/eidangservices/stationlite/harvest/harvest.py index b06a84c..c882785 100644 --- a/eidangservices/stationlite/harvest/harvest.py +++ b/eidangservices/stationlite/harvest/harvest.py @@ -41,7 +41,7 @@ from lxml import etree from obspy import read_inventory, UTCDateTime from sqlalchemy import inspect -from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound +from sqlalchemy.orm.exc import MultipleResultsFound from sqlalchemy.exc import OperationalError from eidangservices import settings, utils @@ -50,13 +50,24 @@ node_generator, RequestsError, NoContent) from eidangservices.utils.app import CustomParser, App, AppError -from eidangservices.utils.error import Error, ErrorWithTraceback +from eidangservices.utils.error import Error from eidangservices.utils.sncl import Stream from eidangservices.utils.schema import StreamEpochSchema +# TODO(damb): +# - fix *cached_services* issue __version__ = utils.get_version("stationlite") +CACHED_SERVICES = ('station', 'dataselect', 'wfcatalog') + + +def get_cached_services(): + return [s for s in CACHED_SERVICES] + +# get_cached_services () + + # ---------------------------------------------------------------------------- class NothingToDo(Error): """Nothing to do.""" @@ -76,9 +87,6 @@ class Harvester(object): class HarvesterError(Error): """Base harvester error ({}).""" - class NotConfigured(ErrorWithTraceback): - """Harvester not configured.""" - class InvalidNodeConfiguration(HarvesterError): """DB Node configuration is not valid for node '{}'.""" @@ -92,16 +100,12 @@ def __init__(self, node_id, url_routing_config): self.node_id = node_id self._url_config = url_routing_config self._config = None - self._node = None self.logger = logging.getLogger(self.LOGGER) - self.is_configured = False @property def node(self): - if self.is_configured: - return self._node - return None + return self.node_id @property def url(self): @@ -118,18 +122,6 @@ def config(self): # config () - def configure(self, session): - try: - self._node = session.query(orm.Node).\ - filter(orm.Node.name==self.node_id).\ - one() - except (NoResultFound, MultipleResultsFound) as err: - raise self.InvalidNodeConfiguration(self.node_id) - - self.is_configured = True - - # configure () - def harvest(self, session): raise NotImplementedError @@ -152,26 +144,18 @@ class RoutingHarvester(Harvester): This harvester does not rely on the EIDA routing service anymore. """ + STATION_TAG = 'station' class StationXMLParsingError(Harvester.HarvesterError): """Error while parsing StationXML: ({})""" - def __init__(self, node_id, url_routing_config, url_fdsn_station): - super().__init__(node_id, url_routing_config) - self.url_fdsn_station = url_fdsn_station - - # __init__ () - def harvest(self, session): """Harvest the routing configuration.""" - if not self.is_configured: - raise self.NotConfigured() route_tag = '{}route'.format(self.NS_ROUTINGXML) - _cached_services = db.get_cached_services() + _cached_services = get_cached_services() _cached_services = ['{}{}'.format(self.NS_ROUTINGXML, s) for s in _cached_services] - self.logger.debug('Harvesting routes for %s.' % self.node) # event driven parsing for event, route_element in etree.iterparse(self.config, @@ -186,11 +170,32 @@ def harvest(self, session): query_params = '&'.join(['{}={}'.format(query_param, query_val) for query_param, query_val in attrs.items()]) + + # extract fdsn-station service url for each route + urls = set([ + e.get('address') for e in route_element.iter( + '{}{}'.format(self.NS_ROUTINGXML, self.STATION_TAG)) + if int(e.get('priority', 0)) == 1]) + + if (len(urls) == 0 and len([e for e in + route_element.iter() if int(e.get('priority', + 0)) == 1]) == 0): + # NOTE(damb): Skip routes which contain exclusively + # 'priority == 2' services + continue + + elif len(urls) > 1: + # NOTE(damb): Currently we cannot handle multiple + # fdsn-station urls i.e. for multiple routed epochs + raise self.IntegrityError( + ('Missing element for ' + '{} ({}).'.format(route_element, urls))) + _url_fdsn_station = '{}?{}&level=channel'.format( - self.url_fdsn_station, query_params) + urls.pop(), query_params) # XXX(damb): For every single route resolve FDSN wildcards - # using the EIDA node's station service. + # using the route's station service. # XXX(damb): Use the station service's GET method since the # POST method requires temporal constraints (both starttime and # endtime). @@ -212,8 +217,6 @@ def harvest(self, session): continue # NOTE(damb): currently only consider CACHED_SERVICEs - # TODO(damb): To be refactored - use the Station service - # defined within for service_element in route_element.iter(*_cached_services): # only consider priority=1 priority = service_element.get('priority') @@ -230,46 +233,24 @@ def harvest(self, session): raise self.RoutingConfigXMLParsingError( "Missing 'address' attrib.") - # fetch Endpoint object from DB - try: - endpoint = session.query(orm.Endpoint).\ - filter(orm.Endpoint.url==endpoint_url).\ - one() - except (NoResultFound, MultipleResultsFound) as err: - raise self.IntegrityError(err) + service = self._emerge_service(session, service_tag) + endpoint = self._emerge_endpoint(session, endpoint_url, + service) self.logger.debug('Processing routes for %r ' '(service=%s, endpoint=%s).' % (stream, service_element.tag, endpoint.url)) - for net in nets: - self.logger.debug('Checking Network<->Node relation ' - '{}<->{}.'.format(net, self.node)) - try: - _ = session.query(orm.NodeNetworkInventory).\ - filter(orm.NodeNetworkInventory.network == - net).\ - filter(orm.NodeNetworkInventory.node == - self.node).\ - one() - except NoResultFound: - # create a new relation - it will be - # automatically added to the session - r = orm.NodeNetworkInventory(network=net, - node=self.node) - self.logger.debug( - 'Created relation {0!r}'.format(r)) - except MultipleResultsFound as err: - raise self.IntegrityError(err) - try: - routing_starttime = utils.from_fdsnws_datetime( - service_element.get('start')) + routing_starttime = UTCDateTime( + service_element.get('start'), + iso8601=True).datetime routing_endtime = service_element.get('end') # reset endtime due to 'end=""' routing_endtime = ( - utils.from_fdsnws_datetime(routing_endtime) if + UTCDateTime(routing_endtime, + iso8601=True).datetime if routing_endtime is not None and routing_endtime.strip() else None) except Exception as err: @@ -341,6 +322,52 @@ def _harvest_from_stationxml(self, session, station_xml): # _harvest_from_stationxml () + def _emerge_service(self, session, service_tag): + """ + Factory method for a orm.Service object. + """ + try: + service = session.query(orm.Service).\ + filter(orm.Service.name==service_tag).\ + one_or_none() + except MultipleResultsFound as err: + raise self.IntegrityError(err) + + if service is None: + service = orm.Service(name=service_tag) + session.add(service) + self.logger.debug( + "Created new service object '{}'".format( + service)) + + return service + + # _emerge_service () + + def _emerge_endpoint(self, session, url, service): + """ + Factory method for a orm.Endpoint object. + """ + + try: + endpoint = session.query(orm.Endpoint).\ + filter(orm.Endpoint.url==url).\ + one_or_none() + except MultipleResultsFound as err: + raise self.IntegrityError(err) + + if endpoint is None: + endpoint = orm.Endpoint(url=url, + service=service) + session.add(endpoint) + self.logger.debug( + "Created new endpoint object '{}'".format( + endpoint)) + + return endpoint + + # _emerge_endpoint () + def _emerge_network(self, session, network): """ Factory method for a orm.Network object. @@ -625,8 +652,6 @@ def __init__(self, node_id, url_vnet_config): super().__init__(node_id, url_vnet_config) def harvest(self, session): - if not self.is_configured: - raise self.NotConfigured() vnet_tag = '{}vnetwork'.format(self.NS_ROUTINGXML) stream_tag = '{}stream'.format(self.NS_ROUTINGXML) @@ -966,10 +991,6 @@ def _harvest_routes(self, Session): node_par['services']['eida']['routing']['server'] + node_par['services']['eida']['routing']\ ['uri_path_config']) - url_fdsn_station = ( - node_par['services']['fdsn']['server'] + - settings.FDSN_STATION_PATH + - settings.FDSN_QUERY_METHOD_TOKEN) self.logger.info( 'Processing routes from EIDA node %r.' % node_name) @@ -978,14 +999,11 @@ def _harvest_routes(self, Session): # to the RoutingHarvester. The Harvester should fetch # this information from every single . # harvest the routing configuration - h = RoutingHarvester( - node_name, url_routing_config, url_fdsn_station) + h = RoutingHarvester(node_name, url_routing_config) session=Session() # XXX(damb): Maintain sessions within the scope of a # harvesting process. - h.configure(session) - with db.session_guard(session) as _session: h.harvest(_session) @@ -1016,8 +1034,6 @@ def _harvest_vnetworks(self, Session): session=Session() # XXX(damb): Maintain sessions within the scope of a # harvesting process. - h.configure(session) - with db.session_guard(session) as _session: h.harvest(_session) diff --git a/eidangservices/stationlite/harvest/misc.py b/eidangservices/stationlite/harvest/misc.py index ba19cf6..3b1fc82 100644 --- a/eidangservices/stationlite/harvest/misc.py +++ b/eidangservices/stationlite/harvest/misc.py @@ -46,7 +46,7 @@ from eidangservices import settings, utils from eidangservices.utils.app import CustomParser, App, AppError from eidangservices.utils.error import Error -from eidangservices.stationlite.engine import db, orm +from eidangservices.stationlite.engine import orm __version__ = utils.get_version("stationlite") @@ -110,7 +110,6 @@ def run(self): # configure SQLAlchemy logging # log_level = self.logger.getEffectiveLevel() # logging.getLogger('sqlalchemy.engine').setLevel(log_level) - exit_code = utils.ExitCodes.EXIT_SUCCESS try: self.logger.info('{}: Version {}'.format(type(self).__name__, @@ -122,15 +121,10 @@ def run(self): os.remove(self.args.path_db) engine = create_engine('sqlite:///{}'.format(self.args.path_db)) - Session = db.ScopedSession() - Session.configure(bind=engine) - session = Session() - # create db tables self.logger.debug('Creating database tables ...') orm.ORMBase.metadata.create_all(engine) - db.init(session) self.logger.info( "DB '{}' successfully initialized.".format(self.args.path_db))