Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 60 additions & 33 deletions supervisor/tests/test_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test suite for supervisor.options"""

import logging
import os
import sys
import tempfile
Expand All @@ -25,6 +26,12 @@
from supervisor.tests.base import DummySocketConfig
from supervisor.tests.base import lstrip

def _getTempFile(name):
prefix = 'supervisor.{0}.'.format(name)
return tempfile.NamedTemporaryFile(prefix=prefix)

logger = logging.getLogger(__name__)

class OptionTests(unittest.TestCase):

def _makeOptions(self, read_error=False):
Expand Down Expand Up @@ -1636,28 +1643,40 @@ def test_make_process_with_group(self):
def test_make_dispatchers_stderr_not_redirected(self):
options = DummyOptions()
instance = self._makeOne(options)
instance.redirect_stderr = False
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[5].channel, 'stdout')
from supervisor.events import ProcessCommunicationStdoutEvent
self.assertEqual(dispatchers[5].event_type,
ProcessCommunicationStdoutEvent)
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(dispatchers[7].channel, 'stderr')
from supervisor.events import ProcessCommunicationStderrEvent
self.assertEqual(dispatchers[7].event_type,
ProcessCommunicationStderrEvent)
self.assertEqual(pipes['stderr'], 7)
with _getTempFile('stderr_logfile') as stdout_logfile:
with _getTempFile('stderr_logfile') as stderr_logfile:
instance.stdout_logfile = stdout_logfile.name
instance.stderr_logfile = stderr_logfile.name
logger.debug('instance.stdout_logfile = %r',
instance.stdout_logfile)
logger.debug('instance.stderr_logfile = %r',
instance.stderr_logfile)
instance.redirect_stderr = False
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[5].channel, 'stdout')
from supervisor.events import ProcessCommunicationStdoutEvent
self.assertEqual(dispatchers[5].event_type,
ProcessCommunicationStdoutEvent)
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(dispatchers[7].channel, 'stderr')
from supervisor.events import ProcessCommunicationStderrEvent
self.assertEqual(dispatchers[7].event_type,
ProcessCommunicationStderrEvent)
self.assertEqual(pipes['stderr'], 7)

def test_make_dispatchers_stderr_redirected(self):
options = DummyOptions()
instance = self._makeOne(options)
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[5].channel, 'stdout')
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(pipes['stderr'], None)
with _getTempFile('stderr_logfile') as stdout_logfile:
instance.stdout_logfile = stdout_logfile.name
logger.debug('instance.stdout_logfile = %r',
instance.stdout_logfile)
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[5].channel, 'stdout')
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(pipes['stderr'], None)

class FastCGIProcessConfigTest(unittest.TestCase):
def _getTargetClass(self):
Expand Down Expand Up @@ -1697,21 +1716,29 @@ def test_make_process_with_group(self):
def test_make_dispatchers(self):
options = DummyOptions()
instance = self._makeOne(options)
instance.redirect_stderr = False
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[4].channel, 'stdin')
self.assertEqual(dispatchers[4].closed, True)
self.assertEqual(dispatchers[5].channel, 'stdout')
from supervisor.events import ProcessCommunicationStdoutEvent
self.assertEqual(dispatchers[5].event_type,
ProcessCommunicationStdoutEvent)
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(dispatchers[7].channel, 'stderr')
from supervisor.events import ProcessCommunicationStderrEvent
self.assertEqual(dispatchers[7].event_type,
ProcessCommunicationStderrEvent)
self.assertEqual(pipes['stderr'], 7)
with _getTempFile('stderr_logfile') as stdout_logfile:
with _getTempFile('stderr_logfile') as stderr_logfile:
instance.stdout_logfile = stdout_logfile.name
instance.stderr_logfile = stderr_logfile.name
logger.debug('instance.stdout_logfile = %r',
instance.stdout_logfile)
logger.debug('instance.stderr_logfile = %r',
instance.stderr_logfile)
instance.redirect_stderr = False
process1 = DummyProcess(instance)
dispatchers, pipes = instance.make_dispatchers(process1)
self.assertEqual(dispatchers[4].channel, 'stdin')
self.assertEqual(dispatchers[4].closed, True)
self.assertEqual(dispatchers[5].channel, 'stdout')
from supervisor.events import ProcessCommunicationStdoutEvent
self.assertEqual(dispatchers[5].event_type,
ProcessCommunicationStdoutEvent)
self.assertEqual(pipes['stdout'], 5)
self.assertEqual(dispatchers[7].channel, 'stderr')
from supervisor.events import ProcessCommunicationStderrEvent
self.assertEqual(dispatchers[7].event_type,
ProcessCommunicationStderrEvent)
self.assertEqual(pipes['stderr'], 7)

class ProcessGroupConfigTests(unittest.TestCase):
def _getTargetClass(self):
Expand Down