Skip to content

Commit

Permalink
Merge pull request #416 from jeremycline/fedmsg-check
Browse files Browse the repository at this point in the history
Add a fedmsg-check command
  • Loading branch information
jeremycline committed Jun 9, 2017
2 parents 97db0c6 + c35eff1 commit 5899512
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 0 deletions.
97 changes: 97 additions & 0 deletions fedmsg/commands/check.py
@@ -0,0 +1,97 @@
# This file is part of fedmsg.
# Copyright (C) 2017 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Jeremy Cline <jeremy@jcline.org>
"""
The ``fedmsg-check`` command can be used to see what fedmsg consumers and
producers are currently active. It requires that the ``moksha.monitoring.socket``
is defined in the fedmsg configuration.
"""
from gettext import gettext as _
import json

import click
import zmq

from fedmsg.config import load_config


TIMEOUT_HELP = _("The number of seconds to wait for a heartbeat from the fedmsg hub."
" The default is 10 seconds.")
CONSUMER_HELP = _("The name of the consumer to check the status of.")
PRODUCER_HELP = _("The name of the producer to check the status of.")


@click.command()
@click.option('--timeout', default=10, type=click.INT, help=TIMEOUT_HELP)
@click.option('--consumer', '-c', multiple=True, help=CONSUMER_HELP)
@click.option('--producer', '-p', multiple=True, help=PRODUCER_HELP)
def check(timeout, consumer=None, producer=None):
"""This command is used to check the status of consumers and producers.
If no consumers and producers are provided, the status of all consumers and producers
is printed.
"""
# It's weird to say --consumers, but there are multiple, so rename the variables
consumers, producers = consumer, producer

config = load_config()
endpoint = config.get('moksha.monitoring.socket')
if not endpoint:
raise click.ClickException('No monitoring endpoint has been configured: '
'please set "moksha.monitoring.socket"')

context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
# ZMQ takes the timeout in milliseconds
socket.set(zmq.RCVTIMEO, timeout * 1000)
socket.subscribe(b'')
socket.connect(endpoint)

try:
message = socket.recv_json()
except zmq.error.Again:
raise click.ClickException(
'Failed to receive message from the monitoring endpoint ({e}) in {t} '
'seconds.'.format(e=endpoint, t=timeout))

if not consumers and not producers:
click.echo('No consumers or producers specified so all will be shown.')
else:
missing = False
uninitialized = False
for messager_type, messagers in (('consumers', consumers), ('producers', producers)):
active = {}
for messager in message[messager_type]:
active[messager['name']] = messager
for messager in messagers:
if messager not in active:
click.echo('"{m}" is not active!'.format(m=messager), err=True)
missing = True
else:
if active[messager]['initialized'] is not True:
click.echo('"{m}" is not initialized!'.format(m=messager), err=True)
uninitialized = True

if missing:
raise click.ClickException('Some consumers and/or producers are missing!')
elif uninitialized:
raise click.ClickException('Some consumers and/or producers are uninitialized!')
else:
click.echo('All consumers and producers are active!')

click.echo(json.dumps(message, indent=2, sort_keys=True))
155 changes: 155 additions & 0 deletions fedmsg/tests/test_commands.py
@@ -1,13 +1,16 @@
from datetime import datetime
import resource
import sys
import threading
import unittest
import time
import json
import os

from click.testing import CliRunner
from nose.tools import eq_
import six
import zmq

import fedmsg
import fedmsg.core
Expand All @@ -18,6 +21,7 @@
from fedmsg.commands.tail import TailCommand
from fedmsg.commands.relay import RelayCommand
from fedmsg.commands.config import config as config_command
from fedmsg.commands.check import check
import fedmsg.consumers.relay
from fedmsg.tests.test_utils import mock

Expand Down Expand Up @@ -283,3 +287,154 @@ def test_setting_nofile_limit_os_fail(self, mock_setrlimit, *unused_mocks):
hub_command.log = mock.Mock()
hub_command.set_rlimit_nofiles(limit=1844674407370)
hub_command.log.warning.assert_called_once()


class CheckTests(unittest.TestCase):
"""Tests for the :class:`fedmsg.commands.check.CheckCommand`."""

def setUp(self):
self.report = {
"consumers": [
{
"backlog": 0,
"exceptions": 0,
"headcount_in": 0,
"headcount_out": 0,
"initialized": True,
"jsonify": True,
"module": "test.consumers",
"name": "TestConsumer",
"times": [],
"topic": ['test']
},
],
"producers": [
{
"exceptions": 0,
"frequency": 5,
"initialized": True,
"last_ran": 1496780847.269628,
"module": "moksha.hub.monitoring",
"name": "MonitoringProducer",
"now": False,
}
]
}
self.context = zmq.Context.instance()
self.socket = self.context.socket(zmq.PUB)
self.port = self.socket.bind_to_random_port('tcp://127.0.0.1', max_tries=1000)

def tearDown(self):
self.context.destroy()

