livemerge: Fix over-extension of the base volume
When extending the base volume before a merge, we use a dumb
calculation, extending the base volume by top_size + chunk_size. This
allocates far more space than is typically needed. For an active layer
merge, there is no way to reduce the volume after the merge without
shutting down the VM. The result is that the active volume grows on
every merge, until it consumes the maximum size.
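
As an illustration with made-up numbers (the names and values below are
not from vdsm, they only show the scale of the waste):

    GiB = 1024**3

    top_size = 20 * GiB      # allocated size of the top volume
    chunk_size = 1 * GiB     # one extension chunk (illustrative value)
    commit_delta = 2 * GiB   # data that actually must be copied into base

    # Old calculation: base grows by top's whole allocation plus a chunk
    # on every active layer merge, and the space is never given back.
    old_growth = top_size + chunk_size       # 21 GiB

    # With measurement, only the space the commit actually needs (plus
    # one chunk of slack for an active layer merge) is requested.
    new_growth = commit_delta + chunk_size   # 3 GiB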

Fix the issue by measuring the sub-chain from top to base before
extending. This gives the exact size needed to commit the top volume
into the base volume, including the size required for bitmaps that may
exist in the top and base volumes.
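
Conceptually, the new extend request is derived from the measurement
roughly as follows (a simplified sketch of the logic in the diff below,
not the exact vdsm code):

    def new_base_size(measure, current_size, chunk_size, active_commit):
        # "required" is the space needed to commit the sub-chain into
        # base; "bitmaps" (if reported) covers bitmaps that may have to
        # be merged into base.
        needed = measure["required"] + measure.get("bitmaps", 0)
        if active_commit:
            # Leave one chunk of slack, since the guest keeps writing
            # to the top volume during the merge.
            needed += chunk_size
        # Extend only if base is currently smaller than what is needed.
        return needed if needed > current_size else None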

For an active layer merge, this measurement is a heuristic, since the
guest can write data during the measurement, or later during the merge.
We add one chunk of free space to minimize the chance of pausing the VM
during the merge. The only way to prevent pausing during the merge is
to monitor the base volume block threshold while the merge runs. This
was not possible in the past and can be done with current libvirt, but
vdsm's thin provisioning code is not ready for it yet.
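
For reference, the libvirt mechanism mentioned above is the block
threshold event. A minimal sketch of such monitoring, outside of vdsm
(the connection URI, domain name, device index and threshold are
assumptions, and a running libvirt event loop is required):

    import libvirt

    def on_threshold(conn, dom, dev, path, threshold, excess, opaque):
        # Fired when the guest writes past the threshold on the device;
        # a monitor could extend the base volume here instead of letting
        # the VM pause with ENOSPC.
        print("%s: %s crossed %d by %d bytes"
              % (dom.name(), dev, threshold, excess))

    libvirt.virEventRegisterDefaultImpl()
    conn = libvirt.open("qemu:///system")
    conn.domainEventRegisterAny(
        None, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_THRESHOLD,
        on_threshold, None)

    dom = conn.lookupByName("vm1")                  # assumed VM name
    dom.setBlockThreshold("vda[1]", 50 * 1024**3)   # assumed device[index]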

For an internal merge, the measurement is exact, and there is no need
to leave free space in the base volume since the top volume is
read-only.

Fixes: oVirt#188
Related-to: https://bugzilla.redhat.com/1993235
Signed-off-by: Nir Soffer <nsoffer@redhat.com>
nirs authored and erav committed Jun 21, 2022
1 parent f801083 commit 8e7fe47
Showing 2 changed files with 222 additions and 136 deletions.
174 changes: 100 additions & 74 deletions lib/vdsm/virt/livemerge.py
@@ -29,6 +29,8 @@
from vdsm.common import concurrent
from vdsm.common import exception
from vdsm.common import logutils
from vdsm.common.config import config
from vdsm.common.units import MiB

from vdsm.virt import errors
from vdsm.virt import virdomain
@@ -302,14 +304,12 @@ def merge(self, driveSpec, base, top, bandwidth, job_id):
except JobExistsError as e:
raise exception.MergeFailed(str(e), job=job.id)

if self._base_needs_extend(drive, job, base_info):
job.extend = {
"attempt": 1,
"base_size": int(base_info["apparentsize"]),
"top_size": int(top_info["apparentsize"]),
"capacity": int(top_info["capacity"]),
}
self._start_extend(drive, job)
needs_extend, new_size = self._base_needs_extend(
drive, job, base_info)

if needs_extend:
job.extend = {"attempt": 1}
self._start_extend(drive, job, new_size)
else:
self._start_commit(drive, job)

@@ -420,33 +420,63 @@ def _start_commit(self, drive, job):
raise exception.MergeFailed(str(e), job=job.id)

def _base_needs_extend(self, drive, job, base_info):
# blockCommit will cause data to be written into the base volume.
# Perform an initial extension to ensure there is enough space to
# copy all the required data. Normally we'd use monitoring to extend
# the volume on-demand but internal watermark information is not being
# reported by libvirt so we must do the full extension up front. In
# the worst case, the allocated size of 'base' should be increased by
the allocated size of 'top' plus one additional chunk to accommodate
# additional writes to 'top' during the live merge operation.
"""
Return True, new_size if base needs to be extended, otherwise False,
None.
The returned size is always correct for internal merge, but for active
layer merge it is only an estimate. In the worst case the merge may
fail with ENOSPC and the user will have to retry the merge.
"""
if not drive.chunked or base_info['format'] != 'COW':
log.debug("Base volume does not support extending "
"job=%s drive=%s volume=%s",
job.id, job.drive, job.base)
return False
return False, None

current_size = int(base_info["apparentsize"])
max_size = drive.getMaxVolumeSize(int(base_info["capacity"]))

# Size can be bigger than maximum value due to rounding to LVM extent
# size (128 MiB).
if int(base_info["apparentsize"]) >= max_size:
if current_size >= max_size:
log.debug("Base volume is already extended to maximum size "
"job=%s drive=%s volume=%s size=%s",
job.id, job.drive, job.base, base_info["apparentsize"])
return False
return False, None

# Measure the required base size for this merge, based on the current
# allocation in top. This measurement is racy, since the guest may
# write data to top after the measure.
measure = self._vm.measure(
drive.domainID,
drive.imageID,
job.top,
base_info["format"],
baseID=job.base)

# Consider the required size and possible bitmaps. Since some of the
# bitmaps already exist in base, this allocates more space than needed,
# but bitmaps are small so this is good enough.
required_size = measure["required"] + measure.get("bitmaps", 0)

if job.active_commit:
# For active layer merge, add one chunk of free space to avoid
# pausing during extend, or right after extend completes. This is
# a heuristic; we cannot prevent pausing during merge without
# monitoring the volume.
chunk_size = config.getint(
"irs", "volume_utilization_chunk_mb") * MiB
if required_size + chunk_size > current_size:
return True, required_size + chunk_size
else:
# For internal merge, we don't need any free space.
if required_size > current_size:
return True, required_size

return True
return False, None

def _start_extend(self, drive, job):
def _start_extend(self, drive, job, new_size):
"""
Start extend operation for the base volume.
@@ -458,12 +488,6 @@ def _start_extend(self, drive, job):
job.extend["started"] = time.monotonic()
self._persist_jobs()

# Current extend API extends to the next chunk based on current size. We
# need to lie about the current size to get a bigger allocation.
# TODO: Change extend_volume so client can request a specific size.
max_alloc = job.extend["base_size"] + job.extend["top_size"]
capacity = job.extend["capacity"]

log.info("Starting extend %s/%s for job=%s drive=%s volume=%s",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id,
drive.name, job.base)
@@ -473,43 +497,8 @@
job_id=job.id,
attempt=job.extend["attempt"])

# New size includes one chunk of free space.
new_size = drive.getNextVolumeSize(max_alloc, capacity)
self._vm.extend_volume(drive, job.base, new_size, callback=callback)

def _retry_extend(self, job):
"""
Retry extend after a timeout or an extend error.
"""
assert job.extend["attempt"] < self.EXTEND_ATTEMPTS

try:
drive = self._vm.findDriveByUUIDs(job.disk)
except LookupError:
log.error(
"Cannot find drive %s, untracking job %s",
job.disk, job.id)
self._untrack_job(job.id)
return

# Use current top volume size for this extend retry, in case the top
# volume was extended during the merge.

try:
top_size = self._vm.getVolumeSize(
drive.domainID, drive.poolID, drive.imageID, job.top)
except errors.StorageUnavailableError as e:
log.exception(
"Cannot get top %s size, untracking job %s: %s",
job.top, job.id, e)
self._untrack_job(job.id)
return

job.extend["top_size"] = top_size.apparentsize
job.extend["attempt"] += 1

self._start_extend(drive, job)

def _extend_completed(self, job_id, attempt, error=None):
"""
Called when extend completed from mailbox worker thread.
@@ -661,21 +650,58 @@ def _update_extend(self, job):
"""
duration = time.monotonic() - job.extend["started"]

if duration > self.EXTEND_TIMEOUT:
if job.extend["attempt"] < self.EXTEND_ATTEMPTS:
log.warning(
"Extend %s/%s timeout for job %s, retrying",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id)
self._retry_extend(job)
else:
log.error(
"Extend %s/%s timeout for job %s, untracking job",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id)
self._untrack_job(job.id)
else:
if duration < self.EXTEND_TIMEOUT:
log.debug(
"Extend %s/%s for job %s running for %d seconds",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id, duration)
return

if job.extend["attempt"] >= self.EXTEND_ATTEMPTS:
log.error(
"Extend %s/%s timeout for job %s, untracking job",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id)
self._untrack_job(job.id)
return

try:
drive = self._vm.findDriveByUUIDs(job.disk)
except LookupError:
log.error(
"Cannot find drive %s, untracking job %s",
job.disk, job.id)
self._untrack_job(job.id)
return

try:
base_info = self._vm.getVolumeInfo(
drive.domainID, drive.poolID, drive.imageID, job.base)
except errors.StorageUnavailableError as e:
log.exception(
"Cannot get base %s info, untracking job %s: %s",
job.base, job.id, e)
self._untrack_job(job.id)
return

# The previous extend failed or timed out. We have 2 cases:
# - The previous attempt finished at the same time as the update, and
# extend is not needed - unlikely but possible.
# - The previous attempt failed or timed out - most likely. Since the
# guest is writing to top, we need to measure again and start a new
# extend with the current required size.
needs_extend, new_size = self._base_needs_extend(
drive, job, base_info)

if needs_extend:
log.warning(
"Extend %s/%s timeout for job %s, retrying",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id)
job.extend["attempt"] += 1
self._start_extend(drive, job, new_size)
else:
log.info("Extend %s/%s completed for job %s, starting commit",
job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id)
job.extend = None
self._start_commit(drive, job)

def _update_commit(self, job):
"""
