Deprecate Beam API and download from HF GCS bucket (#6474)
* Deprecate `BeamBasedBuilder` and download from HF GCS bucket

* Improvements

* Add deprecation warning to docs

* Nit

* Fix CI

* Fix windows CI

* Small fix

* Remove import

* Fix tests
mariosasko committed Mar 12, 2024
1 parent 90b8961 commit f90b65d
Showing 12 changed files with 100 additions and 90 deletions.
6 changes: 6 additions & 0 deletions docs/source/beam.mdx
@@ -1,5 +1,11 @@
# Beam Datasets

<Tip warning={true}>

The Beam API is deprecated and will be removed in the next major release.

</Tip>

Some datasets are too large to be processed on a single machine. Instead, you can process them with [Apache Beam](https://beam.apache.org/), a library for parallel data processing. The processing pipeline is executed on a distributed processing backend such as [Apache Flink](https://flink.apache.org/), [Apache Spark](https://spark.apache.org/), or [Google Cloud Dataflow](https://cloud.google.com/dataflow).

We have already created Beam pipelines for some of the larger datasets like [wikipedia](https://huggingface.co/datasets/wikipedia) and [wiki40b](https://huggingface.co/datasets/wiki40b). You can load these normally with [`load_dataset`], as in the sketch below. But if you want to run your own Beam pipeline with Dataflow, here is how:
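A minimal sketch of that `load_dataset` call for one of the preprocessed datasets; the `20220301.en` config name is an illustrative assumption and may differ from the snapshots actually available:

```python
from datasets import load_dataset

# Downloads the already-processed files; no Beam pipeline runs locally.
wiki = load_dataset("wikipedia", "20220301.en", split="train")
print(wiki)
```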
2 changes: 2 additions & 0 deletions src/datasets/arrow_reader.py
@@ -34,6 +34,7 @@
from .table import InMemoryTable, MemoryMappedTable, Table, concat_tables
from .utils import logging
from .utils import tqdm as hf_tqdm
from .utils.deprecation_utils import deprecated
from .utils.file_utils import cached_path


@@ -284,6 +285,7 @@ def read_files(
dataset_kwargs = {"arrow_table": pa_table, "info": self._info, "split": split}
return dataset_kwargs

@deprecated()
def download_from_hf_gcs(self, download_config: DownloadConfig, relative_data_dir):
"""
Download the dataset files from the Hf GCS
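The `deprecated` decorator imported above wraps a function so that calling it emits a warning. A generic sketch of that pattern, assuming a simple wrapper; the real `datasets.utils.deprecation_utils.deprecated` may differ in message wording and warning category:

```python
import functools
import warnings


def deprecated(help_message: str = ""):
    """Warn on each call of the decorated function (generic sketch)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            warnings.warn(
                f"{func.__name__} is deprecated and will be removed in a future version. {help_message}".strip(),
                FutureWarning,
                stacklevel=2,
            )
            return func(*args, **kwargs)

        return wrapper

    return decorator


@deprecated("Host the processed files on the Hugging Face Hub instead.")
def download_from_hf_gcs():
    ...
```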
19 changes: 18 additions & 1 deletion src/datasets/builder.py
@@ -70,6 +70,7 @@
from .utils import logging
from .utils import tqdm as hf_tqdm
from .utils._filelock import FileLock
from .utils.deprecation_utils import deprecated
from .utils.file_utils import cached_path, is_remote_url
from .utils.info_utils import VerificationMode, get_size_checksum_dict, verify_checksums, verify_splits
from .utils.py_utils import (
@@ -748,7 +749,7 @@ def download_and_prepare(
download_mode: Optional[Union[DownloadMode, str]] = None,
verification_mode: Optional[Union[VerificationMode, str]] = None,
ignore_verifications="deprecated",
try_from_hf_gcs: bool = True,
try_from_hf_gcs="deprecated",
dl_manager: Optional[DownloadManager] = None,
base_path: Optional[str] = None,
use_auth_token="deprecated",
@@ -785,6 +786,13 @@ def download_and_prepare(
</Deprecated>
try_from_hf_gcs (`bool`):
If `True`, it will try to download the already prepared dataset from the HF Google cloud storage.
<Deprecated version="2.16.0">
`try_from_hf_gcs` was deprecated in version 2.16.0 and will be removed in 3.0.0.
Host the processed files on the Hugging Face Hub instead.
</Deprecated>
dl_manager (`DownloadManager`, *optional*):
Specific `DownloadManger` to use.
base_path (`str`, *optional*):
@@ -865,6 +873,14 @@ def download_and_prepare(
else:
token = self.token

if try_from_hf_gcs != "deprecated":
warnings.warn(
"'try_from_hf_gcs' was deprecated in version 2.16.0 and will be removed in 3.0.0.",
FutureWarning,
)
else:
try_from_hf_gcs = False

output_dir = output_dir if output_dir is not None else self._cache_dir
# output_dir can be a remote bucket on GCS or S3 (when using BeamBasedBuilder for distributed data processing)
fs, _, [output_dir] = fsspec.get_fs_token_paths(output_dir, storage_options=storage_options)
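The block above uses the string `"deprecated"` as a sentinel default, so the warning fires only when a caller explicitly passes `try_from_hf_gcs`, while the default silently falls back to not checking GCS. A minimal, self-contained sketch of the same idea (the standalone function here is illustrative, not the real method):

```python
import warnings


def download_and_prepare(try_from_hf_gcs="deprecated"):
    # The string sentinel can only mean "the caller did not pass the argument",
    # so explicit use is distinguishable from the default.
    if try_from_hf_gcs != "deprecated":
        warnings.warn(
            "'try_from_hf_gcs' was deprecated in version 2.16.0 and will be removed in 3.0.0.",
            FutureWarning,
        )
    else:
        try_from_hf_gcs = False  # new default behavior: never try the GCS bucket
    return try_from_hf_gcs


assert download_and_prepare() is False  # no warning, GCS lookup disabled
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    download_and_prepare(try_from_hf_gcs=True)  # explicit use still works but warns
assert caught and issubclass(caught[-1].category, FutureWarning)
```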
@@ -2025,6 +2041,7 @@ class MissingBeamOptions(ValueError):
pass


@deprecated("Use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead.")
class BeamBasedBuilder(DatasetBuilder):
"""Beam-based Builder."""

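As the deprecation message suggests, new dataset scripts should subclass `GeneratorBasedBuilder` (or `ArrowBasedBuilder`) instead of `BeamBasedBuilder`. A minimal sketch of such a builder, assuming a toy single-split dataset (the class name, features, and data are illustrative):

```python
import datasets


class DummyTextDataset(datasets.GeneratorBasedBuilder):
    """Toy builder that yields examples directly, with no Beam pipeline."""

    def _info(self):
        return datasets.DatasetInfo(
            features=datasets.Features({"text": datasets.Value("string")}),
        )

    def _split_generators(self, dl_manager):
        # dl_manager would normally download/extract source files; here the data is inline.
        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                gen_kwargs={"texts": ["foo"] * 100},
            )
        ]

    def _generate_examples(self, texts):
        for idx, text in enumerate(texts):
            yield idx, {"text": text}
```

Such a builder is prepared the usual way, with `download_and_prepare()` followed by `as_dataset(...)`, as the tests further down do.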
5 changes: 4 additions & 1 deletion src/datasets/commands/run_beam.py
@@ -10,6 +10,7 @@
from datasets.download.download_config import DownloadConfig
from datasets.download.download_manager import DownloadMode
from datasets.load import dataset_module_factory, import_main_class
from datasets.utils.deprecation_utils import deprecated
from datasets.utils.info_utils import VerificationMode


@@ -28,6 +29,9 @@ def run_beam_command_factory(args, **kwargs):
)


@deprecated(
"`BeamBasedBuilder` and `datasets-cli run_beam` are deprecated and will be removed in v3.0.0. Please use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead."
)
class RunBeamCommand(BaseDatasetsCLICommand):
@staticmethod
def register_subcommand(parser: ArgumentParser):
@@ -135,7 +139,6 @@ def run(self):
verification_mode=VerificationMode.NO_CHECKS
if self._ignore_verifications
else VerificationMode.ALL_CHECKS,
try_from_hf_gcs=False,
)
if self._save_infos:
builder._save_infos()
1 change: 0 additions & 1 deletion src/datasets/io/csv.py
@@ -57,7 +57,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/generator.py
@@ -48,7 +48,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
try_from_hf_gcs=False,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/parquet.py
@@ -97,7 +97,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
1 change: 0 additions & 1 deletion src/datasets/io/sql.py
@@ -43,7 +43,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
)

1 change: 0 additions & 1 deletion src/datasets/io/text.py
@@ -51,7 +51,6 @@ def read(self):
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
num_proc=self.num_proc,
)
5 changes: 0 additions & 5 deletions src/datasets/load.py
@@ -2594,16 +2594,11 @@ def load_dataset(
if streaming:
return builder_instance.as_streaming_dataset(split=split)

# Some datasets are already processed on the HF google storage
# Don't try downloading from Google storage for the packaged datasets as text, json, csv or pandas
try_from_hf_gcs = path not in _PACKAGED_DATASETS_MODULES

# Download and prepare data
builder_instance.download_and_prepare(
download_config=download_config,
download_mode=download_mode,
verification_mode=verification_mode,
try_from_hf_gcs=try_from_hf_gcs,
num_proc=num_proc,
storage_options=storage_options,
)
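With the `try_from_hf_gcs=...` argument gone from this call, preparing a dataset through its builder needs no GCS-related flag at all. A minimal sketch of the builder-level flow after this change (the dataset name is illustrative):

```python
from datasets import load_dataset_builder

builder = load_dataset_builder("squad")  # any Hub dataset; "squad" is illustrative
builder.download_and_prepare()           # no try_from_hf_gcs needed anymore
ds = builder.as_dataset(split="train")
print(ds)
```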
34 changes: 12 additions & 22 deletions tests/test_builder.py
@@ -243,7 +243,7 @@ def _generate_examples(self, filepaths, dummy_kwarg_with_different_length):

def _run_concurrent_download_and_prepare(tmp_dir):
builder = DummyBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
builder.download_and_prepare(download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
return builder


@@ -257,7 +257,7 @@ class BuilderTest(TestCase):
def test_download_and_prepare(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -274,15 +274,12 @@ def test_download_and_prepare(self):
def test_download_and_prepare_checksum_computation(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder_no_verification = DummyBuilder(cache_dir=tmp_dir)
builder_no_verification.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD
)
builder_no_verification.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
all(v["checksum"] is not None for _, v in builder_no_verification.info.download_checksums.items())
)
builder_with_verification = DummyBuilder(cache_dir=tmp_dir)
builder_with_verification.download_and_prepare(
try_from_hf_gcs=False,
download_mode=DownloadMode.FORCE_REDOWNLOAD,
verification_mode=VerificationMode.ALL_CHECKS,
)
@@ -326,22 +323,16 @@ def test_download_and_prepare_with_base_path(self):
# test relative path is missing
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path)
with self.assertRaises(FileNotFoundError):
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
# test absolute path is missing
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, abs_path=abs_path)
with self.assertRaises(FileNotFoundError):
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
# test that they are both properly loaded when they exist
open(os.path.join(tmp_dir, rel_path), "w")
open(abs_path, "w")
builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path, abs_path=abs_path)
builder.download_and_prepare(
try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -580,7 +571,7 @@ def _post_processing_resources(self, split):
)
builder._post_process = types.MethodType(_post_process, builder)
builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -604,7 +595,7 @@ def _post_process(self, dataset, resources_paths):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyBuilder(cache_dir=tmp_dir)
builder._post_process = types.MethodType(_post_process, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -637,7 +628,7 @@ def _post_processing_resources(self, split):
builder = DummyBuilder(cache_dir=tmp_dir)
builder._post_process = types.MethodType(_post_process, builder)
builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -662,15 +653,14 @@ def _prepare_split(self, split_generator, **kwargs):
self.assertRaises(
ValueError,
builder.download_and_prepare,
try_from_hf_gcs=False,
download_mode=DownloadMode.FORCE_REDOWNLOAD,
)
self.assertRaises(FileNotFoundError, builder.as_dataset)

def test_generator_based_download_and_prepare(self):
with tempfile.TemporaryDirectory() as tmp_dir:
builder = DummyGeneratorBasedBuilder(cache_dir=tmp_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
self.assertTrue(
os.path.exists(
os.path.join(
@@ -940,7 +930,7 @@ def test_generator_based_builder_as_dataset(in_memory, tmp_path):
cache_dir.mkdir()
cache_dir = str(cache_dir)
builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
with assert_arrow_memory_increases() if in_memory else assert_arrow_memory_doesnt_increase():
dataset = builder.as_dataset("train", in_memory=in_memory)
assert dataset.data.to_pydict() == {"text": ["foo"] * 100}
@@ -955,7 +945,7 @@ def test_custom_writer_batch_size(tmp_path, writer_batch_size, default_writer_ba
DummyGeneratorBasedBuilder.DEFAULT_WRITER_BATCH_SIZE = default_writer_batch_size
builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir, writer_batch_size=writer_batch_size)
assert builder._writer_batch_size == (writer_batch_size or default_writer_batch_size)
builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
dataset = builder.as_dataset("train")
assert len(dataset.data[0].chunks) == expected_chunks

