diff --git a/.travis.yml b/.travis.yml index db6608ce..d6f1f475 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,8 @@ matrix: - python: "nightly" fast_finish: true -# Route build to container-based infrastructure -sudo: false +# Pin Ubuntu to Trusty for the moment for Python 2.6 support +dist: trusty # Cache the dependencies installed by pip cache: pip diff --git a/CHANGELOG b/CHANGELOG index 1bd5e6f9..05176934 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,14 @@ Changelog for ssm ================= +* Thu Aug 01 2019 Adrian Coveney - 2.4.0-1 + - Added support for sending and receiving messages using the ARGO Messaging + Service (AMS). + - Added option to send messages from a directory without needing to conform to + the file naming convention that the dirq module requires. + - Fixed SSM hanging if certificate is not authorised with the broker. Now it + will try other brokers if available and then correctly shut down. + - Fixed an OpenSSL 1.1 syntax error by including missing argument to checkend. + * Wed Nov 28 2018 Adrian Coveney - 2.3.0-2 - Updated build and test files only. diff --git a/README.md b/README.md index 30b7dcdf..557b423b 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,19 @@ -# Secure Stomp Messenger +# Secure STOMP Messenger [![Build Status](https://travis-ci.org/apel/ssm.svg?branch=dev)](https://travis-ci.org/apel/ssm) [![Coverage Status](https://coveralls.io/repos/github/apel/ssm/badge.svg?branch=dev)](https://coveralls.io/github/apel/ssm?branch=dev) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/cc3e808664ee41638938aa5c660a88ae)](https://www.codacy.com/app/apel/ssm) [![Maintainability](https://api.codeclimate.com/v1/badges/34aa04f3583afce2ceb2/maintainability)](https://codeclimate.com/github/apel/ssm/maintainability) -Secure Stomp Messenger (SSM) is designed to simply send messages -using the STOMP protocol. Messages are signed and may be encrypted -during transit. Persistent queues should be used to guarantee -delivery. +Secure STOMP Messenger (SSM) is designed to simply send messages +using the [STOMP protocol](http://stomp.github.io/) or via the ARGO Messaging Service (AMS). +Messages are signed and may be encrypted during transit. +Persistent queues should be used to guarantee delivery. -SSM is written in python. Packages are available for SL5 and SL6. +SSM is written in Python. Packages are available for RHEL 6 and 7, and + Ubuntu Trusty. -For more about SSM, see the [EGI wiki](https://wiki.egi.eu/wiki/APEL/SSM). +For more information about SSM, see the [EGI wiki](https://wiki.egi.eu/wiki/APEL/SSM). ## Installing the RPM @@ -23,16 +24,19 @@ The EPEL repository must be enabled. This can be done by installing the RPM for your version of SL, which is available on this page: http://fedoraproject.org/wiki/EPEL -The python stomp library (N.B. versions 3.1.1 and above are currently supported) +The Python STOMP library (N.B. versions 3.1.1 and above are currently supported) * `yum install stomppy` -The python daemon library (N.B. only versions below 2.2.0 are currently supported) +The Python AMS library. See here for details on obtaining an RPM: https://github.com/ARGOeu/argo-ams-library/ + +The Python daemon library (N.B. only versions below 2.2.0 are currently supported) * `yum install python-daemon` -The python ldap library +The Python ldap library * `yum install python-ldap` -The python dirq library +Optionally, the Python dirq library (N.B. this is only required if your messages +are stored in a dirq structure) * `yum install python-dirq` You need a certificate and key in PEM format accessible to the SSM. @@ -68,7 +72,7 @@ successfully. The RPM carries out a number of steps to run the SSM in a specific way. -1. It installs the core files in the python libraries directory +1. It installs the core files in the Python libraries directory 2. It installs scripts in /usr/bin 3. It installs configuration files in /etc/apel 4. It creates the messages directory /var/spool/apel/ @@ -85,7 +89,7 @@ Install APEL SSM: Install any missing system packages needed for the SSM: * `apt-get -f install` -Install any missing python requirements that don't have system packages: +Install any missing Python requirements that don't have system packages: * `pip install "stomp.py>=3.1.1" dirq` If you wish to run the SSM as a receiver, you will also need to install the python-daemon system package: @@ -95,7 +99,7 @@ If you wish to run the SSM as a receiver, you will also need to install the pyth The DEB carries out a number of steps to run the SSM in a specific way. -1. It installs the core files in the python libraries directory +1. It installs the core files in the Python libraries directory 2. It installs scripts in /usr/bin 3. It installs configuration files in /etc/apel 4. It creates the messages directory /var/spool/apel/ @@ -114,15 +118,16 @@ Ensure that the apel user running the SSM has access to the following: * `chown apel:apel /var/run/apel` The configuration files are in /etc/apel/. The default -configuration will send messages to the test apel server. +configuration will send messages to the test APEL server. ## Adding Files -There are two ways to add files to be sent: +There are multiple manual and programmatic ways to add files to be sent: ### Manual +#### With the dirq module All file and directory names must use hex characters: `[0-9a-f]`. * Create a directory within /var/spool/apel/outgoing with a name @@ -130,27 +135,45 @@ All file and directory names must use hex characters: `[0-9a-f]`. * Put files in this directory with names of FOURTEEN hex e.g. `1234567890abcd` +#### Without the dirq module +Ensure `path_type: directory` is set in your `sender.cfg`. +Then add messages as files to `/var/spool/apel/outgoing`, +there are no restrictions on the file names used. + ### Programmatic -Use the python or perl dirq libraries: - * python: http://pypi.python.org/pypi/dirq - * perl: http://search.cpan.org/~lcons/Directory-Queue/ +#### With the dirq module +Use the Python or Perl dirq libraries: + * Python: http://pypi.python.org/pypi/dirq + * Perl: http://search.cpan.org/~lcons/Directory-Queue/ Create a QueueSimple object with path /var/spool/apel/outgoing/ and add your messages. +#### Without the dirq module +Use the `MessageDirectory` class provided in `ssm.message_directory`. + +Create a `MessageDirectory` object with path `/var/spool/apel/outgoing/` and +add your messages using the `add` method. ## Running the SSM -### Sender +### Sender (sending via the EGI message brokers) * Run 'ssmsend' * SSM will pick up any messages and send them to the configured queue on the configured broker + +### Sender (sending via the ARGO Messaging Service (AMS)) + * Edit your sender configuration, usually under `/etc/apel/sender.cfg`, as per the [migration instructions](migrating_to_ams.md#sender) with some minor differences: + * There is no need to add the `[sender]` section as it already exists. Instead change the `protocol` to `AMS`. + * Set `ams_project` to the appropriate project. + * Then run 'ssmsend'. SSM will pick up any messages and send them via the ARGO Messaging Service. + ### Sender (container) * Download the example [configuration file](conf/sender.cfg) - * Edit the downloaded sender.cfg file to configure the queue and broker + * Edit the downloaded `sender.cfg` file as above for sending either via the [EGI message brokers](README.md#sender-sending-via-the-egi-message-brokers) or the [ARGO Messaging Service](README.md#sender-sending-via-the-argo-messaging-service-ams). * Run the following docker command to send ``` docker run \ @@ -191,6 +214,13 @@ add your messages. * SSM will receive any messages on the specified queue and write them to the filesystem * To stop, run ```'kill `cat /var/run/apel/ssm.pid`'``` + +### Receiver (receiving via the ARGO Messaging Service (AMS)) + + * Edit your receiver configuration, usually under `/etc/apel/receiver.cfg`, as per the [migration instructions](migrating_to_ams.md#receiver) with some minor differences: + * There is no need to add the `[receiver]` section as it already exists. Instead change the `protocol` to `AMS`. + * Set `ams_project` to the appropriate project. + * Then run your receiver ([as a service](README.md#receiver-service), [as a container](README.md#receiver-container) or [manually](README.md#receiver-manual)) as above. ## Removing the RPM diff --git a/apel-ssm.spec b/apel-ssm.spec index 7fc3e1c9..a452b581 100644 --- a/apel-ssm.spec +++ b/apel-ssm.spec @@ -4,8 +4,8 @@ %endif Name: apel-ssm -Version: 2.3.0 -%define releasenumber 2 +Version: 2.4.0 +%define releasenumber 1 Release: %{releasenumber}%{?dist} Summary: Secure stomp messenger @@ -21,7 +21,7 @@ BuildArch: noarch BuildRequires: python-devel %endif -Requires: stomppy >= 3.1.1, python-daemon < 2.2.0, python-dirq, python-ldap +Requires: stomppy >= 3.1.1, python-daemon < 2.2.0, python-ldap, argo-ams-library Requires(pre): shadow-utils %define ssmconf %_sysconfdir/apel @@ -100,6 +100,15 @@ rm -rf $RPM_BUILD_ROOT %doc %_defaultdocdir/%{name} %changelog +* Thu Aug 01 2019 Adrian Coveney - 2.4.0-1 + - Added support for sending and receiving messages using the ARGO Messaging + Service (AMS). + - Added option to send messages from a directory without needing to conform to + the file naming convention that the dirq module requires. + - Fixed SSM hanging if certificate is not authorised with the broker. Now it + will try other brokers if available and then correctly shut down. + - Fixed an OpenSSL 1.1 syntax error by including missing argument to checkend. + * Wed Nov 28 2018 Adrian Coveney - 2.3.0-2 - Updated build and test files only. diff --git a/bin/receiver.py b/bin/receiver.py index a96afe34..42ed11fc 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -24,6 +24,7 @@ from ssm import __version__, set_up_logging, LOG_BREAK from stomp.exception import NotConnectedException +from argo_ams_library import AmsConnectionException import time import logging.config @@ -103,39 +104,92 @@ def main(): print 'Error configuring logging: %s' % str(err) print 'SSM will exit.' sys.exit(1) - + global log log = logging.getLogger('ssmreceive') - + log.info(LOG_BREAK) log.info('Starting receiving SSM version %s.%s.%s.', *__version__) - # If we can't get a broker to connect to, we have to give up. + # Determine the protocol for the SSM to use. try: - bg = StompBrokerGetter(cp.get('broker','bdii')) - use_ssl = cp.getboolean('broker', 'use_ssl') - if use_ssl: - service = STOMP_SSL_SERVICE - else: - service = STOMP_SERVICE - brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker','network')) - except ConfigParser.NoOptionError, e: + protocol = cp.get('receiver', 'protocol') + + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + # If the newer configuration setting 'protocol' is not set, use 'STOMP' + # for backwards compatability. + protocol = Ssm2.STOMP_MESSAGING + log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + + log.info('Setting up SSM with protocol: %s', protocol) + + if protocol == Ssm2.STOMP_MESSAGING: + # Set defaults for AMS variables that Ssm2 constructor requires below. + project = None + token = '' + + # If we can't get a broker to connect to, we have to give up. try: + bg = StompBrokerGetter(cp.get('broker', 'bdii')) + use_ssl = cp.getboolean('broker', 'use_ssl') + if use_ssl: + service = STOMP_SSL_SERVICE + else: + service = STOMP_SERVICE + brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker', + 'network')) + except ConfigParser.NoOptionError, e: + try: + host = cp.get('broker', 'host') + port = cp.get('broker', 'port') + brokers = [(host, int(port))] + except ConfigParser.NoOptionError: + log.error('Options incorrectly supplied for either single ' + 'broker or broker network. ' + 'Please check configuration') + log.error('System will exit.') + log.info(LOG_BREAK) + sys.exit(1) + except ldap.SERVER_DOWN, e: + log.error('Could not connect to LDAP server: %s', e) + log.error('System will exit.') + log.info(LOG_BREAK) + sys.exit(1) + + elif protocol == Ssm2.AMS_MESSAGING: + # Then we are setting up an SSM to connect to a AMS. + + # 'use_ssl' isn't checked when using AMS (SSL is always used), but it + # is needed for the call to the Ssm2 constructor below. + use_ssl = None + try: + # We only need a hostname, not a port host = cp.get('broker', 'host') - port = cp.get('broker', 'port') - brokers = [(host, int(port))] + # Use brokers variable so subsequent code is not dependant on + # the exact destination type. + brokers = [host] + except ConfigParser.NoOptionError: - log.error('Options incorrectly supplied for either single broker \ - or broker network. Please check configuration') + log.error('The host must be specified when connecting to AMS, ' + 'please check your configuration') log.error('System will exit.') log.info(LOG_BREAK) + print 'SSM failed to start. See log file for details.' sys.exit(1) - except ldap.SERVER_DOWN, e: - log.error('Could not connect to LDAP server: %s', e) - log.error('System will exit.') - log.info(LOG_BREAK) - sys.exit(1) - + + # Attempt to configure AMS specific variables. + try: + token = cp.get('messaging', 'token') + project = cp.get('messaging', 'ams_project') + + except (ConfigParser.Error, ValueError, IOError), err: + # A token and project are needed to successfully receive from an + # AMS instance, so log and then exit on an error. + log.error('Error configuring AMS values: %s', err) + log.error('SSM will exit.') + print 'SSM failed to start. See log file for details.' + sys.exit(1) + if len(brokers) == 0: log.error('No brokers available.') log.error('System will exit.') @@ -155,11 +209,14 @@ def main(): cert=cp.get('certificates','certificate'), key=cp.get('certificates','key'), listen=cp.get('messaging','destination'), - use_ssl=cp.getboolean('broker','use_ssl'), + use_ssl=use_ssl, capath=cp.get('certificates', 'capath'), check_crls=cp.getboolean('certificates', 'check_crls'), - pidfile=pidfile) - + pidfile=pidfile, + protocol=protocol, + project=project, + token=token) + log.info('Fetching valid DNs.') dns = get_dns(options.dn_file) ssm.set_dns(dns) @@ -179,25 +236,31 @@ def main(): i = 0 # The message listening loop. while True: + try: + time.sleep(1) + if protocol == Ssm2.AMS_MESSAGING: + # We need to pull down messages as part of + # this loop when using AMS. + ssm.pull_msg_ams() + + if i % REFRESH_DNS == 0: + log.info('Refreshing valid DNs and then sending ping.') + dns = get_dns(options.dn_file) + ssm.set_dns(dns) + + if protocol == Ssm2.STOMP_MESSAGING: + ssm.send_ping() - time.sleep(1) - - if i % REFRESH_DNS == 0: - log.info('Refreshing valid DNs and then sending ping.') - dns = get_dns(options.dn_file) - ssm.set_dns(dns) - - try: - ssm.send_ping() - except NotConnectedException: - log.warn('Connection lost.') - ssm.shutdown() - dc.close() - log.info("Waiting for 10 minutes before restarting...") - time.sleep(10 * 60) - log.info('Restarting SSM.') - dc.open() - ssm.startup() + except (NotConnectedException, AmsConnectionException) as error: + log.warn('Connection lost.') + log.debug(error) + ssm.shutdown() + dc.close() + log.info("Waiting for 10 minutes before restarting...") + time.sleep(10 * 60) + log.info('Restarting SSM.') + dc.open() + ssm.startup() i += 1 diff --git a/bin/sender.py b/bin/sender.py index 2f1313b8..35c32123 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -60,42 +60,104 @@ def main(): print 'Error configuring logging: %s' % str(err) print 'The system will exit.' sys.exit(1) - + log = logging.getLogger('ssmsend') - + log.info(LOG_BREAK) log.info('Starting sending SSM version %s.%s.%s.', *__version__) - # If we can't get a broker to connect to, we have to give up. + + # Determine the protocol and destination type of the SSM to configure. try: - bdii_url = cp.get('broker','bdii') - log.info('Retrieving broker details from %s ...', bdii_url) - bg = StompBrokerGetter(bdii_url) - use_ssl = cp.getboolean('broker', 'use_ssl') - if use_ssl: - service = STOMP_SSL_SERVICE - else: - service = STOMP_SERVICE - brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker','network')) - log.info('Found %s brokers.', len(brokers)) - except ConfigParser.NoOptionError, e: + protocol = cp.get('sender', 'protocol') + + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + # If the newer configuration setting 'protocol' is not set, use 'STOMP' + # for backwards compatability. + protocol = Ssm2.STOMP_MESSAGING + log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + + log.info('Setting up SSM with protocol: %s', protocol) + + if protocol == Ssm2.STOMP_MESSAGING: + # Set defaults for AMS variables that Ssm2 constructor requires below. + project = None + token = '' + + # If we can't get a broker to connect to, we have to give up. + try: + bdii_url = cp.get('broker', 'bdii') + log.info('Retrieving broker details from %s ...', bdii_url) + bg = StompBrokerGetter(bdii_url) + use_ssl = cp.getboolean('broker', 'use_ssl') + if use_ssl: + service = STOMP_SSL_SERVICE + else: + service = STOMP_SERVICE + brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker', + 'network')) + log.info('Found %s brokers.', len(brokers)) + except ConfigParser.NoOptionError, e: + try: + host = cp.get('broker', 'host') + port = cp.get('broker', 'port') + brokers = [(host, int(port))] + except ConfigParser.NoOptionError: + log.error('Options incorrectly supplied for either single ' + 'broker or broker network. ' + 'Please check configuration') + log.error('System will exit.') + log.info(LOG_BREAK) + print 'SSM failed to start. See log file for details.' + sys.exit(1) + except ldap.LDAPError, e: + log.error('Could not connect to LDAP server: %s', e) + log.error('System will exit.') + log.info(LOG_BREAK) + print 'SSM failed to start. See log file for details.' + sys.exit(1) + + elif protocol == Ssm2.AMS_MESSAGING: + # Then we are setting up an SSM to connect to a AMS. + + # 'use_ssl' isn't checked when using AMS (SSL is always used), but it + # is needed for the call to the Ssm2 constructor below. + use_ssl = None try: + # We only need a hostname, not a port host = cp.get('broker', 'host') - port = cp.get('broker', 'port') - brokers = [(host, int(port))] + # Use brokers variable so subsequent code is not dependant on + # the exact destination type. + brokers = [host] + except ConfigParser.NoOptionError: - log.error('Options incorrectly supplied for either single broker or \ - broker network. Please check configuration') + log.error('The host must be specified when connecting to AMS, ' + 'please check your configuration') log.error('System will exit.') log.info(LOG_BREAK) print 'SSM failed to start. See log file for details.' sys.exit(1) - except ldap.LDAPError, e: - log.error('Could not connect to LDAP server: %s', e) - log.error('System will exit.') - log.info(LOG_BREAK) - print 'SSM failed to start. See log file for details.' - sys.exit(1) - + + # Attempt to configure AMS project variable. + try: + project = cp.get('messaging', 'ams_project') + + except (ConfigParser.Error, ValueError, IOError), err: + # A project is needed to successfully send to an + # AMS instance, so log and then exit on an error. + log.error('Error configuring AMS values: %s', err) + log.error('SSM will exit.') + print 'SSM failed to start. See log file for details.' + sys.exit(1) + + try: + token = cp.get('messaging', 'token') + except (ConfigParser.Error, ValueError, IOError), err: + # A token is not necessarily needed, if the cert and key can be + # used by the underlying auth system to get a suitable token. + log.info('No AMS token provided, using cert/key pair instead.') + # Empty string used by AMS to define absence of token. + token = '' + if len(brokers) == 0: log.error('No brokers available.') log.error('System will exit.') @@ -120,17 +182,29 @@ def main(): raise Ssm2Exception('No destination queue is configured.') except ConfigParser.NoOptionError, e: raise Ssm2Exception(e) - + + # Determine what type of message store we are interacting with, + # i.e. a dirq QueueSimple object or a plain MessageDirectory directory. + try: + path_type = cp.get('messaging', 'path_type') + except ConfigParser.NoOptionError: + log.info('No path type defined, assuming dirq.') + path_type = 'dirq' + sender = Ssm2(brokers, - cp.get('messaging','path'), - cert=cp.get('certificates','certificate'), - key=cp.get('certificates','key'), - dest=cp.get('messaging','destination'), - use_ssl=cp.getboolean('broker','use_ssl'), - capath=cp.get('certificates', 'capath'), - enc_cert=server_cert, - verify_enc_cert=verify_server_cert) - + cp.get('messaging', 'path'), + path_type=path_type, + cert=cp.get('certificates', 'certificate'), + key=cp.get('certificates', 'key'), + dest=cp.get('messaging', 'destination'), + use_ssl=use_ssl, + capath=cp.get('certificates', 'capath'), + enc_cert=server_cert, + verify_enc_cert=verify_server_cert, + protocol=protocol, + project=project, + token=token) + if sender.has_msgs(): sender.handle_connect() sender.send_all() diff --git a/conf/receiver.cfg b/conf/receiver.cfg index cb36d28a..bca39065 100644 --- a/conf/receiver.cfg +++ b/conf/receiver.cfg @@ -1,14 +1,19 @@ +[receiver] +# Either 'STOMP' for STOMP message brokers or 'AMS' for Argo Messaging Service +protocol: STOMP + [broker] # The SSM will query a BDII to find brokers available. These details are for the -# EGI test broker network +# EGI production broker network bdii: ldap://lcg-bdii.cern.ch:2170 -network: TEST-NWOB -# OR (these details will only be used if the broker network settings aren't used) -#host: test-msg01.afroditi.hellasgrid.gr -#port: 6163 +network: PROD +# Alternatively, 'host' and 'port' can be set manually (with 'bdii' and +# 'network' commented out). This option MUST be used for AMS. +#host: +#port: -# broker authentication. If use-ssl is set, the certificates configured +# broker authentication. If use_ssl is set, the certificates configured # in the mandatory [certificates] section will be used. use_ssl: true @@ -19,6 +24,8 @@ capath: /etc/grid-security/certificates check_crls: false [messaging] +# If using AMS this is the project that SSM will connect to. Ignored for STOMP. +ams_project: # Destination to which SSM will listen. destination: /queue/ssm2test diff --git a/conf/sender.cfg b/conf/sender.cfg index 73c0511f..4e7698c4 100644 --- a/conf/sender.cfg +++ b/conf/sender.cfg @@ -1,6 +1,6 @@ -################################################################################ -# Required: broker configuration options -# +[sender] +# Either 'STOMP' for STOMP message brokers or 'AMS' for Argo Messaging Service +protocol: STOMP [broker] @@ -8,18 +8,15 @@ # EGI production broker network bdii: ldap://lcg-bdii.cern.ch:2170 network: PROD -# OR (these details will only be used if the broker network settings aren't used) -#host: test-msg01.afroditi.hellasgrid.gr -#port: 6163 +# Alternatively, 'host' and 'port' may be set manually (with 'bdii' and +# 'network' commented out). This option must be used for AMS. +#host: +#port: # broker authentication. If use_ssl is set, the certificates configured # in the mandatory [certificates] section will be used. use_ssl: true - -################################################################################ -# Required: Certificate configuration - [certificates] certificate: /etc/grid-security/hostcert.pem key: /etc/grid-security/hostkey.pem @@ -30,16 +27,22 @@ capath: /etc/grid-security/certificates # the final server that's receiving your messages; not your own, nor the broker. #server_cert: /etc/grid-security/servercert.pem -################################################################################ -# Messaging configuration. -# [messaging] +# If using AMS this is the project that SSM will connect to. Ignored for STOMP. +ams_project: # Queue to which SSM will send messages destination: # Outgoing messages will be read and removed from this directory. path: /var/spool/apel/outgoing +# If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied +# 'path' will be treated as a Python dirq (a directory based queue, which is a +# port of the Perl module Directory::Queue). +# If 'path_type' is set to 'directory', the supplied 'path' will be treated +# as if it is a directory rather than a dirq. +# As a result, 'path' cannot contain subdirectories. +path_type: dirq [logging] logfile: /var/log/apel/ssmsend.log diff --git a/constraints.txt b/constraints.txt index 6c3eab64..74809e04 100644 --- a/constraints.txt +++ b/constraints.txt @@ -1,4 +1,5 @@ # Constraints that apply to pip installed requirements -# coveralls dependency that needs an older version for Python 2.6 +# coveralls dependencies that need older versions for Python 2.6 pycparser<2.19 +idna<2.8 diff --git a/migrating_to_ams.md b/migrating_to_ams.md new file mode 100644 index 00000000..aa7bd8db --- /dev/null +++ b/migrating_to_ams.md @@ -0,0 +1,50 @@ +# Migrating from using EGI ActiveMQ Message Brokers to using EGI ARGO Messaging Service + +Migration requires upgrading SSM to at least version 2.4.0 and adding new values to your configuration. + +## Sender + +The sender configuration is usually found under `/etc/apel/sender.cfg`. Follow the steps below to migrate. + +1. Comment out `bdii` and `network`. +1. Uncomment `host` and set it to `msg-devel.argo.grnet.gr`. +1. Add the following as a new section at the top of your configuration: +``` +[sender] +# Either 'STOMP' for STOMP message brokers or 'AMS' for Argo Messaging Service +protocol: AMS +``` +1. Add the following to the `[messaging]` section of your configuration: +``` +# If using AMS this is the project that SSM will connect to. Ignored for STOMP. +ams_project: EGI-ACCOUNTING +``` +1. To send to the central APEL Accounting server, change `destination` to one of the following depending on your type of accounting: + * `gLite-APEL` for Grid Accounting + * `eu.egi.cloud.accounting` for Cloud Accounting + * `eu.egi.storage.accounting` for Storage Accounting + +The next time `ssmsend` runs it should be using the AMS. You can check this by looking in the logs for a successful run, which should look like this: + +``` +2018-09-19 14:18:06,423 - ssmsend - INFO - ======================================== +2018-09-19 14:18:06,424 - ssmsend - INFO - Starting sending SSM version 2.4.0. +2018-09-19 14:18:06,424 - ssmsend - INFO - Setting up SSM with Dest Type: AMS, Protocol : HTTPS +2018-09-19 14:18:06,424 - ssmsend - INFO - No AMS token provided, using cert/key pair instead. +2018-09-19 14:18:06,424 - ssmsend - INFO - No server certificate supplied. Will not encrypt messages. +2018-09-19 14:18:07,061 - ssm.ssm2 - INFO - Found 1 messages. +2018-09-19 14:18:07,860 - ssm.ssm2 - INFO - Sent 5ba24c88/5ba24c8f0f129d, Argo ID: 18 +2018-09-19 14:18:07,861 - ssm.ssm2 - INFO - Tidying message directory. +2018-09-19 14:18:07,862 - ssmsend - INFO - SSM run has finished. +2018-09-19 14:18:07,862 - ssmsend - INFO - SSM has shut down. +2018-09-19 14:18:07,862 - ssmsend - INFO - ======================================== +``` + +## Receiver + +1. Follow the steps 1 to 4 as per the [Sender documentation](#Sender) but editing your receiver configuration instead, usually found under `/etc/apel/receiver.cfg`, naming the sction `[receiver]` rather than `[sender]`. +1. Change `destination` to be the subscription you are using to pull messages down. +1. Add your token to the `[messaging]` section of your configuration: +``` +token: your_token_here +``` diff --git a/requirements-test.txt b/requirements-test.txt index c9eb8ea3..f9d5b6b7 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -4,6 +4,6 @@ -c constraints.txt unittest2 -coveralls +coveralls<=1.2.0 mock codecov diff --git a/requirements.txt b/requirements.txt index a5977a79..02bfe1b7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,9 @@ stomp.py>=3.1.1 python-daemon<2.2.0 python-ldap +# Dependencies for optional dirq based sending dirq + +# Dependencies for experimental AMS functionality + +argo-ams-library diff --git a/scripts/ssm-build-deb.sh b/scripts/ssm-build-deb.sh index e4d0d006..4eb15210 100755 --- a/scripts/ssm-build-deb.sh +++ b/scripts/ssm-build-deb.sh @@ -16,7 +16,7 @@ set -eu -TAG=2.3.0-1 +TAG=2.4.0-1 SOURCE_DIR=~/debbuild/source BUILD_DIR=~/debbuild/build diff --git a/scripts/ssm-build-rpm.sh b/scripts/ssm-build-rpm.sh index 005a0918..a892b88c 100644 --- a/scripts/ssm-build-rpm.sh +++ b/scripts/ssm-build-rpm.sh @@ -10,7 +10,7 @@ rpmdev-setuptree RPMDIR=/home/rpmb/rpmbuild -VERSION=2.3.0-1 +VERSION=2.4.0-1 SSMDIR=apel-ssm-$VERSION # Remove old sources and RPMS diff --git a/setup.py b/setup.py index 69c42aff..15ab89c8 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ Requires setuptools. """ -from os import remove, path, makedirs +from os import remove from shutil import copyfile import sys @@ -50,9 +50,12 @@ def main(): url='http://apel.github.io/', download_url='https://github.com/apel/ssm/releases', license='Apache License, Version 2.0', - install_requires=['stomp.py>=3.1.1', 'python-ldap', 'dirq'], + install_requires=[ + 'stomp.py>=3.1.1', 'python-ldap', 'argo-ams-library', + ], extras_require={ 'python-daemon': ['python-daemon<2.2.0'], + 'dirq': ['dirq'], }, packages=find_packages(exclude=['bin', 'test']), scripts=['bin/ssmreceive', 'bin/ssmsend'], @@ -91,5 +94,6 @@ def main(): remove('conf/apel-ssm') remove('apel-ssm') + if __name__ == "__main__": main() diff --git a/ssm/__init__.py b/ssm/__init__.py index c03583bd..bc90c920 100644 --- a/ssm/__init__.py +++ b/ssm/__init__.py @@ -19,7 +19,7 @@ import logging import sys -__version__ = (2, 3, 0) +__version__ = (2, 4, 0) LOG_BREAK = '========================================' diff --git a/ssm/crypto.py b/ssm/crypto.py index 7fe6f176..f3b88ac0 100644 --- a/ssm/crypto.py +++ b/ssm/crypto.py @@ -214,11 +214,16 @@ def decrypt(encrypted_text, certpath, keypath): def verify_cert_date(certpath): - """Return True if certifcate is 'in date', otherwise return False.""" + """Check that certificate hasn't expired and won't expire within 24 hours. + + Return True if certifcate is 'in date', otherwise return False. + """ if certpath is None: raise CryptoException('Invalid None argument to verify_cert_date().') - args = ['openssl', 'x509', '-checkend', '-noout', '-in', certpath] + # Check if the certificate expires within the next 86400 seconds and exit + # non-zero if yes, it will expire, or zero if not. + args = ['openssl', 'x509', '-checkend', '86400', '-noout', '-in', certpath] p1 = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) diff --git a/ssm/message_directory.py b/ssm/message_directory.py new file mode 100644 index 00000000..cf1db8a1 --- /dev/null +++ b/ssm/message_directory.py @@ -0,0 +1,127 @@ +# Copyright 2018 Science and Technology Facilities Council +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains the MessageDirectory class.""" + +import logging +import os +import uuid + +# logging configuration +log = logging.getLogger(__name__) + + +class MessageDirectory(object): + """A structure for holding Accounting messages in a directory.""" + + def __init__(self, path): + """Create a new directory structure for holding Accounting messages.""" + self.directory_path = path + + def add(self, data): + """Add the passed data to a new file and return it's name.""" + # Create a unique file name so APEL admins can pair sent and recieved + # messages easily (as the file name appears in the sender and receiver + # logs as the message ID). + name = uuid.uuid4() + + # Open the file and write the provided data into the file. + with open("%s/%s" % (self.directory_path, name), 'w') as message: + message.write(data) + + # Return the name of the created file as a string, + # to keep the dirq like interface. + return "%s" % name + + def count(self): + """ + Return the number of elements in the queue. + + Regardless of their state. + """ + return len(self._get_messages()) + + def get(self, name): + """Return the content of the named message.""" + with open("%s/%s" % (self.directory_path, name)) as message: + content = message.read() + return content + + def lock(self, _name): + """Return True to simulate a successful lock. Does nothing else.""" + return True + + def purge(self): + """ + Do nothing, as there are no old/intermediate directories to purge. + + Only included to preserve dirq interface. + """ + log.debug("purge called, but purge does nothing for non-dirq sending.") + + def remove(self, name): + """Remove the named message.""" + os.unlink("%s/%s" % (self.directory_path, name)) + + def _get_messages(self, sort_by_mtime=False): + """ + Get the messages stored in this MessageDirectory. + + if sort_by_mtime is set to True, the returned list is guaranteed to be + in increasing order of modification time. + + mtime is used because (apparently) there is not way to find the + original date of file creation due to a limitation + of the underlying filesystem + """ + try: + # Get a list of files under self.directory_path + # in an arbitrary order. + file_name_list = os.listdir(self.directory_path) + + if sort_by_mtime: + # Working space to hold the unsorted messages + # as file paths and mtimes. + unsorted_messages = [] + # Working space to hold the sorted messages as file names. + sorted_messages = [] + + # Work out the mtime of each file. + for file_name in file_name_list: + file_path = os.path.join(self.directory_path, file_name) + # Store the file path and the time + # the file was last modified. + unsorted_messages.append((file_name, + os.path.getmtime(file_path))) + + # Sort the file paths by mtime and + # then only store the file name. + for (file_name, _mtime) in sorted(unsorted_messages, + key=lambda tup: tup[1]): + # Store the sorted file paths in a class element. + sorted_messages.append(file_name) + + # Return the sorted list. + return sorted_messages + + # If we get here, just return the arbitrarily ordered list. + return file_name_list + + except (IOError, OSError) as error: + log.error(error) + # Return an empty file list. + return [] + + def __iter__(self): + """Return an iterable of files currently in the MessageDirectory.""" + return self._get_messages(sort_by_mtime=True).__iter__() diff --git a/ssm/ssm2.py b/ssm/ssm2.py index e451948a..28e9762b 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -25,8 +25,15 @@ ssl = None from ssm import crypto -from dirq.QueueSimple import QueueSimple -from dirq.queue import Queue +from ssm.message_directory import MessageDirectory + +try: + from dirq.QueueSimple import QueueSimple + from dirq.queue import Queue +except ImportError: + # ImportError is raised later on if dirq is requested but not installed. + QueueSimple = None + Queue = None import stomp from stomp.exception import ConnectFailedException @@ -36,9 +43,12 @@ import time import logging +from argo_ams_library import ArgoMessagingService, AmsMessage + # Set up logging log = logging.getLogger(__name__) + class Ssm2Exception(Exception): ''' Exception for use by SSM2. @@ -55,9 +65,14 @@ class Ssm2(stomp.ConnectionListener): REJECT_SCHEMA = {'body': 'string', 'signer':'string?', 'empaid':'string?', 'error':'string'} CONNECTION_TIMEOUT = 10 - def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, - capath=None, check_crls=False, use_ssl=False, username=None, password=None, - enc_cert=None, verify_enc_cert=True, pidfile=None): + # Messaging protocols + STOMP_MESSAGING = 'STOMP' + AMS_MESSAGING = 'AMS' + + def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, + capath=None, check_crls=False, use_ssl=False, username=None, password=None, + enc_cert=None, verify_enc_cert=True, pidfile=None, path_type='dirq', + protocol=STOMP_MESSAGING, project=None, token=''): ''' Creates an SSM2 object. If a listen value is supplied, this SSM2 will be a receiver. @@ -84,12 +99,45 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, self._valid_dns = [] self._pidfile = pidfile + # Used to differentiate between STOMP and AMS methods + self._protocol = protocol + + # Used when interacting with an Argo Messaging Service + self._project = project + self._token = token + + if self._protocol == Ssm2.AMS_MESSAGING: + self._ams = ArgoMessagingService(endpoint=self._brokers[0], + token=self._token, + cert=self._cert, + key=self._key, + project=self._project) + # create the filesystem queues for accepted and rejected messages if dest is not None and listen is None: - self._outq = QueueSimple(qpath) + # Determine what sort of outgoing structure to make + if path_type == 'dirq': + if QueueSimple is None: + raise ImportError("dirq path_type requested but the dirq " + "module wasn't found.") + + self._outq = QueueSimple(qpath) + + elif path_type == 'directory': + self._outq = MessageDirectory(qpath) + else: + raise Ssm2Exception('Unsupported path_type variable.') + elif listen is not None: inqpath = os.path.join(qpath, 'incoming') rejectqpath = os.path.join(qpath, 'reject') + + # Receivers must use the dirq module, so make a quick sanity check + # that dirq is installed. + if Queue is None: + raise ImportError("Receiving SSMs must use dirq, but the dirq " + "module wasn't found.") + self._inq = Queue(inqpath, schema=Ssm2.QSCHEMA) self._rejectq = Queue(rejectqpath, schema=Ssm2.REJECT_SCHEMA) else: @@ -100,7 +148,8 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, # Check that the certificate has not expired. if not crypto.verify_cert_date(self._cert): - raise Ssm2Exception('Certificate %s has expired.' % self._cert) + raise Ssm2Exception('Certificate %s has expired or will expire ' + 'within a day.' % self._cert) # check the server certificate provided if enc_cert is not None: @@ -110,9 +159,9 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, # Check that the encyption certificate has not expired. if not crypto.verify_cert_date(enc_cert): raise Ssm2Exception( - 'Encryption certificate %s has expired. Please obtain the ' - 'new one from the final server receiving your messages.' % - enc_cert + 'Encryption certificate %s has expired or will expire ' + 'within a day. Please obtain the new one from the final ' + 'server receiving your messages.' % enc_cert ) if verify_enc_cert: if not crypto.verify_cert_path(self._enc_cert, self._capath, self._check_crls): @@ -205,12 +254,15 @@ def on_message(self, headers, body): except (IOError, OSError) as e: log.error('Failed to read or write file: %s', e) - def on_error(self, unused_headers, body): + def on_error(self, headers, body): ''' Called by stomppy when an error frame is received. ''' - log.warn('Error message received: %s', body) - raise Ssm2Exception() + if 'No user for client certificate: ' in headers['message']: + log.error('The following certificate is not authorised: %s', + headers['message'].split(':')[1]) + else: + log.error('Error message received: %s', body) def on_connected(self, unused_headers, unused_body): ''' @@ -311,6 +363,82 @@ def _send_msg(self, message, msgid): # If it fails, use the v3 metod signiture self._conn.send(to_send, headers=headers) + def pull_msg_ams(self): + """Pull 1 message from the AMS and acknowledge it.""" + if self._protocol != Ssm2.AMS_MESSAGING: + # Then this method should not be called, + # raise an exception if it is. + raise Ssm2Exception('pull_msg_ams called, ' + 'but protocol not set to AMS. ' + 'Protocol: %s' % self._protocol) + + # This method is setup so that you could pull down and + # acknowledge more than one message at a time, but + # currently there is no use case for it. + messages_to_pull = 1 + # ack id's will be stored in this list and then acknowledged + ackids = [] + + for msg_ack_id, msg in self._ams.pull_sub(self._listen, + messages_to_pull): + # Get the AMS message id + msgid = msg.get_msgid() + # Get the SSM dirq id + try: + empaid = msg.get_attr().get('empaid') + except AttributeError: + # A message without an empaid could be received if it wasn't + # sent via the SSM, we need to pull down that message + # to prevent it blocking the message queue. + log.debug("Message %s has no empaid.", msgid) + empaid = "N/A" + # get the message body + body = msg.get_data() + + log.info('Received message. ID = %s, Argo ID = %s', empaid, msgid) + + extracted_msg, signer, err_msg = self._handle_msg(body) + + try: + # If the message is empty or the error message is not empty + # then reject the message. + if extracted_msg is None or err_msg is not None: + if signer is None: # crypto failed + signer = 'Not available.' + elif extracted_msg is not None: + # If there is a signer then it was rejected for not + # being in the DNs list, so we can use the + # extracted msg, which allows the msg to be + # reloaded if needed. + body = extracted_msg + + log.warn("Message rejected: %s", err_msg) + + name = self._rejectq.add({'body': body, + 'signer': signer, + 'empaid': empaid, + 'error': err_msg}) + log.info("Message saved to reject queue as %s", name) + + else: # message verified ok + name = self._inq.add({'body': extracted_msg, + 'signer': signer, + 'empaid': empaid}) + log.info("Message saved to incoming queue as %s", name) + + # If we get here, we have saved the message, so add the + # ack ID to the list of those to be acknowledged. + ackids.append(msg_ack_id) + + except OSError, error: + log.error('Failed to read or write file: %s', error) + + # pass list of extracted ackIds to AMS Service so that + # it can move the offset for the next subscription pull + # (basically acknowledging pulled messages) + if ackids: + self._ams.ack_sub(self._listen, ackids) + def send_ping(self): ''' If a STOMP connection is left open with no activity for an hour or @@ -334,6 +462,8 @@ def has_msgs(self): def send_all(self): ''' Send all the messages in the outgoing queue. + + Either via STOMP or HTTPS (to an Argo Message Broker). ''' log.info('Found %s messages.', self._outq.count()) for msgid in self._outq: @@ -342,14 +472,44 @@ def send_all(self): continue text = self._outq.get(msgid) - self._send_msg(text, msgid) - log.info('Waiting for broker to accept message.') - while self._last_msg is None: - if not self.connected: - raise Ssm2Exception('Lost connection.') + if self._protocol == Ssm2.STOMP_MESSAGING: + # Then we are sending to a STOMP message broker. + self._send_msg(text, msgid) + + log.info('Waiting for broker to accept message.') + while self._last_msg is None: + if not self.connected: + raise Ssm2Exception('Lost connection.') + + log_string = "Sent %s" % msgid + + elif self._protocol == Ssm2.AMS_MESSAGING: + # Then we are sending to an Argo Messaging Service instance. + if text is not None: + # First we sign the message + to_send = crypto.sign(text, self._cert, self._key) + # Possibly encrypt the message. + if self._enc_cert is not None: + to_send = crypto.encrypt(to_send, self._enc_cert) - time.sleep(0.1) + # Then we need to wrap text up as an AMS Message. + message = AmsMessage(data=to_send, + attributes={'empaid': msgid}).dict() + + argo_response = self._ams.publish(self._dest, message) + + argo_id = argo_response['messageIds'][0] + log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id) + + else: + # The SSM has been improperly configured + raise Ssm2Exception('Unknown messaging protocol: %s' % + self._protocol) + + time.sleep(0.1) + # log that the message was sent + log.info(log_string) self._last_msg = None self._outq.remove(msgid) @@ -397,6 +557,12 @@ def handle_connect(self): If more than one is in the list self._network_brokers, try to connect to each in turn until successful. ''' + if self._protocol == Ssm2.AMS_MESSAGING: + log.debug('handle_connect called for AMS, doing nothing.') + return + + log.info("Using stomp.py version %s.%s.%s.", *stomp.__version__) + for host, port in self._brokers: self._initialise_connection(host, port) try: @@ -416,6 +582,10 @@ def handle_disconnect(self): When disconnected, attempt to reconnect using the same method as used when starting up. ''' + if self._protocol == Ssm2.AMS_MESSAGING: + log.debug('handle_disconnect called for AMS, doing nothing.') + return + self.connected = False # Shut down properly self.close_connection() @@ -443,12 +613,25 @@ def start_connection(self): If the timeout is reached without receiving confirmation of connection, raise an exception. ''' + if self._protocol == Ssm2.AMS_MESSAGING: + log.debug('start_connection called for AMS, doing nothing.') + return + if self._conn is None: raise Ssm2Exception('Called start_connection() before a \ connection object was initialised.') self._conn.start() - self._conn.connect(wait = True) + self._conn.connect(wait=False) + + i = 0 + while not self.connected: + time.sleep(0.1) + if i > Ssm2.CONNECTION_TIMEOUT * 10: + err = 'Timed out while waiting for connection. ' + err += 'Check the connection details.' + raise Ssm2Exception(err) + i += 1 if self._dest is not None: log.info('Will send messages to: %s', self._dest) @@ -460,21 +643,16 @@ def start_connection(self): self._conn.subscribe(destination=self._listen, id=1, ack='auto') log.info('Subscribing to: %s', self._listen) - i = 0 - while not self.connected: - time.sleep(0.1) - if i > Ssm2.CONNECTION_TIMEOUT * 10: - err = 'Timed out while waiting for connection. ' - err += 'Check the connection details.' - raise Ssm2Exception(err) - i += 1 - def close_connection(self): ''' Close the connection. This is important because it runs in a separate thread, so it can outlive the main process if it is not ended. ''' + if self._protocol == Ssm2.AMS_MESSAGING: + log.debug('close_connection called for AMS, doing nothing.') + return + try: self._conn.disconnect() except (stomp.exception.NotConnectedException, socket.error): diff --git a/test/test_brokers.py b/test/test_brokers.py index 6780242e..28db2fae 100644 --- a/test/test_brokers.py +++ b/test/test_brokers.py @@ -1,38 +1,28 @@ -''' - Copyright (C) 2012 STFC +# Copyright 2019 UK Research and Innovation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 +import unittest - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - @author: Will Rogers -''' +import mock from ssm import brokers -import unittest class Test(unittest.TestCase): - - def setUp(self): - pass - - - def tearDown(self): - pass - - def test_parse_stomp_url(self): - + wrong_url = 'this is not a correct url' try: brokers.parse_stomp_url(wrong_url) @@ -40,52 +30,102 @@ def test_parse_stomp_url(self): except (IndexError, ValueError): # Expected exception pass - + http_url = 'http://not.a.stomp.url:8080' - + try: brokers.parse_stomp_url(http_url) self.fail('Parsed a URL which was not STOMP.') except ValueError: pass - + + self.assertRaises(ValueError, brokers.parse_stomp_url, + 'stomp://invalid.port.number:abc') + stomp_url = 'stomp://stomp.cern.ch:6262' - + try: brokers.parse_stomp_url(stomp_url) - except: - self.fail('Could not parse a valid stomp URL: %s' % stomp_url) - + except Exception: + self.fail('Could not parse a valid stomp URL: %s' % stomp_url) stomp_ssl_url = 'stomp+ssl://stomp.cern.ch:61262' - + try: brokers.parse_stomp_url(stomp_ssl_url) - except: - self.fail('Could not parse a valid stomp+ssl URL: %s' % stomp_url) - + except Exception: + self.fail('Could not parse a valid stomp+ssl URL: %s' % stomp_url) + def test_fetch_brokers(self): - ''' - Requires an internet connection to get information from the BDII. - Could fail if the BDII is down. This isn't very unit-test-like. - ''' - bdii = 'ldap://lcg-bdii.cern.ch:2170' + """Check the handling of responses from a mocked BDII.""" + bdii = 'ldap://no-bdii.utopia.ch:2170' network = 'PROD' - + sbg = brokers.StompBrokerGetter(bdii) - - bs = sbg.get_broker_hosts_and_ports(brokers.STOMP_SERVICE, network) - + + # So that there are no external LDAP calls, mock out the LDAP seach. + with mock.patch('ldap.ldapobject.SimpleLDAPObject.search_s', + side_effect=self._mocked_search): + bs = sbg.get_broker_hosts_and_ports(brokers.STOMP_SERVICE, network) + if len(bs) < 1: self.fail('No brokers found in the BDII.') - + host, port = bs[0] if not str(port).isdigit(): self.fail('Got a non-integer port from fetch_brokers()') - - if not '.' in host: + + if '.' not in host: self.fail("Didn't get a hostname from fetch_brokers()") + # Check that no brokers are returned from the TEST-NWOB network. + test_network = 'TEST-NWOB' + # So that there are no external LDAP calls, mock out the LDAP seach. + with mock.patch('ldap.ldapobject.SimpleLDAPObject.search_s', + side_effect=self._mocked_search): + test_bs = sbg.get_broker_hosts_and_ports(brokers.STOMP_SERVICE, + test_network) + self.assertEqual(len(test_bs), 0, "Test brokers found in error.") + + def _mocked_search(*args, **kwargs): + """Return values to mocked search call based on input.""" + + if ( + '(&(objectClass=GlueService)(GlueServiceType=msg.broker.stomp))' + ) in args: + return [( + 'GlueServiceUniqueID=mq.cro-ngi.hr_msg.broker.stomp_3523291347' + ',Mds-Vo-name=egee.srce.hr,Mds-Vo-name=local,o=grid', + {'GlueServiceUniqueID': + ['mq.cro-ngi.hr_msg.broker.stomp_3523291347'], + 'GlueServiceEndpoint': ['stomp://mq.cro-ngi.hr:6163/']}), + ( + 'GlueServiceUniqueID=broker-prod1.argo.grnet.gr_msg.broker.sto' + 'mp_175215210,Mds-Vo-name=HG-06-EKT,Mds-Vo-name=local,o=grid', + {'GlueServiceUniqueID': + ['broker-prod1.argo.grnet.gr_msg.broker.stomp_175215210'], + 'GlueServiceEndpoint': + ['stomp://broker-prod1.argo.grnet.gr:6163/']} + )] + elif ( + '(&(GlueServiceDataKey=cluster)(GlueChunkKey=GlueServiceUniqueID=' + 'mq.cro-ngi.hr_msg.broker.stomp_3523291347))' + ) in args: + return [( + 'GlueServiceDataKey=cluster,GlueServiceUniqueID=mq.cro-ngi.hr_' + 'msg.broker.stomp_3523291347,Mds-Vo-name=egee.srce.hr,Mds-Vo-n' + 'ame=local,o=grid', {'GlueServiceDataValue': ['PROD']} + )] + elif ( + '(&(GlueServiceDataKey=cluster)(GlueChunkKey=GlueServiceUniqueID=' + 'broker-prod1.argo.grnet.gr_msg.broker.stomp_175215210))' + ) in args: + return [( + 'GlueServiceDataKey=cluster,GlueServiceUniqueID=broker-prod1.a' + 'rgo.grnet.gr_msg.broker.stomp_175215210,Mds-Vo-name=HG-06-EKT' + ',Mds-Vo-name=local,o=grid', {'GlueServiceDataValue': ['PROD']} + )] + + if __name__ == '__main__': - #import sys;sys.argv = ['', 'Test.testName'] - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/test_message_directory.py b/test/test_message_directory.py new file mode 100644 index 00000000..1f61a0f2 --- /dev/null +++ b/test/test_message_directory.py @@ -0,0 +1,149 @@ +# Copyright 2018 Science and Technology Facilities Council +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains test cases for the MessageDirectory class.""" + +import shutil +import tempfile +import time +import unittest + +from ssm.message_directory import MessageDirectory + + +class TestMessageDirectory(unittest.TestCase): + """Class used for testing the MessageDirectory class.""" + + def setUp(self): + """Create a MessageDirectory class on top of a temporary directory.""" + self.tmp_dir = tempfile.mkdtemp(prefix='message_directory') + self.message_directory = MessageDirectory(self.tmp_dir) + + def test_add_and_get(self): + """ + Test the add and get methods of the MessageDirectory class. + + This test adds a file to a MessageDirectory, checks it has been + written to the underlying directory and then checks the saved file + for content equality. + """ + test_content = "FOO" + # Add the test content to the MessageDirectory. + file_name = self.message_directory.add(test_content) + + # Assert there is exactly on message in the directory. + self.assertEqual(self.message_directory.count(), 1) + + # Fetch the saved content using the get method. + saved_content = self.message_directory.get(file_name) + + # Assert the saved content is equal to the original test content. + self.assertEqual(saved_content, test_content) + + def test_orderd_file_retrieval(self): + """ + Test the messages are retrieved in the order they were last modified. + + This test adds files to the MessageDirectory and then iterates over + the MessageDirectory to retrieve the file names. If the for loop does + not return them in the order they were last modfied, this test fails. + """ + # In the event of a failure of underlying _get_messages sorting, it's + # possible the returned list of files could still be in the correct + # order by random chance. + # A 'large' list of test_content reduces this chance. + test_content_list = ["Lobster Thermidor", "Crevettes", "Mornay sauce", + "Truffle Pate", "Brandy", "Fried egg", "Spam"] + + # A list to hold file names by creation time. + file_names_by_creation_time = [] + for test_content in test_content_list: + # Add the content to the MessageDirectory. + file_name = self.message_directory.add(test_content) + # Append the file name to the list of file names by create time. + file_names_by_creation_time.append(file_name) + # Wait a small amount of time to allow differentiation of times. + time.sleep(0.02) + + self.assertEqual(self.message_directory.count(), 7) + + # A list to hold file names by modification time. + file_names_by_modification_time = [] + # Use a for loop (similar to how the SSM retrieves messages) + # to build up an ordered list of files. + for file_name in self.message_directory: + file_names_by_modification_time.append(file_name) + + # As the files are not modified once added to the MessageDirectory, + # the two lists of file names should be equal. + self.assertEqual(file_names_by_modification_time, + file_names_by_creation_time) + + def test_count(self): + """ + Test the count method of the MessageDirectory class. + + This test adds two files to a MessageDirectory and then checks + the output of the count() function is as expected. + """ + self.assertEqual(self.message_directory.count(), 0) + self.message_directory.add("FOO") + self.assertEqual(self.message_directory.count(), 1) + self.message_directory.add("BAR") + self.assertEqual(self.message_directory.count(), 2) + + def test_lock(self): + """ + Test the lock method of the MessageDirectory class. + + This test checks the lock method returns true for any file. + """ + self.assertTrue(self.message_directory.lock("any file")) + + def test_purge(self): + """ + Test the purge method of the MessageDirectory class. + + This test only checks the purge method is callable without error, + as the purge method only logs that it has been called. + """ + self.message_directory.purge() + + def test_remove(self): + """ + Test the remove method of the MessageDirectory class. + + This test adds a file, removes the file and then checks + the number of files present. + """ + # Check the directory starts empty + self.assertEqual(self.message_directory.count(), 0) + # Add some files to the MessageDirectory. + file_name = self.message_directory.add("FOO") + self.assertEqual(self.message_directory.count(), 1) + # Use the remove method to delete the recently added file. + self.message_directory.remove(file_name) + # Check the count method returns the expected value. + self.assertEqual(self.message_directory.count(), 0) + + def tearDown(self): + """Remove test directory and all contents.""" + try: + shutil.rmtree(self.tmp_dir) + except OSError as error: + print('Error removing temporary directory %s' % self.tmp_dir) + print(error) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_ssm.py b/test/test_ssm.py index 1dcbbaca..896adba6 100644 --- a/test/test_ssm.py +++ b/test/test_ssm.py @@ -10,6 +10,7 @@ import unittest from subprocess import call +from ssm.message_directory import MessageDirectory from ssm.ssm2 import Ssm2, Ssm2Exception @@ -43,7 +44,8 @@ def setUp(self): # The subject has been hardcoded so that the generated # certificate subject matches the subject of the hardcoded, # expired, certificate at the bottom of this file. - call(['openssl', 'req', '-x509', '-nodes', '-days', '1', '-new', + # 2 days used so that verify_cert_date doesn't think it expires soon. + call(['openssl', 'req', '-x509', '-nodes', '-days', '2', '-new', '-key', self._key_path, '-out', TEST_CERT_FILE, '-subj', '/C=UK/O=STFC/OU=SC/CN=Test Cert']) @@ -65,7 +67,7 @@ def tearDown(self): except OSError, e: print 'Error removing temporary directory %s' % self._tmp_dir print e - + def test_on_message(self): ''' This is quite a complicated method, so it would take a long time @@ -94,8 +96,8 @@ def test_on_message(self): def test_init_expired_cert(self): """Test right exception is thrown creating an SSM with expired cert.""" - expected_error = ('Certificate %s has expired.' - % self._expired_cert_path) + expected_error = ('Certificate %s has expired or will expire ' + 'within a day.' % self._expired_cert_path) try: # Indirectly test crypto.verify_cert_date Ssm2(self._brokers, self._msgdir, self._expired_cert_path, @@ -120,6 +122,19 @@ def test_init_expired_server_cert(self): # verify_enc_cert is set to False as we don't want to risk raising an # exception by failing cert verification. + def test_ssm_init_non_dirq(self): + """Test a SSM can be initialised with support for non-dirq sending.""" + try: + ssm = Ssm2(self._brokers, self._msgdir, TEST_CERT_FILE, + self._key_path, dest=self._dest, listen=None, + path_type='directory') + except Ssm2Exception as error: + self.fail('An error occured trying to create an SSM using ' + 'the non-dirq functionality: %s.' % error) + + # Assert the outbound queue is of the expected type. + self.assertTrue(isinstance(ssm._outq, MessageDirectory)) + TEST_CERT_FILE = '/tmp/test.crt'