
Better windows support #644

Merged (35 commits, Sep 25, 2020)
Commits (35)
c6fb176
fix test_arrow_dataset tests on windows
lhoestq Sep 18, 2020
5b4092d
fix array tests
lhoestq Sep 23, 2020
a10d959
fix arrow reader test
lhoestq Sep 23, 2020
e41c3f1
fix arrow writer test
lhoestq Sep 23, 2020
d0bd5f3
fix test beam
lhoestq Sep 23, 2020
8c9ed71
fix test caching
lhoestq Sep 23, 2020
71c34ce
fix test builder
lhoestq Sep 23, 2020
497f535
use os.sep instead of regular slash
lhoestq Sep 23, 2020
77792cc
fix missing require lib decorators
lhoestq Sep 23, 2020
63b2fdf
specify encoding when reading files
lhoestq Sep 23, 2020
5650699
same for write
lhoestq Sep 23, 2020
1249457
fix dummy data url separator
lhoestq Sep 23, 2020
73b7427
fix csv in sogou
lhoestq Sep 23, 2020
2bb4d51
fix cmrc dummy data
lhoestq Sep 23, 2020
8afeaa5
close writer before raising error while generating examples
lhoestq Sep 23, 2020
4eda49a
fix web_question dummy data
lhoestq Sep 23, 2020
20a2a61
fix metric test
lhoestq Sep 23, 2020
7e5bc81
fix dataset test
lhoestq Sep 23, 2020
8c51cfb
fix test search
lhoestq Sep 24, 2020
a5672d7
fix test dataset dict
lhoestq Sep 24, 2020
b930853
fix dataset not compatible with windows
lhoestq Sep 24, 2020
eb4f3cc
fix test hf gcp
lhoestq Sep 24, 2020
3df94c4
fix tf export
lhoestq Sep 24, 2020
2cb0828
implement __del__ to fix permissions issues
lhoestq Sep 24, 2020
d05e9de
fix test metric
lhoestq Sep 24, 2020
203d947
fix weirdest bug I ever met (so far)
lhoestq Sep 24, 2020
6361172
del datasets instead of tables
lhoestq Sep 24, 2020
5c0bda0
fix test distributed dataset
lhoestq Sep 24, 2020
2a9fc9f
style
lhoestq Sep 25, 2020
e48b9d5
add ci job for windows
lhoestq Sep 25, 2020
d802938
update ci config version
lhoestq Sep 25, 2020
b01b5e6
add windows orb
lhoestq Sep 25, 2020
cfde6f2
use pip to install tensorflow
lhoestq Sep 25, 2020
8eaadf6
finalize ci config
lhoestq Sep 25, 2020
4e3c71d
fix test_map_nested
lhoestq Sep 25, 2020
45 changes: 44 additions & 1 deletion .circleci/config.yml
@@ -1,4 +1,8 @@
version: 2
version: 2.1

orbs:
win: circleci/windows@2.2.0

jobs:
run_dataset_script_tests_pyarrow_0p17:
working_directory: ~/datasets
@@ -26,6 +30,41 @@ jobs:
- run: pip install pyarrow==1.0.0
- run: HF_SCRIPTS_VERSION=master python -m pytest -sv ./tests/


run_dataset_script_tests_pyarrow_0p17_WIN:
[Inline review comment, Member]: Youpi! ("Yay!")

working_directory: ~/datasets
executor:
name: win/default
shell: powershell
steps:
- checkout
- run: conda install python=3.6 --yes
- run: conda install pytorch --yes
- run: pip install virtualenv
- run: python -m virtualenv venv --system-site-packages
- run: "& venv/Scripts/activate.ps1"
- run: pip install .[tests]
- run: pip install pyarrow==0.17.1
- run: $env:HF_SCRIPTS_VERSION="master"
- run: python -m pytest -sv ./tests/

run_dataset_script_tests_pyarrow_1_WIN:
working_directory: ~/datasets
executor:
name: win/default
shell: powershell
steps:
- checkout
- run: conda install python=3.6 --yes
- run: conda install pytorch --yes
- run: pip install virtualenv
- run: python -m virtualenv venv --system-site-packages
- run: "& venv/Scripts/activate.ps1"
- run: pip install .[tests]
- run: pip install pyarrow==1.0.0
- run: $env:HF_SCRIPTS_VERSION="master"
- run: python -m pytest -sv ./tests/

check_code_quality:
working_directory: ~/datasets
docker:
@@ -38,6 +77,7 @@ jobs:
- run: black --check --line-length 119 --target-version py36 tests src benchmarks datasets metrics
- run: isort --check-only tests src benchmarks datasets metrics
- run: flake8 tests src benchmarks datasets metrics

build_doc:
working_directory: ~/datasets
docker:
@@ -48,6 +88,7 @@ jobs:
- run: cd docs && make html SPHINXOPTS="-W"
- store_artifacts:
path: ./docs/_build

deploy_doc:
working_directory: ~/datasets
docker:
@@ -73,5 +114,7 @@ workflows:
- check_code_quality
- run_dataset_script_tests_pyarrow_0p17
- run_dataset_script_tests_pyarrow_1
- run_dataset_script_tests_pyarrow_0p17_WIN
- run_dataset_script_tests_pyarrow_1_WIN
- build_doc
- deploy_doc: *workflow_filters
Binary file modified datasets/cmrc2018/dummy/0.1.0/dummy_data.zip
Binary file not shown.
4 changes: 2 additions & 2 deletions datasets/sogou_news/sogou_news.py
@@ -19,13 +19,13 @@
from __future__ import absolute_import, division, print_function

import csv
import ctypes
import os
import sys

import datasets


csv.field_size_limit(sys.maxsize)
csv.field_size_limit(int(ctypes.c_ulong(-1).value // 2))


_CITATION = """\
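
Note on this change: csv.field_size_limit hands its argument to C code that stores it in a C long, which is only 32 bits on 64-bit Windows, so sys.maxsize raises "OverflowError: Python int too large to convert to C long" there. ctypes.c_ulong(-1).value is the largest unsigned C long on the current platform, and halving it gives the largest value a signed C long can hold. A minimal, self-contained sketch (printed values are indicative):

import csv
import ctypes
import sys

# Largest value that fits in a signed C long on this platform:
# 2147483647 on Windows, 9223372036854775807 on most 64-bit Linux/macOS.
max_c_long = int(ctypes.c_ulong(-1).value // 2)

# csv.field_size_limit(sys.maxsize) would overflow on Windows; this never does.
csv.field_size_limit(max_c_long)

print(sys.maxsize, max_c_long)
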
Binary file modified datasets/web_questions/dummy/1.0.0/dummy_data.zip
Binary file not shown.
3 changes: 3 additions & 0 deletions setup.py
@@ -107,6 +107,9 @@
'zstandard'
]

if os.name == "nt": # windows
TESTS_REQUIRE.remove("faiss-cpu") # faiss doesn't exist on windows


QUALITY_REQUIRE = [
"black",
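
Note on this change: faiss-cpu shipped no Windows wheels at the time, so the dependency is dropped when os.name == "nt" and any faiss-backed tests have to skip themselves when the library is absent. A hedged sketch of one way to do that with pytest (the test itself is hypothetical, not taken from the repository):

import numpy as np
import pytest

# Skip this whole module when faiss is not installed (e.g. on Windows).
faiss = pytest.importorskip("faiss")

def test_flat_index_search():
    index = faiss.IndexFlatL2(8)                        # 8-dimensional vectors
    index.add(np.random.rand(16, 8).astype("float32"))  # index 16 random vectors
    distances, ids = index.search(np.random.rand(1, 8).astype("float32"), 4)
    assert ids.shape == (1, 4)
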
26 changes: 20 additions & 6 deletions src/datasets/arrow_dataset.py
@@ -28,6 +28,7 @@
from functools import partial, wraps
from math import ceil, floor
from multiprocessing import Pool, RLock
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
@@ -374,6 +375,12 @@ def from_dict(
pa_table: pa.Table = pa.Table.from_pydict(mapping=mapping)
return cls(pa_table, info=info, split=split)

def __del__(self):
if hasattr(self, "_data"):
del self._data
if hasattr(self, "_indices"):
del self._indices

def __getstate__(self):
state = dict(self.__dict__)
state["_info"] = json.dumps(asdict(state["_info"]))
@@ -443,7 +450,7 @@ def save_to_disk(self, dataset_path: str):
for data_file in self._data_files + self._indices_data_files:
# Copy file to destination directory
src = data_file["filename"]
filename = src.split("/")[-1]
filename = Path(src).name
dest = os.path.join(dataset_path, filename)
if src != dest:
shutil.copy(src, dest)
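
Note on this change: src.split("/")[-1] assumes forward slashes, but cache file paths on Windows use backslashes, so the "file name" came back as the whole path. pathlib understands both flavours. A quick illustration (the path is made up; PureWindowsPath is used so the snippet runs on any OS):

from pathlib import Path, PureWindowsPath

win_src = r"C:\Users\me\.cache\huggingface\datasets\squad-train.arrow"

print(win_src.split("/")[-1])         # the whole string: there is no "/" to split on
print(PureWindowsPath(win_src).name)  # 'squad-train.arrow'

# At runtime, Path() uses the native flavour, so Path(src).name works everywhere:
print(Path("data/train.arrow").name)  # 'train.arrow' on Windows and POSIX alike
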
@@ -458,9 +465,9 @@ def save_to_disk(self, dataset_path: str):
len(h["transforms"]) == 0 for h in state.get("_inplace_history", [])
), "in-place history needs to be empty"
# Serialize state
with open(os.path.join(dataset_path, "state.json"), "w") as state_file:
with open(os.path.join(dataset_path, "state.json"), "w", encoding="utf-8") as state_file:
json.dump(state, state_file, indent=2, sort_keys=True)
with open(os.path.join(dataset_path, "dataset_info.json"), "w") as dataset_info_file:
with open(os.path.join(dataset_path, "dataset_info.json"), "w", encoding="utf-8") as dataset_info_file:
json.dump(dataset_info, dataset_info_file, indent=2, sort_keys=True)
logger.info("Dataset saved in {}".format(dataset_path))

@@ -471,9 +478,9 @@ def load_from_disk(dataset_path: str) -> "Dataset":
Args:
dataset_path (``str``): path of the dataset directory where the dataset will be loaded from
"""
with open(os.path.join(dataset_path, "state.json"), "r") as state_file:
with open(os.path.join(dataset_path, "state.json"), "r", encoding="utf-8") as state_file:
state = json.load(state_file)
with open(os.path.join(dataset_path, "dataset_info.json"), "r") as dataset_info_file:
with open(os.path.join(dataset_path, "dataset_info.json"), "r", encoding="utf-8") as dataset_info_file:
dataset_info = json.load(dataset_info_file)
state["_info"] = json.dumps(dataset_info)
dataset = Dataset.from_dict({})
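
Note on this change: without an explicit encoding, open() falls back to the locale's preferred encoding, typically cp1252 on Windows, so JSON metadata written elsewhere as UTF-8 can decode to mojibake or fail outright. A small sketch of the failure mode (file name and content are illustrative; ensure_ascii=False is only there to put non-ASCII bytes in the file):

import json

info = {"description": "Café"}

with open("dataset_info.json", "w", encoding="utf-8") as f:
    json.dump(info, f, ensure_ascii=False)

# Reading the UTF-8 bytes with the Windows default locale encoding garbles them.
with open("dataset_info.json", "r", encoding="cp1252") as f:
    print(json.load(f)["description"])   # 'CafÃ©'

# Passing encoding="utf-8" on both sides, as the patch does, round-trips cleanly.
with open("dataset_info.json", "r", encoding="utf-8") as f:
    print(json.load(f)["description"])   # 'Café'
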
@@ -1500,12 +1507,16 @@ def apply_function_on_filtered_inputs(inputs, indices, check_same_num_examples=F
if update_data:
writer.finalize() # close_stream=bool(buf_writer is None)) # We only close if we are writing in a file
except (Exception, KeyboardInterrupt):
if update_data:
writer.finalize()
if update_data and tmp_file is not None:
tmp_file.close()
if os.path.exists(tmp_file.name):
os.remove(tmp_file.name)
raise

if update_data and tmp_file is not None:
tmp_file.close()
shutil.move(tmp_file.name, cache_file_name)

if update_data:
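
Note on this change: the added tmp_file.close() calls matter because Windows will not remove or replace a file that still has an open handle, whereas POSIX happily unlinks it. A condensed sketch of the pattern map() now follows with its delete=False temporary file (cache_file_name and the payload are placeholders):

import os
import shutil
import tempfile

cache_file_name = "cache.arrow"  # hypothetical destination

tmp_file = tempfile.NamedTemporaryFile("wb", delete=False)
try:
    tmp_file.write(b"serialized arrow batches would go here")
except (Exception, KeyboardInterrupt):
    tmp_file.close()                 # close the handle first, or Windows refuses
    if os.path.exists(tmp_file.name):
        os.remove(tmp_file.name)     # to delete the temporary file
    raise

tmp_file.close()                     # likewise before moving it into place
shutil.move(tmp_file.name, cache_file_name)
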
@@ -1748,11 +1759,13 @@ def select(
writer.finalize() # close_stream=bool(buf_writer is None)) # We only close if we are writing in a file
except (Exception, KeyboardInterrupt):
if tmp_file is not None:
tmp_file.close()
if os.path.exists(tmp_file.name):
os.remove(tmp_file.name)
raise

if tmp_file is not None:
tmp_file.close()
shutil.move(tmp_file.name, indices_cache_file_name)

# Return new Dataset object
@@ -2207,7 +2220,7 @@ def _feature(values: np.ndarray) -> "tf.train.Feature":
if isinstance(values, np.ndarray):
if values.dtype == np.dtype(float):
return _float_feature(values)
elif values.dtype == np.dtype(int):
elif values.dtype == np.int64:
return _int64_feature(values)
elif values.dtype == np.dtype(str) or (
values.dtype == np.dtype(object) and len(values) > 0 and isinstance(values[0], str)
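
Note on this change: np.dtype(int) follows the platform's C long, which is 32 bits even on 64-bit Windows (under NumPy 1.x), so the old comparison never matched the int64 arrays coming out of Arrow columns and integer features were not exported. Comparing against np.int64 is platform-independent. A quick check:

import numpy as np

values = np.array([1, 2, 3], dtype=np.int64)  # what Arrow int64 columns produce

# int64 on 64-bit Linux/macOS, but int32 on 64-bit Windows with NumPy 1.x:
print(values.dtype == np.dtype(int))

# True on every platform:
print(values.dtype == np.int64)
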
@@ -2246,6 +2259,7 @@ def generator():
logger.info(f"Writing TFRecord to {filename}")
writer.write(tf_dataset)
logger.info(f"Finished writing TFRecord to {filename}")
self = None # delete the dataset reference used by tf_dataset

def add_faiss_index(
self,
6 changes: 3 additions & 3 deletions src/datasets/arrow_reader.py
@@ -242,10 +242,10 @@ def download_from_hf_gcs(self, cache_dir, relative_data_dir):
the `datasets` directory on GCS.

"""
remote_cache_dir = os.path.join(HF_GCP_BASE_URL, relative_data_dir)
remote_cache_dir = HF_GCP_BASE_URL + "/" + relative_data_dir.replace(os.sep, "/")
try:
remote_dataset_info = os.path.join(remote_cache_dir, "dataset_info.json")
downloaded_dataset_info = cached_path(remote_dataset_info)
downloaded_dataset_info = cached_path(remote_dataset_info.replace(os.sep, "/"))
shutil.move(downloaded_dataset_info, os.path.join(cache_dir, "dataset_info.json"))
if self._info is not None:
self._info.update(self._info.from_directory(cache_dir))
@@ -260,7 +260,7 @@ def download_from_hf_gcs(self, cache_dir, relative_data_dir):
)
for file_instruction in file_instructions:
remote_prepared_filename = os.path.join(remote_cache_dir, file_instruction["filename"])
downloaded_prepared_filename = cached_path(remote_prepared_filename)
downloaded_prepared_filename = cached_path(remote_prepared_filename.replace(os.sep, "/"))
shutil.move(downloaded_prepared_filename, os.path.join(cache_dir, file_instruction["filename"]))
except FileNotFoundError:
raise MissingFilesOnHfGcs()
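
Note on this change: os.path.join uses the native separator, so on Windows it splices backslashes into the GCS URL and the download fails. URLs should be joined with plain "/" and any os.sep left in relative paths normalised away. A sketch with an illustrative base URL and layout (not the library's actual constants):

import os
import posixpath

BASE_URL = "https://storage.googleapis.com/some-bucket/datasets"
relative_data_dir = os.path.join("squad", "plain_text", "1.0.0")  # backslashes on Windows

bad = os.path.join(BASE_URL, relative_data_dir, "dataset_info.json")  # '\' on Windows
good = BASE_URL + "/" + relative_data_dir.replace(os.sep, "/") + "/dataset_info.json"
also_good = posixpath.join(BASE_URL, relative_data_dir.replace(os.sep, "/"), "dataset_info.json")

print(bad)
print(good)
assert good == also_good
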
32 changes: 17 additions & 15 deletions src/datasets/builder.py
@@ -173,7 +173,7 @@ def __init__(
# prepare data dirs
self._cache_dir_root = os.path.expanduser(cache_dir or HF_DATASETS_CACHE)
self._cache_dir = self._build_cache_dir()
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace("/", "_") + ".lock")
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace(os.sep, "_") + ".lock")
with FileLock(lock_path):
if os.path.exists(self._cache_dir): # check if data exist
if len(os.listdir(self._cache_dir)) > 0:
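
Note on this change: the lock file name is produced by flattening the cache directory path into a single string. Replacing "/" does nothing on Windows, where os.sep is "\", so the ".lock" name still contained separators and FileLock ended up pointing into directories that do not exist. Replacing os.sep flattens the native separator on every platform. A tiny illustration with made-up path components:

import os

cache_dir = os.path.join("huggingface", "datasets", "squad", "1.0.0")
# 'huggingface\datasets\squad\1.0.0' on Windows, 'huggingface/datasets/squad/1.0.0' elsewhere

print(cache_dir.replace("/", "_"))     # unchanged on Windows: still contains backslashes
print(cache_dir.replace(os.sep, "_"))  # 'huggingface_datasets_squad_1.0.0' everywhere
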
@@ -396,7 +396,7 @@ def download_and_prepare(
)

# Prevent parallel disk operations
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace("/", "_") + ".lock")
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace(os.sep, "_") + ".lock")
with FileLock(lock_path):
data_exists = os.path.exists(self._cache_dir)
if data_exists and download_mode == REUSE_DATASET_IF_EXISTS:
@@ -493,13 +493,13 @@ def _download_prepared_from_hf_gcs(self):
downloaded_info = DatasetInfo.from_directory(self._cache_dir)
self.info.update(downloaded_info)
# download post processing resources
remote_cache_dir = os.path.join(HF_GCP_BASE_URL, relative_data_dir)
remote_cache_dir = HF_GCP_BASE_URL + "/" + relative_data_dir.replace(os.sep, "/")
for split in self.info.splits:
for resource_file_name in self._post_processing_resources(split).values():
if "/" in resource_file_name:
if os.sep in resource_file_name:
raise ValueError("Resources shouldn't be in a sub-directory: {}".format(resource_file_name))
try:
resource_path = utils.cached_path(os.path.join(remote_cache_dir, resource_file_name))
resource_path = utils.cached_path(remote_cache_dir + "/" + resource_file_name)
shutil.move(resource_path, os.path.join(self._cache_dir, resource_file_name))
except ConnectionError:
logger.info(
@@ -559,7 +559,7 @@ def _download_and_prepare(self, dl_manager, verify_infos, **prepare_split_kwargs
def download_post_processing_resources(self, dl_manager):
for split in self.info.splits:
for resource_name, resource_file_name in self._post_processing_resources(split).items():
if "/" in resource_file_name:
if os.sep in resource_file_name:
raise ValueError("Resources shouldn't be in a sub-directory: {}".format(resource_file_name))
resource_path = os.path.join(self._cache_dir, resource_file_name)
if not os.path.exists(resource_path):
@@ -573,12 +573,12 @@ def download_post_processing_resources(self, dl_manager):
shutil.move(downloaded_resource_path, resource_path)

def _save_info(self):
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace("/", "_") + ".lock")
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace(os.sep, "_") + ".lock")
with FileLock(lock_path):
self.info.write_to_directory(self._cache_dir)

def _save_infos(self):
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace("/", "_") + ".lock")
lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace(os.sep, "_") + ".lock")
with FileLock(lock_path):
DatasetInfosDict(**{self.config.name: self.info}).write_to_directory(self.get_imported_module_dir())

@@ -635,7 +635,7 @@ def _build_single_dataset(self, split: Union[str, Split], run_post_process: bool
)
if run_post_process:
for resource_file_name in self._post_processing_resources(split).values():
if "/" in resource_file_name:
if os.sep in resource_file_name:
raise ValueError("Resources shouldn't be in a sub-directory: {}".format(resource_file_name))
resources_paths = {
resource_name: os.path.join(self._cache_dir, resource_file_name)
@@ -831,12 +831,14 @@ def _prepare_split(self, split_generator):

generator = self._generate_examples(**split_generator.gen_kwargs)
not_verbose = bool(logger.getEffectiveLevel() > WARNING)
for key, record in utils.tqdm(
generator, unit=" examples", total=split_info.num_examples, leave=False, disable=not_verbose
):
example = self.info.features.encode_example(record)
writer.write(example)
num_examples, num_bytes = writer.finalize()
try:
for key, record in utils.tqdm(
generator, unit=" examples", total=split_info.num_examples, leave=False, disable=not_verbose
):
example = self.info.features.encode_example(record)
writer.write(example)
finally:
num_examples, num_bytes = writer.finalize()

assert num_examples == num_examples, f"Expected to write {split_info.num_examples} but wrote {num_examples}"
split_generator.split_info.num_examples = num_examples
4 changes: 2 additions & 2 deletions src/datasets/commands/convert.py
@@ -106,7 +106,7 @@ def run(self):
self._logger.info("Skipping file")
continue

with open(input_file, "r") as f:
with open(input_file, "r", encoding="utf-8") as f:
lines = f.readlines()

out_lines = []
@@ -174,7 +174,7 @@ def run(self):
if needs_manual_update:
with_manual_update.append(output_file)

with open(output_file, "w") as f:
with open(output_file, "w", encoding="utf-8") as f:
f.writelines(out_lines)
self._logger.info("Converted in %s", output_file)

2 changes: 1 addition & 1 deletion src/datasets/commands/dummy_data.py
@@ -32,7 +32,7 @@ def __init__(
):
self._path_to_dataset = path_to_dataset
self._requires_manual = requires_manual
self._dataset_name = path_to_dataset.split("/")[-2]
self._dataset_name = path_to_dataset.replace(os.sep, "/").split("/")[-2]

def run(self):
module_path, hash = prepare_module(self._path_to_dataset)
3 changes: 2 additions & 1 deletion src/datasets/commands/run_beam.py
@@ -1,5 +1,6 @@
import os
from argparse import ArgumentParser
from pathlib import Path
from shutil import copyfile
from typing import List

@@ -126,7 +127,7 @@ def run(self):
if self._save_infos:
dataset_infos_path = os.path.join(builder_cls.get_imported_module_dir(), DATASET_INFOS_DICT_FILE_NAME)

name = list(filter(lambda x: x, path.split("/")))[-1] + ".py"
name = Path(path).name + ".py"

combined_path = os.path.join(path, name)
if os.path.isfile(path):
3 changes: 2 additions & 1 deletion src/datasets/commands/test.py
@@ -1,5 +1,6 @@
import os
from argparse import ArgumentParser
from pathlib import Path
from shutil import copyfile
from typing import List

@@ -101,7 +102,7 @@ def run(self):
if self._save_infos:
dataset_infos_path = os.path.join(builder_cls.get_imported_module_dir(), DATASET_INFOS_DICT_FILE_NAME)

name = list(filter(lambda x: x, path.split("/")))[-1] + ".py"
name = Path(path).name + ".py"

combined_path = os.path.join(path, name)
if os.path.isfile(path):
8 changes: 6 additions & 2 deletions src/datasets/dataset_dict.py
@@ -476,7 +476,9 @@ def save_to_disk(self, dataset_dict_path: str):
dataset_dict_path (``str``): path of the dataset dict directory where the dataset dict will be saved to
"""
os.makedirs(dataset_dict_path, exist_ok=True)
json.dump({"splits": list(self)}, open(os.path.join(dataset_dict_path, "dataset_dict.json"), "w"))
json.dump(
{"splits": list(self)}, open(os.path.join(dataset_dict_path, "dataset_dict.json"), "w", encoding="utf-8")
)
for k, dataset in self.items():
dataset.save_to_disk(os.path.join(dataset_dict_path, k))

@@ -489,6 +491,8 @@ def load_from_disk(dataset_dict_path: str) -> "DatasetDict":
dataset_dict_path (``str``): path of the dataset dict directory where the dataset dict will be loaded from
"""
dataset_dict = DatasetDict()
for k in json.load(open(os.path.join(dataset_dict_path, "dataset_dict.json"), "r"))["splits"]:
for k in json.load(open(os.path.join(dataset_dict_path, "dataset_dict.json"), "r", encoding="utf-8"))[
"splits"
]:
dataset_dict[k] = Dataset.load_from_disk(os.path.join(dataset_dict_path, k))
return dataset_dict
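
Note on these changes: the DatasetDict updates mirror the ones in arrow_dataset.py, reading and writing the metadata with an explicit encoding="utf-8" so the Windows locale encoding never gets involved. A hedged, equivalent sketch of just the metadata round-trip, written with context managers (which also close the handles promptly, something Windows is stricter about):

import json
import os

def save_split_names(dataset_dict_path, split_names):
    # Sketch of the dataset_dict.json write, not the library's actual method.
    os.makedirs(dataset_dict_path, exist_ok=True)
    with open(os.path.join(dataset_dict_path, "dataset_dict.json"), "w", encoding="utf-8") as f:
        json.dump({"splits": list(split_names)}, f)

def load_split_names(dataset_dict_path):
    with open(os.path.join(dataset_dict_path, "dataset_dict.json"), "r", encoding="utf-8") as f:
        return json.load(f)["splits"]

save_split_names("my_dataset_dict", ["train", "validation", "test"])
print(load_split_names("my_dataset_dict"))  # ['train', 'validation', 'test']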