diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index e1795e7b..b4d29037 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -2722,9 +2722,13 @@ def _start_task(task): parent_task_statuses) task_statuses[task.id] = _start_task(task) # start on-error tasks only if at least one non-error - # parent task has completed successfully: - elif constants.TASK_STATUS_COMPLETED in ( - non_error_parents.values()): + # parent task has completed (COMPLETED or + # CANCELED_AFTER_COMPLETION both mean the task + # finished its work and may have created resources + # that need cleaning up): + elif any( + s in constants.CLEANUP_TASK_TRIGGER_STATUSES + for s in non_error_parents.values()): LOG.info( "Starting on-error task '%s' as all parent " "tasks have been finalized and at least one " diff --git a/coriolis/constants.py b/coriolis/constants.py index d6312c8a..5624f130 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -75,6 +75,16 @@ TASK_STATUS_FAILED_TO_CANCEL ] +# Statuses that indicate a non-error parent task completed its work and +# may have created resources that on-error cleanup tasks must destroy. +# CANCELED_AFTER_COMPLETION means the task ran to completion (result saved, +# resources created) but was marked cancelled because a cancellation was +# already in flight when the completion arrived. +CLEANUP_TASK_TRIGGER_STATUSES = [ + TASK_STATUS_COMPLETED, + TASK_STATUS_CANCELED_AFTER_COMPLETION, +] + FINALIZED_TASK_STATUSES = [ TASK_STATUS_COMPLETED, TASK_STATUS_ERROR, diff --git a/coriolis/tests/integration/test_failure_recovery.py b/coriolis/tests/integration/test_failure_recovery.py index cf186a0f..f995d25e 100644 --- a/coriolis/tests/integration/test_failure_recovery.py +++ b/coriolis/tests/integration/test_failure_recovery.py @@ -4,32 +4,123 @@ """ Integration test for failure handling and cleanup. -Patches the active import provider's deploy_replica_target_resources to raise -an exception, triggers a transfer execution, and asserts that: - 1. The execution reaches ERROR status. - 2. Cleanup tasks (delete_replica_source_resources) ran so the replicator - process is no longer alive. +Tests two symmetrical error-path scenarios: + +- deploy_replica_target_resources is delayed then raises -> source task + completes first (creating real source resources) -> execution errors -> + source cleanup (DELETE_TRANSFER_SOURCE_RESOURCES) must run, + source_resources zeroed out. + +- deploy_replica_source_resources is delayed then raises -> target task + completes first (creating real target resources) -> execution errors -> + destination cleanup (DELETE_TRANSFER_TARGET_RESOURCES) must run, + target_resources zeroed out. Must be run as root; requires the scsi_debug kernel module. """ +import time from unittest import mock +from coriolis import constants +from coriolis.db import api as db_api from coriolis.tests.integration import base class TransferFailureIntegrationTest(base.ReplicaIntegrationTestBase): """Error path and resource cleanup.""" - def test_error_status_on_provider_failure(self): - """Execution reaches ERROR when target resource deployment fails.""" + def _assertResourcesCleaned(self, execution_id, task_type, resource_key): + ctxt = self._get_db_context() + execution = db_api.get_tasks_execution(ctxt, execution_id) + + cleanup_task = next( + (t for t in execution.tasks if t.task_type == task_type), + None, + ) + self.assertIsNotNone( + cleanup_task, + "%s task not found in execution %s" % (task_type, execution_id), + ) + + self.assertIn( + cleanup_task.status, + constants.CLEANUP_TASK_TRIGGER_STATUSES, + "%s task did not complete (status: %s); %s may have leaked" + % (task_type, cleanup_task.status, resource_key), + ) + + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + for instance in execution.action.instances: + self.assertIsNone( + action.info.get(instance, {}).get(resource_key), + "%s not cleared for instance '%s' after cleanup task completed" + % (resource_key, instance), + ) + + def assertSourceResourcesCleaned(self, execution_id): + self._assertResourcesCleaned( + execution_id, + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_RESOURCES, + "source_resources", + ) + + def assertTargetResourcesCleaned(self, execution_id): + self._assertResourcesCleaned( + execution_id, + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_RESOURCES, + "target_resources", + ) + + def test_source_resources_cleaned_on_target_failure(self): + """Source cleanup runs when target resource deployment fails. + + deploy_replica_target_resources is delayed so that + deploy_replica_source_resources has time to finish (creating real + source resources) before the target task eventually raises. The + conductor must schedule DELETE_TRANSFER_SOURCE_RESOURCES and zero out + source_resources in the action info. + """ injected_error = Exception("injected target resource failure") + def _slow_then_fail(self_provider, *args, **kwargs): + time.sleep(15) + raise injected_error + with mock.patch.object( self._harness.imp_provider_class, "deploy_replica_target_resources", - side_effect=injected_error, + _slow_then_fail, ): execution = self._client.transfer_executions.create( self._transfer.id, shutdown_instances=False) self.assertExecutionErrored(execution.id) + + self.assertSourceResourcesCleaned(execution.id) + + def test_target_resources_cleaned_on_source_failure(self): + """Target cleanup runs when source resource deployment fails. + + deploy_replica_source_resources is delayed so that + deploy_replica_target_resources has time to finish (creating real + target resources) before the source task eventually raises. The + conductor must schedule DELETE_TRANSFER_TARGET_RESOURCES and zero out + target_resources in the action info. + """ + injected_error = Exception("injected source resource failure") + + def _slow_then_fail(self_provider, *args, **kwargs): + time.sleep(15) + raise injected_error + + with mock.patch.object( + self._harness.exp_provider_class, + "deploy_replica_source_resources", + _slow_then_fail, + ): + execution = self._client.transfer_executions.create( + self._transfer.id, shutdown_instances=False) + self.assertExecutionErrored(execution.id) + + self.assertTargetResourcesCleaned(execution.id)