Skip to content

Commit

Permalink
Use 'mtime' as default input for modification_date
Browse files Browse the repository at this point in the history
- Reduces the need to modify code every time a new fsspec implementation is added.
- `mtime` is idiomatic in *nix file systems.
  • Loading branch information
deanja committed Feb 15, 2024
1 parent c16a9dc commit ea0557f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 23 deletions.
57 changes: 42 additions & 15 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,20 @@ class FileItem(TypedDict, total=False):
file_content: Optional[bytes]


# Map of protocol to mtime resolver
# we only need to support a small finite set of protocols
MTIME_DISPATCH = {
"s3": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"adl": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"az": lambda f: ensure_pendulum_datetime(f["last_modified"]),
"gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
"gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]),
DEFAULT_MTIME_FIELD_NAME = "mtime"
MTIME_FIELD_NAMES = {
"file": "mtime",
"s3": "LastModified",
"adl": "LastModified",
"az": "last_modified",
"gcs": "updated",
"memory": "created",
"gdrive": "modifiedTime",
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]
MTIME_FIELD_NAMES["gs"] = MTIME_FIELD_NAMES["gcs"]
MTIME_FIELD_NAMES["s3a"] = MTIME_FIELD_NAMES["s3"]
MTIME_FIELD_NAMES["abfs"] = MTIME_FIELD_NAMES["az"]

# Map of protocol to a filesystem type
CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = {
Expand Down Expand Up @@ -110,7 +109,7 @@ def register_implementation_in_fsspec(protocol: str) -> None:
if protocol in known_implementations:
return

if not protocol in CUSTOM_IMPLEMENTATIONS:
if protocol not in CUSTOM_IMPLEMENTATIONS:
raise ValueError(
f"Unknown protocol: '{protocol}' is not an fsspec known "
"implementations nor a dlt custom implementations."
Expand Down Expand Up @@ -304,6 +303,32 @@ def guess_mime_type(file_name: str) -> Sequence[str]:
return type_


def extract_mtime(
    file_metadata: Dict[str, Any], protocol: Optional[str] = None
) -> pendulum.DateTime:
    """Extract the modification time from file listing metadata.

    Args:
        file_metadata (Dict[str, Any]): The file metadata.
        protocol (str) [Optional]: The protocol. If not provided, None or not a known protocol,
            then default field name `mtime` is tried. `mtime` is used for the "file" fsspec
            implementation and our custom fsspec implementations.

    Returns:
        pendulum.DateTime: The modification time.

    Raises:
        KeyError: If the resolved field name is not found in the metadata. Current dlt use-cases
            depend on a modified date. For example, transactional files, incremental destination
            loading.
    """
    field_name = MTIME_FIELD_NAMES.get(protocol, DEFAULT_MTIME_FIELD_NAME)
    # Keep the `try` limited to the lookup so a KeyError raised while converting the value
    # is not misreported as a missing metadata field.
    try:
        raw_mtime = file_metadata[field_name]
    except KeyError:
        # Always bind `extra_message`: it is interpolated below for known protocols too.
        extra_message = ""
        if protocol not in MTIME_FIELD_NAMES:
            extra_message = f" {DEFAULT_MTIME_FIELD_NAME} was used by default."
        raise KeyError(f"`{field_name}` not found in metadata.{extra_message}") from None
    return ensure_pendulum_datetime(raw_mtime)


def glob_files(
fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**"
) -> Iterator[FileItem]:
Expand Down Expand Up @@ -350,12 +375,14 @@ def glob_files(
path=posixpath.join(bucket_url_parsed.path, file_name)
).geturl()

modification_date = extract_mtime(md, bucket_url_parsed.scheme)

mime_type, encoding = guess_mime_type(file_name)
yield FileItem(
file_name=file_name,
file_url=file_url,
mime_type=mime_type,
encoding=encoding,
modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
modification_date=modification_date,
size_in_bytes=int(md["size"]),
)
9 changes: 4 additions & 5 deletions dlt/common/storages/transactional_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import fsspec

from dlt.common.pendulum import pendulum, timedelta
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH
from dlt.common.storages.fsspec_filesystem import extract_mtime


def lock_id(k: int = 4) -> str:
Expand Down Expand Up @@ -56,16 +56,15 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None:
path: The path to lock.
fs: The fsspec file system.
"""
proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol
self.extract_mtime = MTIME_DISPATCH.get(proto, MTIME_DISPATCH["file"])
self._proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol

parsed_path = Path(path)
if not parsed_path.is_absolute():
raise ValueError(
f"{path} is not absolute. Please pass only absolute paths to TransactionalFile"
)
self.path = path
if proto == "file":
if self._proto == "file":
# standardize path separator to POSIX. fsspec always uses POSIX. Windows may use either.
self.path = parsed_path.as_posix()

Expand Down Expand Up @@ -103,7 +102,7 @@ def _sync_locks(self) -> t.List[str]:
if not name.startswith(self.lock_prefix):
continue
# Purge stale locks
mtime = self.extract_mtime(lock)
mtime = extract_mtime(lock, self._proto)
if now - mtime > timedelta(seconds=TransactionalFile.LOCK_TTL_SECONDS):
try: # Janitors can race, so we ignore errors
self._fs.rm(name)
Expand Down
2 changes: 1 addition & 1 deletion tests/common/storages/implementations/test_gitpythonfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_ls_file_details(repo_fixture: Iterator[Any]) -> None:
assert isinstance(
details["mode"], str
), "Should be a string representation of octal, without the 0o prefix."
assert isinstance(details["committed_date"], int)
assert isinstance(details["mtime"], int)


def test_git_refs(repo_fixture: Iterator[Any]) -> None:
Expand Down
4 changes: 2 additions & 2 deletions tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import (
register_implementation_in_fsspec,
MTIME_DISPATCH,
extract_mtime,
glob_files,
)
from dlt.common.utils import uniq_id
Expand Down Expand Up @@ -96,7 +96,7 @@ def check_file_exists():
def check_file_changed():
details = filesystem.info(file_url)
assert details["size"] == 11
assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 60
assert (extract_mtime(details, config.protocol) - now).seconds < 60

bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"]
config = get_config()
Expand Down

0 comments on commit ea0557f

Please sign in to comment.