Permalink
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
357 lines (285 sloc) 14.1 KB
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import os
import json
import atexit
import argparse
import traceback
from oslo_config import cfg
from st2common import log as logging
from st2common.constants.keyvalue import SYSTEM_SCOPE
from st2common.logging.misc import set_log_level_for_all_loggers
from st2common.models.api.trigger import TriggerAPI
from st2common.persistence.db_init import db_setup_with_retry
from st2common.util import loader
from st2common.util.config_loader import ContentPackConfigLoader
from st2common.services.triggerwatcher import TriggerWatcher
from st2common.services.trigger_dispatcher import TriggerDispatcherService
from st2reactor.sensor.base import Sensor
from st2reactor.sensor.base import PollingSensor
from st2reactor.sensor import config
from st2common.services.datastore import SensorDatastoreService
from st2common.util.monkey_patch import monkey_patch
from st2common.util.monkey_patch import use_select_poll_workaround
__all__ = [
'SensorWrapper',
'SensorService'
]
monkey_patch()
use_select_poll_workaround(nose_only=False)
class SensorService(object):
"""
Instance of this class is passed to the sensor instance and exposes "public"
methods which can be called by the sensor.
"""
def __init__(self, sensor_wrapper):
self._sensor_wrapper = sensor_wrapper
self._logger = self._sensor_wrapper._logger
self._trigger_dispatcher_service = TriggerDispatcherService(logger=sensor_wrapper._logger)
self._datastore_service = SensorDatastoreService(
logger=self._logger,
pack_name=self._sensor_wrapper._pack,
class_name=self._sensor_wrapper._class_name,
api_username='sensor_service')
self._client = None
@property
def datastore_service(self):
return self._datastore_service
def get_logger(self, name):
"""
Retrieve an instance of a logger to be used by the sensor class.
"""
logger_name = '%s.%s' % (self._sensor_wrapper._logger.name, name)
logger = logging.getLogger(logger_name)
logger.propagate = True
return logger
##################################
# General methods
##################################
def get_user_info(self):
return self._datastore_service.get_user_info()
##################################
# Sensor related methods
##################################
def dispatch(self, trigger, payload=None, trace_tag=None):
# Provided by the parent BaseTriggerDispatcherService class
return self._trigger_dispatcher_service.dispatch(trigger=trigger, payload=payload,
trace_tag=trace_tag,
throw_on_validation_error=False)
def dispatch_with_context(self, trigger, payload=None, trace_context=None):
"""
Method which dispatches the trigger.
:param trigger: Full name / reference of the trigger.
:type trigger: ``str``
:param payload: Trigger payload.
:type payload: ``dict``
:param trace_context: Trace context to associate with Trigger.
:type trace_context: ``st2common.api.models.api.trace.TraceContext``
"""
# Provided by the parent BaseTriggerDispatcherService class
return self._trigger_dispatcher_service.dispatch_with_context(trigger=trigger,
payload=payload,
trace_context=trace_context,
throw_on_validation_error=False)
##################################
# Methods for datastore management
##################################
def list_values(self, local=True, prefix=None):
return self.datastore_service.list_values(local=local, prefix=prefix)
def get_value(self, name, local=True, scope=SYSTEM_SCOPE, decrypt=False):
return self.datastore_service.get_value(name=name, local=local, scope=scope,
decrypt=decrypt)
def set_value(self, name, value, ttl=None, local=True, scope=SYSTEM_SCOPE, encrypt=False):
return self.datastore_service.set_value(name=name, value=value, ttl=ttl, local=local,
scope=scope, encrypt=encrypt)
def delete_value(self, name, local=True, scope=SYSTEM_SCOPE):
return self.datastore_service.delete_value(name=name, local=local, scope=scope)
class SensorWrapper(object):
def __init__(self, pack, file_path, class_name, trigger_types,
poll_interval=None, parent_args=None):
"""
:param pack: Name of the pack this sensor belongs to.
:type pack: ``str``
:param file_path: Path to the sensor module file.
:type file_path: ``str``
:param class_name: Sensor class name.
:type class_name: ``str``
:param trigger_types: A list of references to trigger types which
belong to this sensor.
:type trigger_types: ``list`` of ``str``
:param poll_interval: Sensor poll interval (in seconds).
:type poll_interval: ``int`` or ``None``
:param parent_args: Command line arguments passed to the parent process.
:type parse_args: ``list``
"""
self._pack = pack
self._file_path = file_path
self._class_name = class_name
self._trigger_types = trigger_types or []
self._poll_interval = poll_interval
self._parent_args = parent_args or []
self._trigger_names = {}
# 1. Parse the config with inherited parent args
try:
config.parse_args(args=self._parent_args)
except Exception:
pass
# 2. Establish DB connection
username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None
db_setup_with_retry(cfg.CONF.database.db_name, cfg.CONF.database.host,
cfg.CONF.database.port, username=username, password=password,
ssl=cfg.CONF.database.ssl, ssl_keyfile=cfg.CONF.database.ssl_keyfile,
ssl_certfile=cfg.CONF.database.ssl_certfile,
ssl_cert_reqs=cfg.CONF.database.ssl_cert_reqs,
ssl_ca_certs=cfg.CONF.database.ssl_ca_certs,
authentication_mechanism=cfg.CONF.database.authentication_mechanism,
ssl_match_hostname=cfg.CONF.database.ssl_match_hostname)
# 3. Instantiate the watcher
self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger,
update_handler=self._handle_update_trigger,
delete_handler=self._handle_delete_trigger,
trigger_types=self._trigger_types,
queue_suffix='sensorwrapper_%s_%s' %
(self._pack, self._class_name),
exclusive=True)
# 4. Set up logging
self._logger = logging.getLogger('SensorWrapper.%s.%s' %
(self._pack, self._class_name))
logging.setup(cfg.CONF.sensorcontainer.logging)
if '--debug' in parent_args:
set_log_level_for_all_loggers()
self._sensor_instance = self._get_sensor_instance()
def run(self):
atexit.register(self.stop)
self._trigger_watcher.start()
self._logger.info('Watcher started')
self._logger.info('Running sensor initialization code')
self._sensor_instance.setup()
if self._poll_interval:
message = ('Running sensor in active mode (poll interval=%ss)' %
(self._poll_interval))
else:
message = 'Running sensor in passive mode'
self._logger.info(message)
try:
self._sensor_instance.run()
except Exception as e:
# Include traceback
msg = ('Sensor "%s" run method raised an exception: %s.' %
(self._class_name, str(e)))
self._logger.warn(msg, exc_info=True)
raise Exception(msg)
def stop(self):
# Stop watcher
self._logger.info('Stopping trigger watcher')
self._trigger_watcher.stop()
# Run sensor cleanup code
self._logger.info('Invoking cleanup on sensor')
self._sensor_instance.cleanup()
##############################################
# Event handler methods for the trigger events
##############################################
def _handle_create_trigger(self, trigger):
self._logger.debug('Calling sensor "add_trigger" method (trigger.type=%s)' %
(trigger.type))
self._trigger_names[str(trigger.id)] = trigger
trigger = self._sanitize_trigger(trigger=trigger)
self._sensor_instance.add_trigger(trigger=trigger)
def _handle_update_trigger(self, trigger):
self._logger.debug('Calling sensor "update_trigger" method (trigger.type=%s)' %
(trigger.type))
self._trigger_names[str(trigger.id)] = trigger
trigger = self._sanitize_trigger(trigger=trigger)
self._sensor_instance.update_trigger(trigger=trigger)
def _handle_delete_trigger(self, trigger):
trigger_id = str(trigger.id)
if trigger_id not in self._trigger_names:
return
self._logger.debug('Calling sensor "remove_trigger" method (trigger.type=%s)' %
(trigger.type))
del self._trigger_names[trigger_id]
trigger = self._sanitize_trigger(trigger=trigger)
self._sensor_instance.remove_trigger(trigger=trigger)
def _get_sensor_instance(self):
"""
Retrieve instance of a sensor class.
"""
_, filename = os.path.split(self._file_path)
module_name, _ = os.path.splitext(filename)
try:
sensor_class = loader.register_plugin_class(base_class=Sensor,
file_path=self._file_path,
class_name=self._class_name)
except Exception as e:
tb_msg = traceback.format_exc()
msg = ('Failed to load sensor class from file "%s" (sensor file most likely doesn\'t '
'exist or contains invalid syntax): %s' % (self._file_path, str(e)))
msg += '\n\n' + tb_msg
exc_cls = type(e)
raise exc_cls(msg)
if not sensor_class:
raise ValueError('Sensor module is missing a class with name "%s"' %
(self._class_name))
sensor_class_kwargs = {}
sensor_class_kwargs['sensor_service'] = SensorService(sensor_wrapper=self)
sensor_config = self._get_sensor_config()
sensor_class_kwargs['config'] = sensor_config
if self._poll_interval and issubclass(sensor_class, PollingSensor):
sensor_class_kwargs['poll_interval'] = self._poll_interval
try:
sensor_instance = sensor_class(**sensor_class_kwargs)
except Exception:
self._logger.exception('Failed to instantiate "%s" sensor class' % (self._class_name))
raise Exception('Failed to instantiate "%s" sensor class' % (self._class_name))
return sensor_instance
def _get_sensor_config(self):
config_loader = ContentPackConfigLoader(pack_name=self._pack)
config = config_loader.get_config()
if config:
self._logger.info('Found config for sensor "%s"' % (self._class_name))
else:
self._logger.info('No config found for sensor "%s"' % (self._class_name))
return config
def _sanitize_trigger(self, trigger):
sanitized = TriggerAPI.from_model(trigger).to_dict()
return sanitized
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Sensor runner wrapper')
parser.add_argument('--pack', required=True,
help='Name of the pack this sensor belongs to')
parser.add_argument('--file-path', required=True,
help='Path to the sensor module')
parser.add_argument('--class-name', required=True,
help='Name of the sensor class')
parser.add_argument('--trigger-type-refs', required=False,
help='Comma delimited string of trigger type references')
parser.add_argument('--poll-interval', type=int, default=None, required=False,
help='Sensor poll interval')
parser.add_argument('--parent-args', required=False,
help='Command line arguments passed to the parent process')
args = parser.parse_args()
trigger_types = args.trigger_type_refs
trigger_types = trigger_types.split(',') if trigger_types else []
parent_args = json.loads(args.parent_args) if args.parent_args else []
assert isinstance(parent_args, list)
obj = SensorWrapper(pack=args.pack,
file_path=args.file_path,
class_name=args.class_name,
trigger_types=trigger_types,
poll_interval=args.poll_interval,
parent_args=parent_args)
obj.run()