
Improve default patterns resolution #6704

Merged (11 commits) on Mar 15, 2024
5 changes: 3 additions & 2 deletions src/datasets/arrow_dataset.py
@@ -59,6 +59,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from fsspec.core import url_to_fs
from huggingface_hub import CommitInfo, CommitOperationAdd, CommitOperationDelete, DatasetCard, DatasetCardData, HfApi
from multiprocess import Pool
from tqdm.contrib.concurrent import thread_map
@@ -1504,7 +1505,7 @@ def save_to_disk(
num_shards = num_shards if num_shards is not None else num_proc

fs: fsspec.AbstractFileSystem
fs, _, _ = fsspec.get_fs_token_paths(dataset_path, storage_options=storage_options)
fs, _ = url_to_fs(dataset_path, **(storage_options or {}))

if not is_remote_filesystem(fs):
parent_cache_files_paths = {
@@ -1694,7 +1695,7 @@ def load_from_disk(
storage_options = fs.storage_options

fs: fsspec.AbstractFileSystem
fs, _, [dataset_path] = fsspec.get_fs_token_paths(dataset_path, storage_options=storage_options)
fs, dataset_path = url_to_fs(dataset_path, **(storage_options or {}))

dest_dataset_path = dataset_path
dataset_dict_json_path = posixpath.join(dest_dataset_path, config.DATASETDICT_JSON_FILENAME)
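The same one-line swap recurs in every file below. A minimal sketch of the difference between the two fsspec helpers (the memory:// URL is only for illustration):

import fsspec
from fsspec.core import url_to_fs

# Old helper: returns (filesystem, token, list_of_paths), so callers had to
# unpack or index into the result.
fs, _, paths = fsspec.get_fs_token_paths("memory://some/dir")

# New helper: returns (filesystem, path) directly, and storage options are
# passed as keyword arguments rather than a storage_options= dict.
fs, path = url_to_fs("memory://some/dir")
print(type(fs).__name__, path)  # MemoryFileSystem /some/dir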
15 changes: 6 additions & 9 deletions src/datasets/arrow_writer.py
@@ -24,6 +24,7 @@
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from fsspec.core import url_to_fs

from . import config
from .features import Features, Image, Value
@@ -327,14 +328,10 @@ def __init__(
self._disable_nullable = disable_nullable

if stream is None:
fs_token_paths = fsspec.get_fs_token_paths(path, storage_options=storage_options)
self._fs: fsspec.AbstractFileSystem = fs_token_paths[0]
self._path = (
fs_token_paths[2][0]
if not is_remote_filesystem(self._fs)
else self._fs.unstrip_protocol(fs_token_paths[2][0])
)
self.stream = self._fs.open(fs_token_paths[2][0], "wb")
fs, path = url_to_fs(path, **(storage_options or {}))
self._fs: fsspec.AbstractFileSystem = fs
self._path = path if not is_remote_filesystem(self._fs) else self._fs.unstrip_protocol(path)
self.stream = self._fs.open(path, "wb")
self._closable_stream = True
else:
self._fs = None
@@ -681,7 +678,7 @@ def finalize(self, metrics_query_result: dict):
"""

# Beam FileSystems require the system's path separator in the older versions
fs, _, [parquet_path] = fsspec.get_fs_token_paths(self._parquet_path)
fs, parquet_path = url_to_fs(self._parquet_path)
parquet_path = str(Path(parquet_path)) if not is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path)

shards = fs.glob(parquet_path + "*.parquet")
3 changes: 2 additions & 1 deletion src/datasets/builder.py
@@ -34,6 +34,7 @@

import fsspec
import pyarrow as pa
from fsspec.core import url_to_fs
from multiprocess import Pool
from tqdm.contrib.concurrent import thread_map

@@ -883,7 +884,7 @@ def download_and_prepare(

output_dir = output_dir if output_dir is not None else self._cache_dir
# output_dir can be a remote bucket on GCS or S3 (when using BeamBasedBuilder for distributed data processing)
fs, _, [output_dir] = fsspec.get_fs_token_paths(output_dir, storage_options=storage_options)
fs, output_dir = url_to_fs(output_dir, **(storage_options or {}))
self._fs = fs
self._output_dir = output_dir if not is_remote_filesystem(self._fs) else self._fs.unstrip_protocol(output_dir)

55 changes: 38 additions & 17 deletions src/datasets/data_files.py
@@ -6,7 +6,7 @@
from typing import Callable, Dict, List, Optional, Set, Tuple, Union

import huggingface_hub
from fsspec import get_fs_token_paths
from fsspec.core import url_to_fs
from fsspec.implementations.http import HTTPFileSystem
from huggingface_hub import HfFileSystem
from packaging import version
@@ -46,36 +46,57 @@ class EmptyDatasetError(FileNotFoundError):
}
NON_WORDS_CHARS = "-._ 0-9"
if config.FSSPEC_VERSION < version.parse("2023.9.0"):
KEYWORDS_IN_PATH_NAME_BASE_PATTERNS = ["{keyword}[{sep}/]**", "**[{sep}/]{keyword}[{sep}/]**"]
KEYWORDS_IN_FILENAME_BASE_PATTERNS = ["**[{sep}/]{keyword}[{sep}]*", "{keyword}[{sep}]*"]
Review thread on this change:

albertvillanova (Member), Apr 19, 2024:
I think this is a breaking change impacting all uses of the variable KEYWORDS_IN_PATH_NAME_BASE_PATTERNS. See: https://github.com/huggingface/dataset-viewer/actions/runs/8753796799/job/24024224560?pr=2740
CC: @mariosasko @lhoestq

mariosasko (Collaborator, Author), Apr 19, 2024:
dataset-viewer seems to be the only repo on GH using KEYWORDS_IN_PATH_NAME_BASE_PATTERNS...
But I think it's okay to add it back if it's hard to fix dataset-viewer's CI otherwise (merging KEYWORDS_IN_FILENAME_BASE_PATTERNS and KEYWORDS_IN_DIR_NAME_BASE_PATTERNS should fix it, no?).

lhoestq (Member), Apr 19, 2024:
No need to add it back imo, we can just use the new variables (adding the two together) in dataset-viewer.

Member:
I am fixing this in dataset-viewer: huggingface/dataset-viewer#2740 (comment)
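A sketch of the compatibility fix suggested in this thread; where exactly dataset-viewer defines the shim is an assumption, not something stated in the PR:

# Hypothetical downstream shim: restore the removed constant by merging the
# two new pattern lists, as suggested above.
from datasets.data_files import (
    KEYWORDS_IN_DIR_NAME_BASE_PATTERNS,
    KEYWORDS_IN_FILENAME_BASE_PATTERNS,
)

KEYWORDS_IN_PATH_NAME_BASE_PATTERNS = (
    KEYWORDS_IN_FILENAME_BASE_PATTERNS + KEYWORDS_IN_DIR_NAME_BASE_PATTERNS
)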

KEYWORDS_IN_DIR_NAME_BASE_PATTERNS = [
"{keyword}/**",
"{keyword}[{sep}]*/**",
"**[{sep}/]{keyword}/**",
"**[{sep}/]{keyword}[{sep}]*/**",
]
elif config.FSSPEC_VERSION < version.parse("2023.12.0"):
KEYWORDS_IN_PATH_NAME_BASE_PATTERNS = ["{keyword}[{sep}/]**", "**/*[{sep}/]{keyword}[{sep}/]**"]
KEYWORDS_IN_FILENAME_BASE_PATTERNS = ["**/*[{sep}/]{keyword}[{sep}]*", "{keyword}[{sep}]*"]
KEYWORDS_IN_DIR_NAME_BASE_PATTERNS = [
"{keyword}/**/*",
"{keyword}[{sep}]*/**/*",
"**/*[{sep}/]{keyword}/**/*",
"**/*[{sep}/]{keyword}[{sep}]*/**/*",
]
else:
KEYWORDS_IN_PATH_NAME_BASE_PATTERNS = [
"**/{keyword}[{sep}]*",
"**/{keyword}[{sep}]*/**",
"**/*[{sep}]{keyword}[{sep}]*",
"**/*[{sep}]{keyword}[{sep}]*/**",
]
KEYWORDS_IN_FILENAME_BASE_PATTERNS = ["**/{keyword}[{sep}]*", "**/*[{sep}]{keyword}[{sep}]*"]
KEYWORDS_IN_DIR_NAME_BASE_PATTERNS = [
"**/{keyword}/**",
"**/{keyword}[{sep}]*/**",
"**/*[{sep}]{keyword}/**",
"**/*[{sep}]{keyword}[{sep}]*/**",
]

DEFAULT_SPLITS = [Split.TRAIN, Split.VALIDATION, Split.TEST]
DEFAULT_PATTERNS_SPLIT_IN_PATH_NAME = {
DEFAULT_PATTERNS_SPLIT_IN_FILENAME = {
split: [
pattern.format(keyword=keyword, sep=NON_WORDS_CHARS)
for keyword in SPLIT_KEYWORDS[split]
for pattern in KEYWORDS_IN_PATH_NAME_BASE_PATTERNS
for pattern in KEYWORDS_IN_FILENAME_BASE_PATTERNS
]
for split in DEFAULT_SPLITS
}
DEFAULT_PATTERNS_SPLIT_IN_DIR_NAME = {
split: [
pattern.format(keyword=keyword, sep=NON_WORDS_CHARS)
for keyword in SPLIT_KEYWORDS[split]
for pattern in KEYWORDS_IN_DIR_NAME_BASE_PATTERNS
]
for split in DEFAULT_SPLITS
}


DEFAULT_PATTERNS_ALL = {
Split.TRAIN: ["**"],
}

ALL_SPLIT_PATTERNS = [SPLIT_PATTERN_SHARDED]
ALL_DEFAULT_PATTERNS = [
DEFAULT_PATTERNS_SPLIT_IN_PATH_NAME,
DEFAULT_PATTERNS_SPLIT_IN_DIR_NAME,
DEFAULT_PATTERNS_SPLIT_IN_FILENAME,
DEFAULT_PATTERNS_ALL,
]
if config.FSSPEC_VERSION < version.parse("2023.9.0"):
@@ -351,7 +372,7 @@ def resolve_pattern(
else:
base_path = ""
pattern, storage_options = _prepare_path_and_storage_options(pattern, download_config=download_config)
fs, _, _ = get_fs_token_paths(pattern, storage_options=storage_options)
fs, *_ = url_to_fs(pattern, **storage_options)
fs_base_path = base_path.split("::")[0].split("://")[-1] or fs.root_marker
fs_pattern = pattern.split("::")[0].split("://")[-1]
files_to_ignore = set(FILES_TO_IGNORE) - {xbasename(pattern)}
@@ -409,7 +430,7 @@ def get_data_patterns(base_path: str, download_config: Optional[DownloadConfig]

Output:

{"train": ["**"]}
{'train': ['**']}

Input:

@@ -435,8 +456,8 @@

Output:

{'train': ['train[-._ 0-9/]**', '**/*[-._ 0-9/]train[-._ 0-9/]**', 'training[-._ 0-9/]**', '**/*[-._ 0-9/]training[-._ 0-9/]**'],
'test': ['test[-._ 0-9/]**', '**/*[-._ 0-9/]test[-._ 0-9/]**', 'testing[-._ 0-9/]**', '**/*[-._ 0-9/]testing[-._ 0-9/]**', ...]}
{'train': ['**/train[-._ 0-9]*', '**/*[-._ 0-9]train[-._ 0-9]*', '**/training[-._ 0-9]*', '**/*[-._ 0-9]training[-._ 0-9]*'],
'test': ['**/test[-._ 0-9]*', '**/*[-._ 0-9]test[-._ 0-9]*', '**/testing[-._ 0-9]*', '**/*[-._ 0-9]testing[-._ 0-9]*', ...]}

Input:

@@ -454,8 +475,8 @@

Output:

{'train': ['train[-._ 0-9/]**', '**/*[-._ 0-9/]train[-._ 0-9/]**', 'training[-._ 0-9/]**', '**/*[-._ 0-9/]training[-._ 0-9/]**'],
'test': ['test[-._ 0-9/]**', '**/*[-._ 0-9/]test[-._ 0-9/]**', 'testing[-._ 0-9/]**', '**/*[-._ 0-9/]testing[-._ 0-9/]**', ...]}
{'train': ['**/train/**', '**/train[-._ 0-9]*/**', '**/*[-._ 0-9]train/**', '**/*[-._ 0-9]train[-._ 0-9]*/**', ...],
'test': ['**/test/**', '**/test[-._ 0-9]*/**', '**/*[-._ 0-9]test/**', '**/*[-._ 0-9]test[-._ 0-9]*/**', ...]}

Input:

@@ -504,7 +525,7 @@ def _get_single_origin_metadata(
download_config: Optional[DownloadConfig] = None,
) -> Tuple[str]:
data_file, storage_options = _prepare_path_and_storage_options(data_file, download_config=download_config)
fs, _, _ = get_fs_token_paths(data_file, storage_options=storage_options)
fs, *_ = url_to_fs(data_file, **storage_options)
if isinstance(fs, HfFileSystem):
resolved_path = fs.resolve_path(data_file)
return (resolved_path.repo_id, resolved_path.revision)
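To make the split between filename and directory-name patterns concrete, a hedged demo on fsspec's in-memory filesystem (the file layout is made up, and the patterns shown assume fsspec >= 2023.12.0; exact glob semantics vary across fsspec versions):

import fsspec

fs = fsspec.filesystem("memory")
for f in ["/data/train-00000-of-00002.parquet", "/train/shard.parquet"]:
    fs.pipe_file(f, b"")

# Filename pattern: split keyword at the start of the file name, then a separator.
print(fs.glob("**/train[-._ 0-9]*"))  # expected: ['/data/train-00000-of-00002.parquet']
# Dir-name pattern: any file under a directory named after the split keyword.
print(fs.glob("**/train/**"))  # expected: ['/train/shard.parquet']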
5 changes: 3 additions & 2 deletions src/datasets/dataset_dict.py
@@ -12,6 +12,7 @@

import fsspec
import numpy as np
from fsspec.core import url_to_fs
from huggingface_hub import (
CommitInfo,
CommitOperationAdd,
@@ -1280,7 +1281,7 @@ def save_to_disk(
storage_options = fs.storage_options

fs: fsspec.AbstractFileSystem
fs, _, _ = fsspec.get_fs_token_paths(dataset_dict_path, storage_options=storage_options)
fs, _ = url_to_fs(dataset_dict_path, **(storage_options or {}))

if num_shards is None:
num_shards = {k: None for k in self}
@@ -1354,7 +1355,7 @@ def load_from_disk(
storage_options = fs.storage_options

fs: fsspec.AbstractFileSystem
fs, _, [dataset_dict_path] = fsspec.get_fs_token_paths(dataset_dict_path, storage_options=storage_options)
fs, dataset_dict_path = url_to_fs(dataset_dict_path, **(storage_options or {}))

dataset_dict_json_path = posixpath.join(dataset_dict_path, config.DATASETDICT_JSON_FILENAME)
dataset_state_json_path = posixpath.join(dataset_dict_path, config.DATASET_STATE_JSON_FILENAME)
25 changes: 9 additions & 16 deletions src/datasets/download/streaming_download_manager.py
@@ -16,6 +16,7 @@

import fsspec
from aiohttp.client_exceptions import ClientError
from fsspec.core import url_to_fs
from huggingface_hub.utils import EntryNotFoundError
from packaging import version

@@ -159,7 +160,7 @@ def xexists(urlpath: str, download_config: Optional[DownloadConfig] = None):
else:
urlpath, storage_options = _prepare_path_and_storage_options(urlpath, download_config=download_config)
main_hop, *rest_hops = urlpath.split("::")
fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
fs, *_ = url_to_fs(urlpath, **storage_options)
return fs.exists(main_hop)


@@ -259,7 +260,7 @@ def xisfile(path, download_config: Optional[DownloadConfig] = None) -> bool:
else:
path, storage_options = _prepare_path_and_storage_options(path, download_config=download_config)
main_hop, *rest_hops = path.split("::")
fs, *_ = fsspec.get_fs_token_paths(path, storage_options=storage_options)
fs, *_ = url_to_fs(path, **storage_options)
return fs.isfile(main_hop)


@@ -279,7 +280,7 @@ def xgetsize(path, download_config: Optional[DownloadConfig] = None) -> int:
else:
path, storage_options = _prepare_path_and_storage_options(path, download_config=download_config)
main_hop, *rest_hops = path.split("::")
fs, *_ = fsspec.get_fs_token_paths(path, storage_options=storage_options)
fs, *_ = url_to_fs(path, **storage_options)
try:
size = fs.size(main_hop)
except EntryNotFoundError:
@@ -307,7 +308,7 @@ def xisdir(path, download_config: Optional[DownloadConfig] = None) -> bool:
else:
path, storage_options = _prepare_path_and_storage_options(path, download_config=download_config)
main_hop, *rest_hops = path.split("::")
fs, *_ = fsspec.get_fs_token_paths(path, storage_options=storage_options)
fs, *_ = url_to_fs(path, **storage_options)
inner_path = main_hop.split("://")[-1]
if not inner_path.strip("/"):
return True
@@ -546,7 +547,7 @@ def xlistdir(path: str, download_config: Optional[DownloadConfig] = None) -> Lis
# globbing inside a zip in a private repo requires authentication
path, storage_options = _prepare_path_and_storage_options(path, download_config=download_config)
main_hop, *rest_hops = path.split("::")
fs, *_ = fsspec.get_fs_token_paths(path, storage_options=storage_options)
fs, *_ = url_to_fs(path, **storage_options)
inner_path = main_hop.split("://")[-1]
if inner_path.strip("/") and not fs.isdir(inner_path):
raise FileNotFoundError(f"Directory doesn't exist: {path}")
@@ -573,11 +574,7 @@ def xglob(urlpath, *, recursive=False, download_config: Optional[DownloadConfig]
# globbing inside a zip in a private repo requires authentication
urlpath, storage_options = _prepare_path_and_storage_options(urlpath, download_config=download_config)
main_hop, *rest_hops = urlpath.split("::")
fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
# - If there's no "*" in the pattern, get_fs_token_paths() doesn't do any pattern matching
# so to be able to glob patterns like "[0-9]", we have to call `fs.glob`.
# - Also "*" in get_fs_token_paths() only matches files: we have to call `fs.glob` to match directories.
# - If there is "**" in the pattern, `fs.glob` must be called anyway.
fs, *_ = url_to_fs(urlpath, **storage_options)
inner_path = main_hop.split("://")[1]
globbed_paths = fs.glob(inner_path)
protocol = fs.protocol if isinstance(fs.protocol, str) else fs.protocol[-1]
Expand All @@ -603,7 +600,7 @@ def xwalk(urlpath, download_config: Optional[DownloadConfig] = None, **kwargs):
# walking inside a zip in a private repo requires authentication
urlpath, storage_options = _prepare_path_and_storage_options(urlpath, download_config=download_config)
main_hop, *rest_hops = urlpath.split("::")
fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
fs, *_ = url_to_fs(urlpath, **storage_options)
inner_path = main_hop.split("://")[-1]
if inner_path.strip("/") and not fs.isdir(inner_path):
return []
@@ -659,11 +656,7 @@ def glob(self, pattern, download_config: Optional[DownloadConfig] = None):
posix_path = "::".join([main_hop, urlpath, *rest_hops[1:]])
else:
storage_options = None
fs, *_ = fsspec.get_fs_token_paths(xjoin(posix_path, pattern), storage_options=storage_options)
# - If there's no "*" in the pattern, get_fs_token_paths() doesn't do any pattern matching
# so to be able to glob patterns like "[0-9]", we have to call `fs.glob`.
# - Also "*" in get_fs_token_paths() only matches files: we have to call `fs.glob` to match directories.
# - If there is "**" in the pattern, `fs.glob` must be called anyway.
fs, *_ = url_to_fs(xjoin(posix_path, pattern), **(storage_options or {}))
globbed_paths = fs.glob(xjoin(main_hop, pattern))
for globbed_path in globbed_paths:
yield type(self)("::".join([f"{fs.protocol}://{globbed_path}"] + rest_hops))
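The comments deleted in xglob and glob explained why matching is always delegated to fs.glob; a small sketch of the character-class case they mention (in-memory filesystem, made-up paths):

from fsspec.core import url_to_fs

# url_to_fs does no pattern expansion of its own, so "[0-9]" classes, "**",
# and directory matches all go through fs.glob.
fs, _ = url_to_fs("memory://")
fs.pipe_file("/logs/shard-0.txt", b"")
fs.pipe_file("/logs/shard-1.txt", b"")
print(fs.glob("/logs/shard-[0-9].txt"))  # expected: ['/logs/shard-0.txt', '/logs/shard-1.txt']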
17 changes: 0 additions & 17 deletions src/datasets/filesystems/__init__.py
@@ -1,6 +1,5 @@
import importlib
import shutil
import threading
import warnings
from typing import List

@@ -68,19 +67,3 @@ def rename(fs: fsspec.AbstractFileSystem, src: str, dst: str):
shutil.move(fs._strip_protocol(src), fs._strip_protocol(dst))
else:
fs.mv(src, dst, recursive=True)


def _reset_fsspec_lock() -> None:
"""
Clear reference to the loop and thread.
This is necessary otherwise HTTPFileSystem hangs in the ML training loop.
Only required for fsspec >= 0.9.0
See https://github.com/fsspec/gcsfs/issues/379
"""
if hasattr(fsspec.asyn, "reset_lock"):
# for future fsspec>2022.05.0
fsspec.asyn.reset_lock()
else:
fsspec.asyn.iothread[0] = None
fsspec.asyn.loop[0] = None
fsspec.asyn.lock = threading.Lock()
5 changes: 3 additions & 2 deletions src/datasets/info.py
@@ -39,6 +39,7 @@
from typing import ClassVar, Dict, List, Optional, Union

import fsspec
from fsspec.core import url_to_fs
from huggingface_hub import DatasetCard, DatasetCardData

from . import config
@@ -251,7 +252,7 @@ def write_to_directory(
storage_options = fs.storage_options

fs: fsspec.AbstractFileSystem
fs, _, _ = fsspec.get_fs_token_paths(dataset_info_dir, storage_options=storage_options)
fs, *_ = url_to_fs(dataset_info_dir, **(storage_options or {}))
with fs.open(posixpath.join(dataset_info_dir, config.DATASET_INFO_FILENAME), "wb") as f:
self._dump_info(f, pretty_print=pretty_print)
if self.license:
@@ -347,7 +348,7 @@ def from_directory(
storage_options = fs.storage_options

fs: fsspec.AbstractFileSystem
fs, _, _ = fsspec.get_fs_token_paths(dataset_info_dir, storage_options=storage_options)
fs, *_ = url_to_fs(dataset_info_dir, **(storage_options or {}))
logger.info(f"Loading Dataset info from {dataset_info_dir}")
if not dataset_info_dir:
raise ValueError("Calling DatasetInfo.from_directory() with undefined dataset_info_dir.")
7 changes: 4 additions & 3 deletions src/datasets/iterable_dataset.py
@@ -9,14 +9,14 @@
from itertools import cycle, islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union

import fsspec.asyn
import numpy as np
import pyarrow as pa

from . import config
from .arrow_dataset import Dataset, DatasetInfoMixin
from .features import Features
from .features.features import FeatureType, _align_features, _check_if_features_can_be_aligned, cast_to_python_objects
from .filesystems import _reset_fsspec_lock
from .formatting import PythonFormatter, TensorFormatter, get_format_type_from_alias, get_formatter
from .info import DatasetInfo
from .splits import NamedSplit
@@ -1257,8 +1257,9 @@ def n_shards(self) -> int:

def _iter_pytorch(self):
ex_iterable = self._prepare_ex_iterable_for_iteration()
# fix for fsspec when using multiprocess
_reset_fsspec_lock()
# Fix for fsspec when using multiprocess to avoid hanging in the ML training loop. (only required for fsspec >= 0.9.0)
# See https://github.com/fsspec/gcsfs/issues/379
fsspec.asyn.reset_lock()
# check if there aren't too many workers
import torch.utils.data

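For context, the same reset applies when wiring a PyTorch DataLoader by hand; a hedged sketch (the worker_init_fn placement and the toy dataset are assumptions, not part of this PR):

import fsspec.asyn
from torch.utils.data import DataLoader, IterableDataset

class _Range(IterableDataset):
    # Stand-in for an fsspec-backed streaming dataset.
    def __iter__(self):
        return iter(range(8))

def _worker_init(_worker_id: int) -> None:
    # Clear the event-loop and lock references inherited from the parent
    # process so async filesystems (e.g. HTTPFileSystem) don't hang after fork.
    fsspec.asyn.reset_lock()

loader = DataLoader(_Range(), num_workers=2, worker_init_fn=_worker_init)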