Commit
Fix issue where azure IO manager was sometimes failing on recursive deletes (#7956)

Summary:
I'm still a little unclear on when an IO manager would actually be expected to delete a folder rather than a file (as evidenced by my somewhat contrived test, which was the best way I could find to exercise that path), but this change uses an undocumented(!) recursive argument to the Azure delete_file function to ensure that it can delete folders.

Test Plan: New test case passes; it failed before this change.
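
For reference, a minimal sketch of the underlying SDK call outside of the IO manager, using azure-storage-file-datalake. The account URL, credential, file system name, and path below are placeholders, and forwarding recursive=True through delete_file relies on the undocumented argument described in the summary above, not on a documented API.

    from azure.storage.filedatalake import DataLakeServiceClient

    # Placeholder account, credential, file system, and path (illustration only).
    service_client = DataLakeServiceClient(
        account_url="https://myaccount.dfs.core.windows.net",
        credential="<storage-account-key>",
    )
    file_system_client = service_client.get_file_system_client("my-file-system")

    # A run's storage prefix (for example dagster/storage/<run_id>) is a directory
    # of step outputs rather than a single file. delete_file fails on such a path
    # unless the undocumented recursive flag is forwarded to the delete request.
    file_client = file_system_client.get_file_client("dagster/storage/<run_id>")
    file_client.delete_file(recursive=True)

A documented alternative for directory paths would be get_directory_client(path).delete_directory(), but staying on the file client lets _rm_object handle files and folders through a single code path.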
gibsondan committed May 18, 2022
1 parent 0d90a3c commit f7b14c4
Showing 2 changed files with 64 additions and 2 deletions.
@@ -46,8 +46,7 @@ def _rm_object(self, key):

         file_client = self.file_system_client.get_file_client(key)
         with self._acquire_lease(file_client, is_rm=True) as lease:
-            # This operates recursively already so is nice and simple.
-            file_client.delete_file(lease=lease)
+            file_client.delete_file(lease=lease, recursive=True)

     def _has_object(self, key):
         check.str_param(key, "key")

@@ -79,6 +79,69 @@ def basic_external_plan_execution():
     )


+@pytest.mark.nettest
+def test_adls2_pickle_io_manager_deletes_recursively(storage_account, file_system, credential):
+    job = define_inty_job()
+
+    run_config = {
+        "resources": {
+            "io_manager": {"config": {"adls2_file_system": file_system}},
+            "adls2": {
+                "config": {"storage_account": storage_account, "credential": {"key": credential}}
+            },
+        }
+    }
+
+    run_id = make_new_run_id()
+
+    resolved_run_config = ResolvedRunConfig.build(job, run_config=run_config)
+    execution_plan = ExecutionPlan.build(InMemoryPipeline(job), resolved_run_config)
+
+    assert execution_plan.get_step_by_key("return_one")
+
+    step_keys = ["return_one"]
+    instance = DagsterInstance.ephemeral()
+    pipeline_run = PipelineRun(pipeline_name=job.name, run_id=run_id, run_config=run_config)
+
+    return_one_step_events = list(
+        execute_plan(
+            execution_plan.build_subset_plan(step_keys, job, resolved_run_config),
+            pipeline=InMemoryPipeline(job),
+            run_config=run_config,
+            pipeline_run=pipeline_run,
+            instance=instance,
+        )
+    )
+
+    assert get_step_output(return_one_step_events, "return_one")
+    context = build_input_context(
+        upstream_output=build_output_context(
+            step_key="return_one",
+            name="result",
+            run_id=run_id,
+        )
+    )
+
+    io_manager = PickledObjectADLS2IOManager(
+        file_system=file_system,
+        adls2_client=create_adls2_client(storage_account, credential),
+        blob_client=create_blob_client(storage_account, credential),
+        lease_client_constructor=DataLakeLeaseClient,
+    )
+    assert io_manager.load_input(context) == 1
+
+    # Verify that when the IO manager needs to delete recursively, it is able to do so,
+    # by removing the whole path for the run
+    recursive_path = "/".join(
+        [
+            io_manager.prefix,
+            "storage",
+            run_id,
+        ]
+    )
+    io_manager._rm_object(recursive_path)  # pylint: disable=protected-access
+
+
 @pytest.mark.nettest
 def test_adls2_pickle_io_manager_execution(storage_account, file_system, credential):
     job = define_inty_job()
