From 8e7fe47256f7f4e3ec8a1ff9e0f8f565eb3c9cbb Mon Sep 17 00:00:00 2001
From: Nir Soffer
Date: Wed, 18 May 2022 19:35:20 +0300
Subject: [PATCH] livemerge: Fix over extend

When extending the base volume before merge, we use a dumb calculation,
extending the base volume by top_size + chunk_size. This allocates far
more space than is typically needed.

For active layer merge, there is no way to reduce the volume after the
merge without shutting down the VM. The result is that the active
volume grows on every merge, until it consumes the maximum size.

Fix the issue by measuring the sub-chain from top to base before the
extend. This gives the exact size needed to commit the top volume into
the base volume, including the size required for the bitmaps that may
exist in the top and base volumes.

In the case of active layer merge, this measurement is a heuristic,
since the guest can write data during the measurement or later during
the merge. We add one chunk of free space to minimize the chance of
pausing the VM during the merge. The only way to prevent pausing during
merge is to monitor the base volume block threshold during the merge.
This was not possible in the past; it can be done with current libvirt,
but vdsm's thin provisioning code is not ready for it yet.

For internal merge, measuring is exact, and there is no need to leave
free space in the base volume since the top volume is read-only.

Fixes: #188
Related-to: https://bugzilla.redhat.com/1993235
Signed-off-by: Nir Soffer
---
 lib/vdsm/virt/livemerge.py   | 174 +++++++++++++++++++--------------
 tests/virt/livemerge_test.py | 184 +++++++++++++++++++++++------------
 2 files changed, 222 insertions(+), 136 deletions(-)

diff --git a/lib/vdsm/virt/livemerge.py b/lib/vdsm/virt/livemerge.py
index 1ca5926230..8e21c74265 100644
--- a/lib/vdsm/virt/livemerge.py
+++ b/lib/vdsm/virt/livemerge.py
@@ -29,6 +29,8 @@
 from vdsm.common import concurrent
 from vdsm.common import exception
 from vdsm.common import logutils
+from vdsm.common.config import config
+from vdsm.common.units import MiB
 
 from vdsm.virt import errors
 from vdsm.virt import virdomain
@@ -302,14 +304,12 @@ def merge(self, driveSpec, base, top, bandwidth, job_id):
         except JobExistsError as e:
             raise exception.MergeFailed(str(e), job=job.id)
 
-        if self._base_needs_extend(drive, job, base_info):
-            job.extend = {
-                "attempt": 1,
-                "base_size": int(base_info["apparentsize"]),
-                "top_size": int(top_info["apparentsize"]),
-                "capacity": int(top_info["capacity"]),
-            }
-            self._start_extend(drive, job)
+        needs_extend, new_size = self._base_needs_extend(
+            drive, job, base_info)
+
+        if needs_extend:
+            job.extend = {"attempt": 1}
+            self._start_extend(drive, job, new_size)
         else:
             self._start_commit(drive, job)
 
@@ -420,33 +420,63 @@ def _start_commit(self, drive, job):
             raise exception.MergeFailed(str(e), job=job.id)
 
     def _base_needs_extend(self, drive, job, base_info):
-        # blockCommit will cause data to be written into the base volume.
-        # Perform an initial extension to ensure there is enough space to
-        # copy all the required data. Normally we'd use monitoring to extend
-        # the volume on-demand but internal watermark information is not being
-        # reported by libvirt so we must do the full extension up front. In
-        # the worst case, the allocated size of 'base' should be increased by
-        # the allocated size of 'top' plus one additional chunk to accomodate
-        # additional writes to 'top' during the live merge operation.
+        """
+        Return True, new_size if base needs to be extended, otherwise False,
+        None.
+ + The returned size is always correct for internal merge, but for active + layer merge it is only an estimate. In the worst case the merge may + fail with ENOSPC and the user will have to retry the merge. + """ if not drive.chunked or base_info['format'] != 'COW': log.debug("Base volume does not support extending " "job=%s drive=%s volume=%s", job.id, job.drive, job.base) - return False + return False, None + current_size = int(base_info["apparentsize"]) max_size = drive.getMaxVolumeSize(int(base_info["capacity"])) # Size can be bigger than maximum value due to rounding to LVM extent # size (128 MiB). - if int(base_info["apparentsize"]) >= max_size: + if current_size >= max_size: log.debug("Base volume is already extended to maximum size " "job=%s drive=%s volume=%s size=%s", job.id, job.drive, job.base, base_info["apparentsize"]) - return False + return False, None + + # Measure the required base size for this merge, based on the current + # allocation in top. This measurement is racy, since the guest may + # write data to top after the measure. + measure = self._vm.measure( + drive.domainID, + drive.imageID, + job.top, + base_info["format"], + baseID=job.base) + + # Consider the required size and possible bitmaps. Since some of the + # bitmaps already exist in base, this allocates more space than needed, + # but bitmaps are small so this is good enough. + required_size = measure["required"] + measure.get("bitmaps", 0) + + if job.active_commit: + # For active layer merge, add one chunk of free space to avoid + # pausing during extend, or right after extend completes. This is + # a heuristic, we cannot prevent pausing during merge without + # monitoring the volume. + chunk_size = config.getint( + "irs", "volume_utilization_chunk_mb") * MiB + if required_size + chunk_size > current_size: + return True, required_size + chunk_size + else: + # For internal merge, we don't need any free space. + if required_size > current_size: + return True, required_size - return True + return False, None - def _start_extend(self, drive, job): + def _start_extend(self, drive, job, new_size): """ Start extend operation for the base volume. @@ -458,12 +488,6 @@ def _start_extend(self, drive, job): job.extend["started"] = time.monotonic() self._persist_jobs() - # Curent extend API extend to the next chunk based on current size. We - # need to lie about the current size to get a bigger allocation. - # TODO: Change extend_volume so client can request a specific size. - max_alloc = job.extend["base_size"] + job.extend["top_size"] - capacity = job.extend["capacity"] - log.info("Starting extend %s/%s for job=%s drive=%s volume=%s", job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id, drive.name, job.base) @@ -473,43 +497,8 @@ def _start_extend(self, drive, job): job_id=job.id, attempt=job.extend["attempt"]) - # New size includes one chunk of free space. - new_size = drive.getNextVolumeSize(max_alloc, capacity) self._vm.extend_volume(drive, job.base, new_size, callback=callback) - def _retry_extend(self, job): - """ - Retry extend after a timeout of extend error. - """ - assert job.extend["attempt"] < self.EXTEND_ATTEMPTS - - try: - drive = self._vm.findDriveByUUIDs(job.disk) - except LookupError: - log.error( - "Cannot find drive %s, untracking job %s", - job.disk, job.id) - self._untrack_job(job.id) - return - - # Use current top volume size for this extend retry, in case the top - # volume was extended during the merge. 
- - try: - top_size = self._vm.getVolumeSize( - drive.domainID, drive.poolID, drive.imageID, job.top) - except errors.StorageUnavailableError as e: - log.exception( - "Cannot get top %s size, untracking job %s: %s", - job.top, job.id, e) - self._untrack_job(job.id) - return - - job.extend["top_size"] = top_size.apparentsize - job.extend["attempt"] += 1 - - self._start_extend(drive, job) - def _extend_completed(self, job_id, attempt, error=None): """ Called when extend completed from mailbox worker thread. @@ -661,21 +650,58 @@ def _update_extend(self, job): """ duration = time.monotonic() - job.extend["started"] - if duration > self.EXTEND_TIMEOUT: - if job.extend["attempt"] < self.EXTEND_ATTEMPTS: - log.warning( - "Extend %s/%s timeout for job %s, retrying", - job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id) - self._retry_extend(job) - else: - log.error( - "Extend %s/%s timeout for job %s, untracking job", - job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id) - self._untrack_job(job.id) - else: + if duration < self.EXTEND_TIMEOUT: log.debug( "Extend %s/%s for job %s running for %d seconds", job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id, duration) + return + + if job.extend["attempt"] >= self.EXTEND_ATTEMPTS: + log.error( + "Extend %s/%s timeout for job %s, untracking job", + job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id) + self._untrack_job(job.id) + return + + try: + drive = self._vm.findDriveByUUIDs(job.disk) + except LookupError: + log.error( + "Cannot find drive %s, untracking job %s", + job.disk, job.id) + self._untrack_job(job.id) + return + + try: + base_info = self._vm.getVolumeInfo( + drive.domainID, drive.poolID, drive.imageID, job.base) + except errors.StorageUnavailableError as e: + log.exception( + "Cannot get base %s info, untracking job %s: %s", + job.top, job.id, e) + self._untrack_job(job.id) + return + + # The previous extend failed or timed out. We have 2 cases: + # - The previous attempt finished at the same time of the update, and + # extend is not needed - unlikely but possible. + # - The previous attempt failed or timed out - most likely. Since the + # guest is writing to top, we need to measure again and start a new + # extend with the current required size. 
+ needs_extend, new_size = self._base_needs_extend( + drive, job, base_info) + + if needs_extend: + log.warning( + "Extend %s/%s timeout for job %s, retrying", + job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id) + job.extend["attempt"] += 1 + self._start_extend(drive, job, new_size) + else: + log.info("Extend %s/%s completed for job %s, starting commit", + job.extend["attempt"], self.EXTEND_ATTEMPTS, job.id) + job.extend = None + self._start_commit(drive, job) def _update_commit(self, job): """ diff --git a/tests/virt/livemerge_test.py b/tests/virt/livemerge_test.py index f53af599db..30357a7a2b 100644 --- a/tests/virt/livemerge_test.py +++ b/tests/virt/livemerge_test.py @@ -30,7 +30,8 @@ from vdsm.common import exception from vdsm.common import response from vdsm.common import xmlutils -from vdsm.common.units import GiB +from vdsm.common.config import config +from vdsm.common.units import GiB, MiB from vdsm.virt import metadata from vdsm.virt import migration @@ -56,6 +57,10 @@ log = logging.getLogger("test") +def chunk_size(): + return config.getint("irs", "volume_utilization_chunk_mb") * MiB + + class FakeTime(object): def __init__(self, value=0): @@ -302,13 +307,12 @@ def test_merger_dump_jobs(fake_time): vm = RunningVM(config) - top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] - base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - # No jobs yet. assert vm._drive_merger.dump_jobs() == {} + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) # Merge was started, new jobs should be in the dump. @@ -316,38 +320,28 @@ def test_merger_dump_jobs(fake_time): assert vm._drive_merger.dump_jobs() == { job_id: { "bandwidth": merge_params["bandwidth"], - "base": merge_params["baseVolUUID"], + "base": base_id, "disk": merge_params["driveSpec"], "drive": "sda", "state": Job.EXTEND, "extend": { "attempt": 1, - "base_size": base["apparentsize"], - "capacity": top["capacity"], "started": fake_time.time, - "top_size": top["apparentsize"], }, "pivot": None, "id": job_id, - "top": merge_params["topVolUUID"], + "top": top_id, } } def test_merger_load_jobs(fake_time): config = Config('active-merge') - sd_id = config.values["drive"]["domainID"] - img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] - top_id = merge_params["topVolUUID"] - base_id = merge_params["baseVolUUID"] vm = RunningVM(config) - top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] - base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - assert vm._drive_merger.dump_jobs() == {} # Load jobs, simulating recovery flow. @@ -361,8 +355,6 @@ def test_merger_load_jobs(fake_time): "state": Job.EXTEND, "extend": { "attempt": 1, - "base_size": base["apparentsize"], - "top_size": top["apparentsize"], "started": fake_time.time, }, "pivot": None, @@ -399,6 +391,8 @@ def test_active_merge(monkeypatch): # No active block jobs before calling merge. assert vm.query_jobs() == {} + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) # Merge persists the job with EXTEND state. @@ -426,12 +420,12 @@ def test_active_merge(monkeypatch): persisted_job = parse_jobs(vm)[job_id] assert persisted_job["state"] == Job.EXTEND - # We should extend to next volume size based on base and top currrent size, - # base volume capacity, and chunk size configuration. 
- top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] + # We should extend to next volume size based on the required size for this + # merge, including bitmaps size. + measure = vm.cif.irs.measure_info[(top_id, base_id)] + required_size = measure["required"] + measure["bitmaps"] base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - max_alloc = base["apparentsize"] + top["apparentsize"] - new_size = drive.getNextVolumeSize(max_alloc, top["capacity"]) + new_size = drive.getNextVolumeSize(required_size, base["capacity"]) simulate_volume_extension(vm, base_id) @@ -602,6 +596,8 @@ def test_active_merge_pivot_failure(monkeypatch): # No active block jobs before calling merge. assert vm.query_jobs() == {} + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) # Merge persists the job with EXTEND state. @@ -615,12 +611,12 @@ def test_active_merge_pivot_failure(monkeypatch): persisted_job = parse_jobs(vm)[job_id] assert persisted_job["state"] == Job.EXTEND - # We should extend to next volume size based on base and top currrent size, - # base volume capacity, and chunk size configuration. - top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] + # We should extend to next volume size based on the required size for this + # merge, including bitmaps size. + measure = vm.cif.irs.measure_info[(top_id, base_id)] + required_size = measure["required"] + measure["bitmaps"] base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - max_alloc = base["apparentsize"] + top["apparentsize"] - new_size = drive.getNextVolumeSize(max_alloc, top["capacity"]) + new_size = drive.getNextVolumeSize(required_size, base["capacity"]) simulate_volume_extension(vm, base_id) @@ -694,12 +690,17 @@ def test_active_merge_storage_unavailable(monkeypatch): monkeypatch.setattr(CleanupThread, "WAIT_INTERVAL", 0.01) config = Config('active-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] base_id = merge_params["baseVolUUID"] + top_id = merge_params["topVolUUID"] vm = RunningVM(config) + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + with monkeypatch.context() as ctx: # Simulate unavailable storage. fail = lambda *args, **kwargs: response.error("unavail") @@ -761,6 +762,9 @@ def test_internal_merge(): assert vm.query_jobs() == {} + simulate_base_needs_extend( + vm, sd_id, img_id, top_id, base_id, active=False) + vm.merge(**merge_params) # Merge persists job in EXTEND state. @@ -896,28 +900,21 @@ def test_extend_timeout_recover(fake_time): img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] - base_id = merge_params["baseVolUUID"] top_id = merge_params["topVolUUID"] + base_id = merge_params["baseVolUUID"] vm = RunningVM(config) - vm.merge(**merge_params) + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) - # Find base and top sizes. They do not change during this test. - base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] - base_size = base["apparentsize"] - top_size = top["apparentsize"] + vm.merge(**merge_params) # Job starts at EXTEND state, tracking the first extend attempt. 
persisted_job = parse_jobs(vm)[job_id] assert persisted_job["state"] == Job.EXTEND assert persisted_job["extend"] == { "attempt": 1, - "base_size": base_size, - "capacity": top["capacity"], "started": fake_time.time, - "top_size": top_size, } # First extend request was sent. @@ -932,10 +929,7 @@ def test_extend_timeout_recover(fake_time): assert persisted_job["state"] == Job.EXTEND assert persisted_job["extend"] == { "attempt": 2, - "base_size": base_size, - "capacity": top["capacity"], "started": fake_time.time, - "top_size": top_size, } # Second extend request was sent. @@ -948,49 +942,44 @@ def test_extend_timeout_recover(fake_time): assert persisted_job["state"] == Job.COMMIT -def test_extend_use_original_base_size(fake_time): +def test_extend_update_switch_to_commit(fake_time): config = Config('active-merge') sd_id = config.values["drive"]["domainID"] img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] + top_id = merge_params["topVolUUID"] base_id = merge_params["baseVolUUID"] vm = RunningVM(config) + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) # Find base volume size. base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] - base_size = base["apparentsize"] # Job starts at EXTEND state, tracking the first extend attempt. persisted_job = parse_jobs(vm)[job_id] assert persisted_job["state"] == Job.EXTEND assert persisted_job["extend"]["attempt"] == 1 - assert persisted_job["extend"]["base_size"] == base_size # Just when the extend timed out, the base volume was extended, but the # extend callback was not called yet. - new_size1 = vm.cif.irs.extend_requests[0][2] - base["apparentsize"] = new_size1 + new_size = vm.cif.irs.extend_requests[0][2] + base["apparentsize"] = new_size # Simulate extend timeout, triggering the next extend attempt. fake_time.time += DriveMerger.EXTEND_TIMEOUT + 1 vm.query_jobs() - # Job tracks the second extend attempt, using the original base size. + # Job detected that the base volume was extended and move to COMMIT state. persisted_job = parse_jobs(vm)[job_id] - assert persisted_job["state"] == Job.EXTEND - assert persisted_job["extend"]["attempt"] == 2 - assert persisted_job["extend"]["base_size"] == base_size - - # Second extend request was sent same size. This extend does not have any - # effect since the volume was already extended. - new_size2 = vm.cif.irs.extend_requests[1][2] - assert new_size2 == new_size1 + assert persisted_job["state"] == Job.COMMIT - # The first extend callback is called now, moving the job to COMMIT state. + # Simulate the extend callback - it has no effect since the jobs already + # moved to new state. simulate_volume_extension(vm, base_id) persisted_job = parse_jobs(vm)[job_id] assert persisted_job["state"] == Job.COMMIT @@ -1003,9 +992,12 @@ def test_extend_use_current_top_size(fake_time): merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] top_id = merge_params["topVolUUID"] + base_id = merge_params["baseVolUUID"] vm = RunningVM(config) + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) top = vm.cif.irs.prepared_volumes[(sd_id, img_id, top_id)] @@ -1018,8 +1010,10 @@ def test_extend_use_current_top_size(fake_time): # First extend request was sent based on base and top size. new_size1 = vm.cif.irs.extend_requests[0][2] - # While waiting for extend completion, top volume was extended. 
-    top["apparentsize"] += GiB
+    # While waiting for extend completion, guest wrote 1 GiB of data, and the
+    # top volume was extended.
+    top["apparentsize"] += chunk_size()
+    vm.cif.irs.measure_info[(top_id, base_id)]["required"] += GiB
 
     # Simulate extend timeout, triggering the next extend attempt.
     fake_time.time += DriveMerger.EXTEND_TIMEOUT + 1
@@ -1030,18 +1024,24 @@
     assert persisted_job["state"] == Job.EXTEND
     assert persisted_job["extend"]["attempt"] == 2
 
-    # Second extend request was sent with bigger volume size.
+    # Second extend request considered the changed measure results.
     new_size2 = vm.cif.irs.extend_requests[1][2]
     assert new_size2 == new_size1 + GiB
 
 
 def test_extend_timeout_all(fake_time):
     config = Config('active-merge')
+    sd_id = config.values["drive"]["domainID"]
+    img_id = config.values["drive"]["imageID"]
     merge_params = config.values["merge_params"]
     job_id = merge_params["jobUUID"]
+    top_id = merge_params["topVolUUID"]
+    base_id = merge_params["baseVolUUID"]
 
     vm = RunningVM(config)
 
+    simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id)
+
     vm.merge(**merge_params)
 
     # Job starts with EXTEND state, performing the first extend attempt.
@@ -1080,11 +1080,17 @@ def test_extend_error_all(fake_time):
     config = Config('active-merge')
+    sd_id = config.values["drive"]["domainID"]
+    img_id = config.values["drive"]["imageID"]
     merge_params = config.values["merge_params"]
     job_id = merge_params["jobUUID"]
+    top_id = merge_params["topVolUUID"]
+    base_id = merge_params["baseVolUUID"]
 
     vm = RunningVM(config)
 
+    simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id)
+
     vm.merge(**merge_params)
 
     # Job starts with EXTEND state, performing the first extend attempt.
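The extend sizes asserted by the tests above and below all come from the new
_base_needs_extend logic in the livemerge.py hunk earlier in this patch. As a
rough, self-contained sketch of that sizing rule (plain Python for illustration
only; the chunk size value here is an assumed default, while the real code
reads irs/volume_utilization_chunk_mb from the vdsm config):

MiB = 1024**2

# Assumed chunk size; vdsm reads irs/volume_utilization_chunk_mb from config.
CHUNK_SIZE = 1024 * MiB


def needed_size(measure, current_size, active_commit):
    # Allocation needed to commit top into base, plus room for any bitmaps
    # that have to be merged into base.
    required = measure["required"] + measure.get("bitmaps", 0)

    if active_commit:
        # Active layer merge: keep one free chunk so the guest can keep
        # writing while the merge runs.
        if required + CHUNK_SIZE > current_size:
            return True, required + CHUNK_SIZE
    else:
        # Internal merge: top is read-only, so no free space is needed.
        if required > current_size:
            return True, required

    return False, None

When the measurement changes between attempts, as simulated in
test_extend_use_current_top_size above, re-running this calculation naturally
yields a larger size for the retry.
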
@@ -1151,14 +1157,19 @@ def test_extend_skipped(): def test_active_merge_canceled_during_commit(): config = Config('active-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] + top_id = merge_params["topVolUUID"] base_id = merge_params["baseVolUUID"] vm = RunningVM(config) assert vm.query_jobs() == {} + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) simulate_volume_extension(vm, base_id) @@ -1211,14 +1222,19 @@ def test_active_merge_canceled_during_commit(): def test_active_merge_canceled_during_cleanup(monkeypatch): config = Config('active-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] + top_id = merge_params["topVolUUID"] base_id = merge_params["baseVolUUID"] vm = RunningVM(config) assert vm.query_jobs() == {} + simulate_base_needs_extend(vm, sd_id, img_id, top_id, base_id) + vm.merge(**merge_params) simulate_volume_extension(vm, base_id) @@ -1271,13 +1287,21 @@ def fail(drive, flags=0): def test_block_job_info_error(monkeypatch): - config = Config("internal-merge") + config = Config('internal-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] + top_id = merge_params["topVolUUID"] base_id = merge_params["baseVolUUID"] vm = RunningVM(config) + assert vm.query_jobs() == {} + + simulate_base_needs_extend( + vm, sd_id, img_id, top_id, base_id, active=False) + vm.merge(**merge_params) simulate_volume_extension(vm, base_id) @@ -1325,12 +1349,20 @@ def blockJobInfo(*args, **kwargs): def test_merge_commit_error(monkeypatch): - config = Config("internal-merge") + config = Config('internal-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] + top_id = merge_params["topVolUUID"] base_id = merge_params["baseVolUUID"] vm = RunningVM(config) + assert vm.query_jobs() == {} + + simulate_base_needs_extend( + vm, sd_id, img_id, top_id, base_id, active=False) + def commit_error(*args, **kwargs): raise fake.libvirt_error( [libvirt.VIR_ERR_INTERNAL_ERROR], "Block commit failed") @@ -1351,12 +1383,21 @@ def commit_error(*args, **kwargs): def test_merge_job_already_exists(monkeypatch): - config = Config("internal-merge") + config = Config('internal-merge') + sd_id = config.values["drive"]["domainID"] + img_id = config.values["drive"]["imageID"] merge_params = config.values["merge_params"] job_id = merge_params["jobUUID"] + top_id = merge_params["topVolUUID"] + base_id = merge_params["baseVolUUID"] vm = RunningVM(config) + assert vm.query_jobs() == {} + + simulate_base_needs_extend( + vm, sd_id, img_id, top_id, base_id, active=False) + # Calling merge twice will fail the second call with same block # job already tracked from first call. vm.merge(**merge_params) @@ -1391,6 +1432,25 @@ def test_merge_base_too_small(monkeypatch): assert parse_jobs(vm) == {} +def simulate_base_needs_extend( + vm, sd_id, img_id, top_id, base_id, active=True): + base = vm.cif.irs.prepared_volumes[(sd_id, img_id, base_id)] + + # Set required to maximum value before base need extension. For active + # volume we extend if required + chunk > current. For internal volume we + # extend if required > current. 
+    if active:
+        required = base["apparentsize"] - chunk_size()
+    else:
+        required = base["apparentsize"]
+
+    # Since we report a non-zero bitmaps value, base needs to be extended.
+    vm.cif.irs.measure_info[(top_id, base_id)] = {
+        "required": required,
+        "bitmaps": 1 * MiB,
+    }
+
+
 def simulate_volume_extension(vm, vol_id):
     _, vol_info, new_size, callback = vm.cif.irs.extend_requests.pop(0)
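
As a worked example of the threshold chosen by simulate_base_needs_extend,
assuming a volume_utilization_chunk_mb of 1024 and a hypothetical 2 GiB base
allocation (both values are illustrative; the tests read the real chunk size
from the vdsm config):

MiB = 1024**2
GiB = 1024**3

chunk = 1024 * MiB           # assumed volume_utilization_chunk_mb
base_apparentsize = 2 * GiB  # hypothetical base allocation

# For an active merge the helper reports required = apparentsize - chunk...
required = base_apparentsize - chunk
bitmaps = 1 * MiB

# ...so _base_needs_extend sees required + bitmaps + chunk > apparentsize and
# must go through the EXTEND state, asking for one chunk above the measured
# requirement.
assert required + bitmaps + chunk > base_apparentsize
new_size = required + bitmaps + chunk  # 2 GiB + 1 MiB in this example

Without the 1 MiB bitmaps value the helper would sit exactly at the threshold
and no extend would be started, which is why the comment above stresses the
non-zero bitmaps value.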