Skip to content

Commit

Permalink
Merge "Update cinder-volume to enable multi volume support"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Feb 18, 2013
2 parents a497e7a + 6c708d1 commit 76b5839
Show file tree
Hide file tree
Showing 38 changed files with 828 additions and 207 deletions.
17 changes: 14 additions & 3 deletions bin/cinder-volume
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,21 @@ from cinder.openstack.common import log as logging
from cinder import service
from cinder import utils

FLAGS = flags.FLAGS

if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup("cinder")
utils.monkey_patch()
server = service.Service.create(binary='cinder-volume')
service.serve(server)
service.wait()
launcher = service.ProcessLauncher()
if FLAGS.enabled_backends:
for backend in FLAGS.enabled_backends:
host = "%s@%s" % (FLAGS.host, backend)
server = service.Service.create(
host=host,
service_name=backend)
launcher.launch_server(server)
else:
server = service.Service.create(binary='cinder-volume')
launcher.launch_server(server)
launcher.wait()
7 changes: 6 additions & 1 deletion cinder/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ def _get_my_ip():
'and deprecated.'),
cfg.StrOpt('control_exchange',
default='cinder',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ]
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.ListOpt('enabled_backends',
default=None,
help='A list of backend names to use. These backend names '
'should be backed by a unique [CONFIG] group '
'with its options'), ]

FLAGS.register_opts(global_opts)
3 changes: 2 additions & 1 deletion cinder/scheduler/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class SchedulerManager(manager.Manager):

RPC_API_VERSION = '1.2'

def __init__(self, scheduler_driver=None, *args, **kwargs):
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = importutils.import_object(scheduler_driver)
Expand Down
194 changes: 190 additions & 4 deletions cinder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

"""Generic Node base class for all workers that run on hosts."""

import errno
import inspect
import os
import random
import signal
import sys
import time

import eventlet
import greenlet
Expand Down Expand Up @@ -65,6 +68,12 @@
FLAGS.register_opts(service_opts)


class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
self.signo = signo


class Launcher(object):
"""Launch one or more services and wait for them to complete."""

Expand Down Expand Up @@ -128,6 +137,180 @@ def sigterm(sig, frame):
pass


class ServerWrapper(object):
def __init__(self, server, workers):
self.server = server
self.workers = workers
self.children = set()
self.forktimes = []


class ProcessLauncher(object):
def __init__(self):
self.children = {}
self.sigcaught = None
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')

signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)

def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False

# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)

def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()

LOG.info(_('Parent process has died unexpectedly, exiting'))

sys.exit(1)

def _child_process(self, server):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)

signal.signal(signal.SIGTERM, _sigterm)
# Block SIGINT and let the parent send us a SIGTERM
# signal.signal(signal.SIGINT, signal.SIG_IGN)
# This differs from the behavior in nova in that we dont ignore this
# It allows the non-wsgi services to be terminated properly
signal.signal(signal.SIGINT, _sigterm)

# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()

# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn(self._pipe_watcher)

# Reseed random number generator
random.seed()

launcher = Launcher()
launcher.run_server(server)

def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping'))
time.sleep(1)

wrap.forktimes.pop(0)

wrap.forktimes.append(time.time())

pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.server)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.server.stop()

os._exit(status)

LOG.info(_('Started child %d'), pid)

wrap.children.add(pid)
self.children[pid] = wrap

return pid

def launch_server(self, server, workers=1):
wrap = ServerWrapper(server, workers)

LOG.info(_('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)

def _wait_child(self):
try:
# Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None

if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())

if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
return None

wrap = self.children.pop(pid)
wrap.children.remove(pid)
return wrap

def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue

while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)

if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)

for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise

# Wait for children to die
if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()


