Skip to content

Commit

Permalink
Merge 4fd0d28 into 7a1e2a9
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcorbett committed Jan 7, 2019
2 parents 7a1e2a9 + 4fd0d28 commit 968d7a9
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 9 deletions.
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ The python daemon library (N.B. only versions below 2.2.0 are currently supporte
The python ldap library
* `yum install python-ldap`

The python dirq library
Optionally, the python dirq library (N.B. this is only required if your messages
are stored in a dirq structure)
* `yum install python-dirq`

You need a certificate and key in PEM format accessible to the SSM.
Expand Down Expand Up @@ -119,26 +120,38 @@ configuration will send messages to the test apel server.

## Adding Files

There are two ways to add files to be sent:
There are multiple manual and programmatic ways to add files to be sent:

### Manual

#### With the dirq module
All file and directory names must use hex characters: `[0-9a-f]`.

* Create a directory within /var/spool/apel/outgoing with a name
of EIGHT hex characters e.g. `12345678`
* Put files in this directory with names of FOURTEEN hex
e.g. `1234567890abcd`

#### Without the dirq module
Ensure `path_type: directory` is set in your `sender.cfg`.
Then add messages as files to `/var/spool/apel/outgoing`,
there are no restrictions on the file names used.

### Programmatic

#### With the dirq module
Use the python or perl dirq libraries:
* python: http://pypi.python.org/pypi/dirq
* perl: http://search.cpan.org/~lcons/Directory-Queue/

Create a QueueSimple object with path /var/spool/apel/outgoing/ and
add your messages.

#### Without the dirq module
Use the `MessageDirectory` class provided in `ssm.message_directory`.

Create a `MessageDirectory` object with path `/var/spool/apel/outgoing/` and
add your messages using the `add` method.

## Running the SSM

Expand Down
2 changes: 1 addition & 1 deletion apel-ssm.spec
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BuildArch: noarch
BuildRequires: python-devel
%endif

Requires: stomppy >= 3.1.1, python-daemon < 2.2.0, python-dirq, python-ldap
Requires: stomppy >= 3.1.1, python-daemon < 2.2.0, python-ldap
Requires(pre): shadow-utils

%define ssmconf %_sysconfdir/apel
Expand Down
11 changes: 10 additions & 1 deletion bin/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,18 @@ def main():
raise Ssm2Exception('No destination queue is configured.')
except ConfigParser.NoOptionError, e:
raise Ssm2Exception(e)


# Determine what type of message store we are interacting with,
# i.e. a dirq QueueSimple object or a plain MessageDirectory directory.
try:
path_type = cp.get('messaging', 'path_type')
except ConfigParser.NoOptionError:
log.info('No path type defined, assuming dirq.')
path_type = 'dirq'

sender = Ssm2(brokers,
cp.get('messaging','path'),
path_type=path_type,
cert=cp.get('certificates','certificate'),
key=cp.get('certificates','key'),
dest=cp.get('messaging','destination'),
Expand Down
7 changes: 7 additions & 0 deletions conf/sender.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ destination:

# Outgoing messages will be read and removed from this directory.
path: /var/spool/apel/outgoing
# If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied
# 'path' will be treated as a Python dirq (a directory based queue, which is a
# port of the Perl module Directory::Queue).
# If 'path_type' is set to 'directory', the supplied 'path' will be treated
# as if it is a directory rather than a dirq.
# As a result, 'path' cannot contain subdirectories.
path_type: dirq

[logging]
logfile: /var/log/apel/ssmsend.log
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
stomp.py>=3.1.1
python-daemon<2.2.0
python-ldap
# Dependencies for optional dirq based sending
dirq
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ def main():
url='http://apel.github.io/',
download_url='https://github.com/apel/ssm/releases',
license='Apache License, Version 2.0',
install_requires=['stomp.py>=3.1.1', 'python-ldap', 'dirq'],
install_requires=['stomp.py>=3.1.1', 'python-ldap'],
extras_require={
'python-daemon': ['python-daemon<2.2.0'],
'dirq': ['dirq'],
},
packages=find_packages(exclude=['bin', 'test']),
scripts=['bin/ssmreceive', 'bin/ssmsend'],
Expand Down
127 changes: 127 additions & 0 deletions ssm/message_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright 2018 Science and Technology Facilities Council
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains the MessageDirectory class."""

import logging
import os
import uuid

# logging configuration
log = logging.getLogger(__name__)


class MessageDirectory(object):
"""A structure for holding Accounting messages in a directory."""

def __init__(self, path):
"""Create a new directory structure for holding Accounting messages."""
self.directory_path = path

def add(self, data):
"""Add the passed data to a new file and return it's name."""
# Create a unique file name so APEL admins can pair sent and recieved
# messages easily (as the file name appears in the sender and receiver
# logs as the message ID).
name = uuid.uuid4()

# Open the file and write the provided data into the file.
with open("%s/%s" % (self.directory_path, name), 'w') as message:
message.write(data)

# Return the name of the created file as a string,
# to keep the dirq like interface.
return "%s" % name

def count(self):
"""
Return the number of elements in the queue.
Regardless of their state.
"""
return len(self._get_messages())

def get(self, name):
"""Return the content of the named message."""
with open("%s/%s" % (self.directory_path, name)) as message:
content = message.read()
return content

def lock(self, _name):
"""Return True to simulate a successful lock. Does nothing else."""
return True

def purge(self):
"""
Do nothing, as there are no old/intermediate directories to purge.
Only included to preserve dirq interface.
"""
log.debug("purge called, but purge does nothing for non-dirq sending.")

def remove(self, name):
"""Remove the named message."""
os.unlink("%s/%s" % (self.directory_path, name))

def _get_messages(self, sort_by_mtime=False):
"""
Get the messages stored in this MessageDirectory.
if sort_by_mtime is set to True, the returned list is guaranteed to be
in increasing order of modification time.
mtime is used because (apparently) there is not way to find the
original date of file creation due to a limitation
of the underlying filesystem
"""
try:
# Get a list of files under self.directory_path
# in an arbitrary order.
file_name_list = os.listdir(self.directory_path)

if sort_by_mtime:
# Working space to hold the unsorted messages
# as file paths and mtimes.
unsorted_messages = []
# Working space to hold the sorted messages as file names.
sorted_messages = []

# Work out the mtime of each file.
for file_name in file_name_list:
file_path = os.path.join(self.directory_path, file_name)
# Store the file path and the time
# the file was last modified.
unsorted_messages.append((file_name,
os.path.getmtime(file_path)))

# Sort the file paths by mtime and
# then only store the file name.
for (file_name, _mtime) in sorted(unsorted_messages,
key=lambda tup: tup[1]):
# Store the sorted file paths in a class element.
sorted_messages.append(file_name)

# Return the sorted list.
return sorted_messages

# If we get here, just return the arbitrarily ordered list.
return file_name_list

except (IOError, OSError) as error:
log.error(error)
# Return an empty file list.
return []

def __iter__(self):
"""Return an iterable of files currently in the MessageDirectory."""
return self._get_messages(sort_by_mtime=True).__iter__()
35 changes: 31 additions & 4 deletions ssm/ssm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@
ssl = None

from ssm import crypto
from dirq.QueueSimple import QueueSimple
from dirq.queue import Queue
from ssm.message_directory import MessageDirectory

try:
from dirq.QueueSimple import QueueSimple
from dirq.queue import Queue
except ImportError:
# ImportError is raised later on if dirq is requested but not installed.
QueueSimple = None
Queue = None

import stomp
from stomp.exception import ConnectFailedException
Expand Down Expand Up @@ -57,7 +64,8 @@ class Ssm2(stomp.ConnectionListener):

def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None,
capath=None, check_crls=False, use_ssl=False, username=None, password=None,
enc_cert=None, verify_enc_cert=True, pidfile=None):
enc_cert=None, verify_enc_cert=True, pidfile=None,
path_type='dirq'):
'''
Creates an SSM2 object. If a listen value is supplied,
this SSM2 will be a receiver.
Expand Down Expand Up @@ -86,10 +94,29 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None,

# create the filesystem queues for accepted and rejected messages
if dest is not None and listen is None:
self._outq = QueueSimple(qpath)
# Determine what sort of outgoing structure to make
if path_type == 'dirq':
if QueueSimple is None:
raise ImportError("dirq path_type requested but the dirq "
"module wasn't found.")

self._outq = QueueSimple(qpath)

elif path_type == 'directory':
self._outq = MessageDirectory(qpath)
else:
raise Ssm2Exception('Unsupported path_type variable.')

elif listen is not None:
inqpath = os.path.join(qpath, 'incoming')
rejectqpath = os.path.join(qpath, 'reject')

# Receivers must use the dirq module, so make a quick sanity check
# that dirq is installed.
if Queue is None:
raise ImportError("Receiving SSMs must use dirq, but the dirq "
"module wasn't found.")

self._inq = Queue(inqpath, schema=Ssm2.QSCHEMA)
self._rejectq = Queue(rejectqpath, schema=Ssm2.REJECT_SCHEMA)
else:
Expand Down

0 comments on commit 968d7a9

Please sign in to comment.