Skip to content

Commit

Permalink
Acquire leases when using ADLS (#7410)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Apr 19, 2022
1 parent e781ce6 commit 98cdc50
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pickle
from contextlib import contextmanager

from azure.storage.filedatalake import DataLakeLeaseClient
from dagster_azure.adls2.utils import ResourceNotFoundError

from dagster import Field, IOManager, StringSource, check, io_manager
Expand Down Expand Up @@ -38,8 +40,10 @@ def _rm_object(self, key):
check.str_param(key, "key")
check.param_invariant(len(key) > 0, "key")

# This operates recursively already so is nice and simple.
self.file_system_client.delete_file(key)
file_client = self.file_system_client.get_file_client(key)
with self._acquire_lease(file_client, is_rm=True) as lease:
# This operates recursively already so is nice and simple.
file_client.delete_file(lease=lease)

def _has_object(self, key):
check.str_param(key, "key")
Expand All @@ -62,6 +66,17 @@ def _uri_for_key(self, key, protocol=None):
key=key,
)

@contextmanager
def _acquire_lease(self, client, is_rm=False):
lease_client = DataLakeLeaseClient(client=client)
try:
lease_client.acquire(lease_duration=self.lease_duration)
yield lease_client.id
finally:
# cannot release a lease on a file that no longer exists, so need to check
if not is_rm:
lease_client.release()

def load_input(self, context):
key = self._get_path(context.upstream_output)
context.log.debug(f"Loading ADLS2 object from: {self._uri_for_key(key)}")
Expand All @@ -82,7 +97,7 @@ def handle_output(self, context, obj):
pickled_obj = pickle.dumps(obj, PICKLE_PROTOCOL)

file = self.file_system_client.create_file(key)
with file.acquire_lease(self.lease_duration) as lease:
with self._acquire_lease(file) as lease:
file.upload_data(pickled_obj, lease=lease, overwrite=True)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from uuid import uuid4

import pytest
from dagster_azure.adls2 import create_adls2_client
from dagster_azure.adls2.io_manager import (
Expand All @@ -10,6 +12,8 @@

from dagster import (
AssetGroup,
AssetIn,
AssetKey,
DagsterInstance,
DynamicOutput,
DynamicOutputDefinition,
Expand Down Expand Up @@ -74,7 +78,6 @@ def basic_external_plan_execution():


@pytest.mark.nettest
@pytest.mark.skip("https://github.com/dagster-io/dagster/issues/7421")
def test_adls2_pickle_io_manager_execution(storage_account, file_system, credential):
job = define_inty_job()

Expand Down Expand Up @@ -147,13 +150,16 @@ def test_adls2_pickle_io_manager_execution(storage_account, file_system, credent
assert io_manager.load_input(context) == 2


@pytest.mark.skip("https://github.com/dagster-io/dagster/issues/7421")
def test_asset_io_manager(storage_account, file_system, credential):
@asset
_id = f"{uuid4()}".replace("-", "")

@asset(name=f"upstream_{_id}")
def upstream():
return 2

@asset
@asset(
name=f"downstream_{_id}", ins={"upstream": AssetIn(asset_key=AssetKey([f"upstream_{_id}"]))}
)
def downstream(upstream):
assert upstream == 2
return 1 + upstream
Expand Down

0 comments on commit 98cdc50

Please sign in to comment.