Deprecate Beam API and download from HF GCS bucket #6474

Merged (15 commits) on Mar 12, 2024
6 changes: 6 additions & 0 deletions docs/source/beam.mdx
@@ -1,5 +1,11 @@
# Beam Datasets

<Tip warning={true}>

The Beam API is deprecated and will be removed in the next major release.

</Tip>

Some datasets are too large to be processed on a single machine. Instead, you can process them with [Apache Beam](https://beam.apache.org/), a library for parallel data processing. The processing pipeline is executed on a distributed processing backend such as [Apache Flink](https://flink.apache.org/), [Apache Spark](https://spark.apache.org/), or [Google Cloud Dataflow](https://cloud.google.com/dataflow).

We have already created Beam pipelines for some of the larger datasets, such as [wikipedia](https://huggingface.co/datasets/wikipedia) and [wiki40b](https://huggingface.co/datasets/wiki40b). You can load these normally with [`load_dataset`], as shown below.
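For example, loading the already-processed Wikipedia dataset needs no Beam pipeline at all. A minimal sketch, assuming the `20220301.en` snapshot config (other snapshots exist):

```python
from datasets import load_dataset

# Downloads the ready-made Arrow files prepared by the HF Beam pipeline;
# no Beam pipeline runs locally.
wiki = load_dataset("wikipedia", "20220301.en", split="train")
```

But if you want to run your own Beam pipeline with Dataflow, here is how: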
2 changes: 2 additions & 0 deletions src/datasets/arrow_reader.py
@@ -34,6 +34,7 @@
from .table import InMemoryTable, MemoryMappedTable, Table, concat_tables
from .utils import logging
from .utils import tqdm as hf_tqdm
from .utils.deprecation_utils import deprecated
from .utils.file_utils import cached_path


@@ -284,6 +285,7 @@ def read_files(
dataset_kwargs = {"arrow_table": pa_table, "info": self._info, "split": split}
return dataset_kwargs

@deprecated()
def download_from_hf_gcs(self, download_config: DownloadConfig, relative_data_dir):
"""
Download the dataset files from the HF GCS
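The `@deprecated()` decorator above comes from `datasets.utils.deprecation_utils`. As a rough mental model only, a decorator of that shape can be sketched as follows; this is an assumption about its behavior, not the library's actual source:

```python
import functools
import warnings


def deprecated(help_message: str = ""):
    """Minimal sketch of a deprecation decorator (assumed behavior)."""

    def decorator(obj):
        message = f"{obj.__name__} is deprecated and will be removed in the next major version. {help_message}".rstrip()
        if isinstance(obj, type):
            # For classes, wrap __init__ so subclassing keeps working.
            original_init = obj.__init__

            @functools.wraps(original_init)
            def new_init(self, *args, **kwargs):
                warnings.warn(message, FutureWarning, stacklevel=2)
                original_init(self, *args, **kwargs)

            obj.__init__ = new_init
            return obj

        # For plain functions and methods, warn on every call.
        @functools.wraps(obj)
        def wrapper(*args, **kwargs):
            warnings.warn(message, FutureWarning, stacklevel=2)
            return obj(*args, **kwargs)

        return wrapper

    return decorator
```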
19 changes: 18 additions & 1 deletion src/datasets/builder.py
@@ -70,6 +70,7 @@
from .utils import logging
from .utils import tqdm as hf_tqdm
from .utils._filelock import FileLock
from .utils.deprecation_utils import deprecated
from .utils.file_utils import cached_path, is_remote_url
from .utils.info_utils import VerificationMode, get_size_checksum_dict, verify_checksums, verify_splits
from .utils.py_utils import (
@@ -748,7 +749,7 @@ def download_and_prepare(
download_mode: Optional[Union[DownloadMode, str]] = None,
verification_mode: Optional[Union[VerificationMode, str]] = None,
ignore_verifications="deprecated",
try_from_hf_gcs: bool = True,
try_from_hf_gcs="deprecated",
dl_manager: Optional[DownloadManager] = None,
base_path: Optional[str] = None,
use_auth_token="deprecated",
@@ -785,6 +786,13 @@ def download_and_prepare(
</Deprecated>
try_from_hf_gcs (`bool`):
If `True`, it will try to download the already-prepared dataset from the HF Google Cloud Storage bucket.

<Deprecated version="2.16.0">

`try_from_hf_gcs` was deprecated in version 2.16.0 and will be removed in 3.0.0.
Host the processed files on the Hugging Face Hub instead.

</Deprecated>
dl_manager (`DownloadManager`, *optional*):
Specific `DownloadManager` to use.
base_path (`str`, *optional*):
@@ -865,6 +873,14 @@ def download_and_prepare(
else:
token = self.token

if try_from_hf_gcs != "deprecated":
warnings.warn(
"'try_from_hf_gcs' was deprecated in version 2.16.0 and will be removed in 3.0.0.",
FutureWarning,
)
else:
try_from_hf_gcs = False

output_dir = output_dir if output_dir is not None else self._cache_dir
# output_dir can be a remote bucket on GCS or S3 (when using BeamBasedBuilder for distributed data processing)
fs, _, [output_dir] = fsspec.get_fs_token_paths(output_dir, storage_options=storage_options)
@@ -2025,6 +2041,7 @@ class MissingBeamOptions(ValueError):
pass


@deprecated("Use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead.")
class BeamBasedBuilder(DatasetBuilder):
"""Beam-based Builder."""

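Note the sentinel-string pattern in the `download_and_prepare` change above: defaulting `try_from_hf_gcs` to the string `"deprecated"` (instead of `False`) lets the method tell "argument omitted" apart from "caller explicitly passed a value", so only explicit use triggers the warning. A standalone sketch of the pattern, with a hypothetical `legacy_flag` argument:

```python
import warnings

_DEPRECATED = "deprecated"  # sentinel no real caller would pass


def process(data, legacy_flag=_DEPRECATED):
    # Hypothetical function; mirrors the pattern used in download_and_prepare.
    if legacy_flag != _DEPRECATED:
        # Caller passed something explicitly (True, False, None, ...): warn.
        warnings.warn(
            "'legacy_flag' is deprecated and will be removed in 3.0.0.",
            FutureWarning,
        )
    else:
        # Caller omitted it: silently fall back to the new behavior.
        legacy_flag = False
    return data


process([1, 2, 3])                     # no warning
process([1, 2, 3], legacy_flag=False)  # warns, even for the new default value
```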
5 changes: 4 additions & 1 deletion src/datasets/commands/run_beam.py
@@ -10,6 +10,7 @@
from datasets.download.download_config import DownloadConfig
from datasets.download.download_manager import DownloadMode
from datasets.load import dataset_module_factory, import_main_class
from datasets.utils.deprecation_utils import deprecated
from datasets.utils.info_utils import VerificationMode


@@ -28,6 +29,9 @@ def run_beam_command_factory(args, **kwargs):
)


@deprecated(
"`BeamBasedBuilder` and `datasets-cli run_beam` are deprecated and will be removed in v3.0.0. Please use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead."
)
class RunBeamCommand(BaseDatasetsCLICommand):
@staticmethod
def register_subcommand(parser: ArgumentParser):
@@ -135,7 +139,6 @@ def run(self):
verification_mode=VerificationMode.NO_CHECKS
if self._ignore_verifications
else VerificationMode.ALL_CHECKS,
try_from_hf_gcs=False,
)
if self._save_infos:
builder._save_infos()
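As the deprecation message says, Beam builders should migrate to `GeneratorBasedBuilder` (or `ArrowBasedBuilder`). A hypothetical minimal migration target, where the class name, URL, and schema are placeholders:

```python
import datasets


class MyDataset(datasets.GeneratorBasedBuilder):
    """Hypothetical dataset rewritten without Beam."""

    def _info(self):
        return datasets.DatasetInfo(
            features=datasets.Features({"text": datasets.Value("string")}),
        )

    def _split_generators(self, dl_manager):
        # dl_manager downloads and caches remote files; the URL is a placeholder.
        path = dl_manager.download("https://example.com/data.txt")
        return [
            datasets.SplitGenerator(name=datasets.Split.TRAIN, gen_kwargs={"path": path})
        ]

    def _generate_examples(self, path):
        # Yield (key, example) pairs instead of emitting into a Beam pipeline.
        with open(path, encoding="utf-8") as f:
            for idx, line in enumerate(f):
                yield idx, {"text": line.rstrip("\n")}
```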
1 change: 0 additions & 1 deletion src/datasets/io/csv.py
@@ -57,7 +57,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/generator.py
@@ -48,7 +48,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
try_from_hf_gcs=False,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/parquet.py
@@ -97,7 +97,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/sql.py
@@ -43,7 +43,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
)

1 change: 0 additions & 1 deletion src/datasets/io/text.py
@@ -51,7 +51,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
5 changes: 0 additions & 5 deletions src/datasets/load.py
@@ -2594,16 +2594,11 @@ def load_dataset(
if streaming:
return builder_instance.as_streaming_dataset(split=split)

# Some datasets are already processed on the HF google storage
# Don't try downloading from Google storage for the packaged datasets as text, json, csv or pandas
try_from_hf_gcs = path not in _PACKAGED_DATASETS_MODULES

# Download and prepare data
builder_instance.download_and_prepare(
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
try_from_hf_gcs=try_from_hf_gcs,
num_proc=num_proc,
storage_options=storage_options,
)
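The user-facing entry points are unchanged; only the internal probe of the HF GCS bucket goes away. A sketch of the equivalent flows after this change, with placeholder file names:

```python
from datasets import load_dataset, load_dataset_builder

# High-level call: same as before this PR.
ds = load_dataset("csv", data_files="data.csv", split="train")

# Lower-level equivalent, now without the removed try_from_hf_gcs argument:
builder = load_dataset_builder("csv", data_files="data.csv")
builder.download_and_prepare()
ds = builder.as_dataset(split="train")
```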
34 changes: 12 additions & 22 deletions tests/test_builder.py
@@ -243,7 +243,7 @@ def _generate_examples(self, filepaths, dummy_kwarg_with_different_length):

def _run_concurrent_download_and_prepare(tmp_dir):
builder = DummyBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
builder.download_and_prepare(download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
return builder


@@ -257,7 +257,7 @@ class BuilderTest(TestCase):
def test_download_and_prepare(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -274,15 +274,12 @@ def test_download_and_prepare(self):
def test_download_and_prepare_checksum_computation(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder_no_verification = DummyBuilder(cache_dir=tmp_dir)
builder_no_verification.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD
)
builder_no_verification.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
all(v["checksum"] is not None for _, v in builder_no_verification.info.download_checksums.items())
)
builder_with_verification = DummyBuilder(cache_dir=tmp_dir)
builder_with_verification.download_and_prepare(
try_from_hf_gcs=False,
download_mode=DownloadMode.FORCE_REDOWNLOAD,
verification_mode=VerificationMode.ALL_CHECKS,
)
@@ -326,22 +323,16 @@ def test_download_and_prepare_with_base_path(self):
# test relative path is missing
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path)
with self.assertRaises(FileNotFoundError):
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
# test absolute path is missing
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, abs_path=abs_path)
with self.assertRaises(FileNotFoundError):
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
# test that they are both properly loaded when they exist
open(os.path.join(tmp_dir, rel_path), "w")
open(abs_path, "w")
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path, abs_path=abs_path)
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -580,7 +571,7 @@ def _post_processing_resources(self, split):
)
builder._post_process = types.MethodType(_post_process, builder)
builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -604,7 +595,7 @@ def _post_process(self, dataset, resources_paths):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyBuilder(cache_dir=tmp_dir)
builder._post_process = types.MethodType(_post_process, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -637,7 +628,7 @@ def _post_processing_resources(self, split):
builder = DummyBuilder(cache_dir=tmp_dir)
builder._post_process = types.MethodType(_post_process, builder)
builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -662,15 +653,14 @@ def _prepare_split(self, split_generator, **kwargs):
self.assertRaises(
ValueError,
builder.download_and_prepare,
try_from_hf_gcs=False,
download_mode=DownloadMode.FORCE_REDOWNLOAD,
)
self.assertRaises(FileNotFoundError, builder.as_dataset)

def test_generator_based_download_and_prepare(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyGeneratorBasedBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -940,7 +930,7 @@ def test_generator_based_builder_as_dataset(in_memory, tmp_path):
cache_dir.mkdir()
cache_dir = str(cache_dir)
builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
with assert_arrow_memory_increases() if in_memory else assert_arrow_memory_doesnt_increase():
dataset = builder.as_dataset("train", in_memory=in_memory)
assert dataset.data.to_pydict() == {"text": ["foo"] * 100}
@@ -955,7 +945,7 @@ def test_custom_writer_batch_size(tmp_path, writer_batch_size, default_writer_batch_size, expected_chunks):
DummyGeneratorBasedBuilder.DEFAULT_WRITER_BATCH_SIZE = default_writer_batch_size
builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir, writer_batch_size=writer_batch_size)
assert builder._writer_batch_size == (writer_batch_size or default_writer_batch_size)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
dataset = builder.as_dataset("train")
assert len(dataset.data[0].chunks) == expected_chunks

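A hypothetical companion test, not part of this PR, could pin down the deprecation path itself, reusing the module's `DummyBuilder` and `DownloadMode`:

```python
import pytest


def test_try_from_hf_gcs_warns(tmp_path):
    # Explicitly passing the deprecated flag should raise a FutureWarning.
    builder = DummyBuilder(cache_dir=str(tmp_path))
    with pytest.warns(FutureWarning, match="try_from_hf_gcs"):
        builder.download_and_prepare(
            try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD
        )
```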