def test_no_monitor_endpoint(self):
"""Assert that when no endpoint for monitoring is configured, users are informed."""
expected_error = (
u'Error: No monitoring endpoint has been configured: please set '
u'"moksha.monitoring.socket"\n'
)
runner = CliRunner()
result = runner.invoke(check, [])
self.assertEqual(1, result.exit_code)
self.assertEqual(expected_error, result.output)

@mock.patch('fedmsg.commands.check.load_config')
def test_socket_timeout(self, mock_load_config):
"""Assert when no message is received a timeout is hit."""
mock_load_config.return_value = {
'moksha.monitoring.socket': 'tcp://127.0.0.1:' + str(self.port)
}
expected_error = (
u'Error: Failed to receive message from the monitoring endpoint'
u' (tcp://127.0.0.1:{p}) in 1 seconds.\n'.format(p=self.port)
)
runner = CliRunner()

result = runner.invoke(check, ['--timeout=1'])

self.assertEqual(1, result.exit_code)
self.assertEqual(expected_error, result.output)

@mock.patch('fedmsg.commands.check.load_config')
def test_no_consumers_or_producers(self, mock_load_config):
"""Assert that when no consumers or producers are specified, all are displayed."""
mock_load_config.return_value = {
'moksha.monitoring.socket': 'tcp://127.0.0.1:' + str(self.port)
}
runner = CliRunner()
expected_output = (u'No consumers or producers specified so all will be shown.'
u'\n{r}\n'.format(r=json.dumps(self.report, indent=2, sort_keys=True)))

def send_report():
self.socket.send(json.dumps(self.report))

threading.Timer(2.0, send_report).start()
result = runner.invoke(check, [])

self.assertEqual(0, result.exit_code)
self.assertEqual(expected_output, result.output)

@mock.patch('fedmsg.commands.check.load_config')
def test_missing(self, mock_load_config):
"""
Assert the command has a non-zero exit when a consumer is missing
from the list of active consumers.
"""
mock_load_config.return_value = {
'moksha.monitoring.socket': 'tcp://127.0.0.1:' + str(self.port)
}
runner = CliRunner()
expected_output = (
u'"MissingConsumer" is not active!\n'
u'Error: Some consumers and/or producers are missing!\n'
)

def send_report():
self.socket.send(json.dumps(self.report))

threading.Timer(2.0, send_report).start()
result = runner.invoke(check, ['--consumer=MissingConsumer'])

self.assertEqual(1, result.exit_code)
self.assertEqual(expected_output, result.output)

@mock.patch('fedmsg.commands.check.load_config')
def test_uninitialized(self, mock_load_config):
"""Assert the command has a non-zero exit when a consumer is not initialized."""
mock_load_config.return_value = {
'moksha.monitoring.socket': 'tcp://127.0.0.1:' + str(self.port)
}
runner = CliRunner()
expected_output = (
u'"TestConsumer" is not initialized!\n'
u'Error: Some consumers and/or producers are uninitialized!\n'
)
self.report['consumers'][0]['initialized'] = False

def send_report():
self.socket.send(json.dumps(self.report))

threading.Timer(2.0, send_report).start()
result = runner.invoke(check, ['--consumer=TestConsumer'])

self.assertEqual(1, result.exit_code)
self.assertEqual(expected_output, result.output)

@mock.patch('fedmsg.commands.check.load_config')
def test_all_good(self, mock_load_config):
"""Assert if all consumers/producers are present, the command exits 0."""
mock_load_config.return_value = {
'moksha.monitoring.socket': 'tcp://127.0.0.1:' + str(self.port)
}
runner = CliRunner()
expected_output = (u'All consumers and producers are active!\n'
u'{r}\n'.format(r=json.dumps(self.report, indent=2, sort_keys=True)))

def send_report():
self.socket.send(json.dumps(self.report))

threading.Timer(2.0, send_report).start()
result = runner.invoke(check, ['--consumer=TestConsumer'])

self.assertEqual(0, result.exit_code)
self.assertEqual(expected_output, result.output)
2 changes: 2 additions & 0 deletions setup.py
Expand Up @@ -81,6 +81,7 @@
'commands': [
'pygments',
'psutil',
'click',
],
'consumers': [
'moksha.hub>=1.3.0',
Expand Down Expand Up @@ -169,6 +170,7 @@
"fedmsg-announce=fedmsg.commands.announce:announce [commands]",
"fedmsg-trigger=fedmsg.commands.trigger:trigger [commands]",
"fedmsg-dg-replay=fedmsg.commands.replay:replay [commands]",
"fedmsg-check=fedmsg.commands.check:check [commands]",
#"fedmsg-config=fedmsg.commands.config:config [commands]",
"fedmsg-hub=fedmsg.commands.hub:hub [consumers]",
"fedmsg-relay=fedmsg.commands.relay:relay [consumers]",
Expand Down

0 comments on commit 5899512

Please sign in to comment.