class Service(object):
"""Service object for binaries running on hosts.
Expand All @@ -137,13 +320,15 @@ class Service(object):

def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
*args, **kwargs):
service_name=None, *args, **kwargs):
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.manager = manager_class(host=self.host,
service_name=service_name,
*args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
Expand Down Expand Up @@ -217,7 +402,7 @@ def __getattr__(self, key):
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
periodic_fuzzy_delay=None, service_name=None):
"""Instantiates class and passes back application object.
:param host: defaults to FLAGS.host
Expand Down Expand Up @@ -247,7 +432,8 @@ def create(cls, host=None, binary=None, topic=None, manager=None,
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
periodic_fuzzy_delay=periodic_fuzzy_delay,
service_name=service_name)

return service_obj

Expand Down
10 changes: 9 additions & 1 deletion cinder/tests/test_HpSanISCSIDriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mox

from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.san.hp_lefthand import HpSanISCSIDriver

LOG = logging.getLogger(__name__)
Expand All @@ -27,7 +31,11 @@ def setUp(self):
self._fake_cliq_run)
self.stubs.Set(HpSanISCSIDriver, "_get_iscsi_properties",
self._fake_get_iscsi_properties)
self.driver = HpSanISCSIDriver()
configuration = mox.MockObject(conf.Configuration)
configuration.san_is_local = False
configuration.append_config_values(mox.IgnoreArg())

self.driver = HpSanISCSIDriver(configuration=configuration)
self.volume_name = "fakevolume"
self.connector = {'ip': '10.0.0.2',
'initiator': 'iqn.1993-08.org.debian:01:222',
Expand Down
11 changes: 10 additions & 1 deletion cinder/tests/test_coraid.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
# License for the specific language governing permissions and limitations
# under the License.

import mox

from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers import coraid
from cinder.volume.drivers.coraid import CoraidDriver
from cinder.volume.drivers.coraid import CoraidRESTClient
Expand Down Expand Up @@ -91,7 +94,13 @@ def setUp(self):
self.esm_mock = self.mox.CreateMockAnything()
self.stubs.Set(coraid, 'CoraidRESTClient',
lambda *_, **__: self.esm_mock)
self.drv = CoraidDriver()
configuration = mox.MockObject(conf.Configuration)
configuration.append_config_values(mox.IgnoreArg())
configuration.coraid_esm_address = fake_esm_ipaddress
configuration.coraid_user = fake_esm_username
configuration.coraid_password = fake_esm_password

self.drv = CoraidDriver(configuration=configuration)
self.drv.do_setup({})

def test_create_volume(self):
Expand Down
13 changes: 9 additions & 4 deletions cinder/tests/test_emc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@
# License for the specific language governing permissions and limitations
# under the License.

import mox
import os
from xml.dom.minidom import Document

from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.emc.emc_smis_common import EMCSMISCommon
from cinder.volume.drivers.emc.emc_smis_iscsi import EMCSMISISCSIDriver

FLAGS = flags.FLAGS

CINDER_EMC_CONFIG_FILE = '/etc/cinder/cinder_emc_config.xml'
LOG = logging.getLogger(__name__)

config_file_name = 'cinder_emc_config.xml'
Expand Down Expand Up @@ -579,12 +580,16 @@ def setUp(self):
super(EMCSMISISCSIDriverTestCase, self).setUp()
self.config_file_path = None
self.create_fake_config_file()
FLAGS.cinder_emc_config_file = self.config_file_path

configuration = mox.MockObject(conf.Configuration)
configuration.cinder_emc_config_file = self.config_file_path
configuration.append_config_values(mox.IgnoreArg())

self.stubs.Set(EMCSMISISCSIDriver, '_get_iscsi_properties',
self.fake_get_iscsi_properties)
self.stubs.Set(EMCSMISCommon, '_get_ecom_connection',
self.fake_ecom_connection)
driver = EMCSMISISCSIDriver()
driver = EMCSMISISCSIDriver(configuration=configuration)
self.driver = driver

def create_fake_config_file(self):
Expand Down

0 comments on commit 76b5839

Please sign in to comment.