Skip to content

Commit

Permalink
Improvements to airflow.io (#35478)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Nov 7, 2023
1 parent ca27772 commit 865b3a5
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 138 deletions.
5 changes: 3 additions & 2 deletions airflow/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
"""
filesystems = _register_filesystems()
try:
return filesystems[scheme](conn_id)
fs = filesystems[scheme]
except KeyError:
raise ValueError(f"No filesystem registered for scheme {scheme}")
raise ValueError(f"No filesystem registered for scheme {scheme}") from None
return fs(conn_id)


def has_fs(scheme: str) -> bool:
Expand Down
30 changes: 14 additions & 16 deletions airflow/io/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ObjectStore:
conn_id: str | None
protocol: str

_fs: AbstractFileSystem = None
_fs: AbstractFileSystem | None = None

def __init__(self, protocol: str, conn_id: str | None, fs: AbstractFileSystem | None = None):
self.conn_id = conn_id
Expand All @@ -46,8 +46,7 @@ def __str__(self):

@property
def fs(self) -> AbstractFileSystem:
self._connect()
return self._fs
return self._connect()

@property
def fsid(self) -> str:
Expand All @@ -59,9 +58,9 @@ def fsid(self) -> str:
:return: the deterministic filesystem ID
"""
self._connect()
fs = self._connect()
try:
return self._fs.fsid
return fs.fsid
except NotImplementedError:
return f"{self.fs.protocol}-{self.conn_id or 'env'}"

Expand All @@ -82,21 +81,21 @@ def deserialize(cls, data: dict[str, str], version: int):

alias = f"{protocol}-{conn_id}" if conn_id else protocol

if store := _STORE_CACHE.get(alias, None):
if store := _STORE_CACHE.get(alias):
return store

if not has_fs(protocol):
if "filesystem" in data and data["filesystem"]:
raise ValueError(
f"No attached filesystem found for {data['filesystem']} with "
f"protocol {data['protocol']}. Please use attach() for this protocol and filesystem."
)
if not has_fs(protocol) and "filesystem" in data and data["filesystem"]:
raise ValueError(
f"No attached filesystem found for {data['filesystem']} with "
f"protocol {data['protocol']}. Please use attach() for this protocol and filesystem."
)

return attach(protocol=protocol, conn_id=conn_id)

def _connect(self):
def _connect(self) -> AbstractFileSystem:
if self._fs is None:
self._fs = get_fs(self.protocol, self.conn_id)
return self._fs

def __eq__(self, other):
return isinstance(other, type(self)) and other.conn_id == self.conn_id and other._fs == self._fs
Expand All @@ -122,7 +121,7 @@ def attach(
:param fs: the filesystem type to use to connect to the filesystem
"""
if alias:
if store := _STORE_CACHE.get(alias, None):
if store := _STORE_CACHE.get(alias):
return store
elif not protocol:
raise ValueError(f"No registered store with alias: {alias}")
Expand All @@ -135,7 +134,6 @@ def attach(
if store := _STORE_CACHE.get(alias, None):
return store

store = ObjectStore(protocol=protocol, conn_id=conn_id, fs=fs)
_STORE_CACHE[alias] = store
_STORE_CACHE[alias] = store = ObjectStore(protocol=protocol, conn_id=conn_id, fs=fs)

return store

0 comments on commit 865b3a5

Please sign in to comment.