Skip to content

Commit

Permalink
Use pipe between ceph backup diff export/import
Browse files Browse the repository at this point in the history
We now use a piped transfer between the rbd export-diff
and import-diff for incremental backups/restores as
opposed to holding the entire diff in memory.

Change-Id: I33476d9b3934781413af5cd2867a11d825a5d78e
Fixes: bug 1244464
(cherry picked from commit d384d28)
  • Loading branch information
dosaboy committed Oct 30, 2013
1 parent 243ad85 commit 022f1df
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 35 deletions.
60 changes: 42 additions & 18 deletions cinder/backup/drivers/ceph.py
Expand Up @@ -41,8 +41,10 @@
restore to a new volume (default).
"""

import fcntl
import os
import re
import subprocess
import time

import eventlet
Expand All @@ -51,7 +53,6 @@
from cinder.backup.driver import BackupDriver
from cinder import exception
from cinder.openstack.common import log as logging
from cinder.openstack.common import processutils
from cinder import units
from cinder import utils
import cinder.volume.drivers.rbd as rbd_driver
Expand Down Expand Up @@ -410,6 +411,36 @@ def _try_delete_base_image(self, backup_id, volume_id, base_name=None):
finally:
src_rbd.close()

def _piped_execute(self, cmd1, cmd2):
"""Pipe output of cmd1 into cmd2."""
LOG.debug("piping cmd1='%s' into..." % (' '.join(cmd1)))
LOG.debug("cmd2='%s'" % (' '.join(cmd2)))

try:
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError as e:
LOG.error("pipe1 failed - %s " % unicode(e))
raise

# NOTE(dosaboy): ensure that the pipe is blocking. This is to work
# around the case where evenlet.green.subprocess is used which seems to
# use a non-blocking pipe.
flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK)
fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags)

try:
p2 = subprocess.Popen(cmd2, stdin=p1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError as e:
LOG.error("pipe2 failed - %s " % unicode(e))
raise

p1.stdout.close()
stdout, stderr = p2.communicate()
return p2.returncode, stderr

def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_user, src_conf, dest_user, dest_conf,
src_snap=None, from_snap=None):
Expand All @@ -430,29 +461,22 @@ def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)

cmd = ['rbd', 'export-diff'] + src_ceph_args
cmd1 = ['rbd', 'export-diff'] + src_ceph_args
if from_snap is not None:
cmd.extend(['--from-snap', from_snap])
cmd1.extend(['--from-snap', from_snap])
if src_snap:
path = self._utf8("%s/%s@%s" % (src_pool, src_name, src_snap))
else:
path = self._utf8("%s/%s" % (src_pool, src_name))
cmd.extend([path, '-'])
try:
out, err = self._execute(*cmd)
except (processutils.ProcessExecutionError,
processutils.UnknownArgumentError) as exc:
msg = _("rbd export-diff failed - %s") % (str(exc))
LOG.info(msg)
raise exception.BackupRBDOperationFailed(msg)
cmd1.extend([path, '-'])

cmd = ['rbd', 'import-diff'] + dest_ceph_args
cmd.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
try:
out, err = self._execute(*cmd, process_input=out)
except (processutils.ProcessExecutionError,
processutils.UnknownArgumentError) as exc:
msg = _("rbd import-diff failed - %s") % (str(exc))
cmd2 = ['rbd', 'import-diff'] + dest_ceph_args
cmd2.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])

ret, stderr = self._piped_execute(cmd1, cmd2)
if ret:
msg = (_("rbd diff op failed - (ret=%(ret)s stderr=%(stderr)s)") %
({'ret': ret, 'stderr': stderr}))
LOG.info(msg)
raise exception.BackupRBDOperationFailed(msg)

Expand Down
79 changes: 62 additions & 17 deletions cinder/tests/test_backup_ceph.py
Expand Up @@ -14,8 +14,10 @@
# under the License.
""" Tests for Ceph backup service """

import fcntl
import hashlib
import os
import subprocess
import tempfile
import time
import uuid
Expand Down Expand Up @@ -60,6 +62,31 @@ def _get_wrapped_rbd_io(self, rbd_image):
'user_foo', 'conf_foo')
return rbddriver.RBDImageIOWrapper(rbd_meta)

def _setup_mock_popen(self, inst, retval=None, p1hook=None, p2hook=None):
class stdout(object):
def close(self):
inst.called.append('stdout_close')

class FakePopen(object):

PASS = 0

def __init__(self, cmd, *args, **kwargs):
inst.called.append('popen_init')
self.stdout = stdout()
self.returncode = 0
self.__class__.PASS += 1
if self.__class__.PASS == 1 and p1hook:
p1hook()
elif self.__class__.PASS == 2 and p2hook:
p2hook()

def communicate(self):
inst.called.append('communicate')
return retval

self.stubs.Set(subprocess, 'Popen', FakePopen)

def setUp(self):
super(BackupCephTestCase, self).setUp()
self.ctxt = context.get_admin_context()
Expand Down Expand Up @@ -101,6 +128,13 @@ def setUp(self):
self.stubs.Set(time, 'time', self.time_inc)
self.stubs.Set(eventlet, 'sleep', lambda *args: None)

# Used to collect info on what was called during a test
self.called = []

# Do this to ensure that any test ending up in a subprocess fails if
# not properly mocked.
self.stubs.Set(subprocess, 'Popen', None)

def test_get_rbd_support(self):
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
Expand Down Expand Up @@ -327,21 +361,30 @@ def test_backup_volume_from_rbd(self):
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args, **kwargs: None)

self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)

with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()

def write_data(inst, data, offset):
def write_data():
self.volume_file.seek(0)
data = self.volume_file.read(self.length)
self.called.append('write')
checksum.update(data)
test_file.write(data)

def read_data(inst, offset, length):
def read_data():
self.called.append('read')
return self.volume_file.read(self.length)

def rbd_list(inst, ioctx):
self.called.append('list')
return [backup_name]

self.stubs.Set(self.service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
self._setup_mock_popen(self, ['out', 'err'],
p1hook=read_data,
p2hook=write_data)

self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)

self.stubs.Set(self.service, '_discard_bytes',
Expand All @@ -354,6 +397,10 @@ def rbd_list(inst, ioctx):

self.service.backup(backup, rbd_io)

self.assertEquals(self.called, ['list', 'popen_init', 'read',
'popen_init', 'write',
'stdout_close', 'communicate'])

# Ensure the files are equal
self.assertEqual(checksum.digest(), self.checksum.digest())

Expand Down Expand Up @@ -463,15 +510,12 @@ def remove_snap(*args):
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)

# Must be something mutable
remove_called = []

def remove(inst, ioctx, name):
remove_called.append(True)
self.called.append('remove')

self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
self.assertEquals(self.called, ['remove'])

def test_try_delete_base_image(self):
# don't create volume db entry since it should not be required
Expand All @@ -486,18 +530,15 @@ def rbd_list(inst, ioctx):

self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)

# Must be something mutable
remove_called = []

self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)

def remove(inst, ioctx, name):
remove_called.append(True)
self.called.append('remove')

self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
self.assertEquals(self.called, ['remove'])

def test_try_delete_base_image_busy(self):
"""This should induce retries then raise rbd.ImageBusy."""
Expand All @@ -513,9 +554,6 @@ def rbd_list(inst, ioctx):

self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)

# Must be something mutable
remove_called = []

self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)

Expand Down Expand Up @@ -616,6 +654,13 @@ def test_diff_restore_allowed_false(self):
self.assertEqual(resp, not_allowed)
self._set_service_stub('_file_is_rbd', True)

def test_piped_execute(self):
self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
self._setup_mock_popen(self, ['out', 'err'])
self.service._piped_execute(['foo'], ['bar'])
self.assertEquals(self.called, ['popen_init', 'popen_init',
'stdout_close', 'communicate'])

def tearDown(self):
self.volume_file.close()
self.stubs.UnsetAll()
Expand Down

0 comments on commit 022f1df

Please sign in to comment.