diff --git a/datasets/guardian_authorship/guardian_authorship.py b/datasets/guardian_authorship/guardian_authorship.py index 3a46b7e64e4..8edf52f5084 100644 --- a/datasets/guardian_authorship/guardian_authorship.py +++ b/datasets/guardian_authorship/guardian_authorship.py @@ -20,6 +20,7 @@ import nlp + _CITATION = """\ @article{article, author = {Stamatatos, Efstathios}, @@ -89,104 +90,151 @@ class GuardianAuthorship(nlp.GeneratorBasedBuilder): BUILDER_CONFIG_CLASS = GuardianAuthorshipConfig BUILDER_CONFIGS = [ # cross-topic - GuardianAuthorshipConfig(name="cross_topic_{}".format(1), - version=nlp.Version("{}.0.0".format(1), - description="The Original DS with the cross-topic scenario no.{}". - format(1)), - train_folder="Politics", valid_folder="Society", test_folder="UK,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(2), - version=nlp.Version("{}.0.0".format(2), - description="The Original DS with the cross-topic scenario no.{}". - format(2)), - train_folder="Politics", valid_folder="UK", test_folder="Society,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(3), - version=nlp.Version("{}.0.0".format(3), - description="The Original DS with the cross-topic scenario no.{}". - format(3)), - train_folder="Politics", valid_folder="World", test_folder="Society,UK"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(4), - version=nlp.Version("{}.0.0".format(4), - description="The Original DS with the cross-topic scenario no.{}". - format(4)), - train_folder="Society", valid_folder="Politics", test_folder="UK,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(5), - version=nlp.Version("{}.0.0".format(5), - description="The Original DS with the cross-topic scenario no.{}". - format(5)), - train_folder="Society", valid_folder="UK", test_folder="Politics,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(6), - version=nlp.Version("{}.0.0".format(6), - description="The Original DS with the cross-topic scenario no.{}". - format(6)), - train_folder="Society", valid_folder="World", test_folder="Politics,UK"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(7), - version=nlp.Version("{}.0.0".format(7), - description="The Original DS with the cross-topic scenario no.{}". - format(7)), - train_folder="UK", valid_folder="Politics", test_folder="Society,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(8), - version=nlp.Version("{}.0.0".format(8), - description="The Original DS with the cross-topic scenario no.{}". - format(8)), - train_folder="UK", valid_folder="Society", test_folder="Politics,World"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(9), - version=nlp.Version("{}.0.0".format(9), - description="The Original DS with the cross-topic scenario no.{}". - format(9)), - train_folder="UK", valid_folder="World", test_folder="Politics,Society"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(10), - version=nlp.Version("{}.0.0".format(10), - description="The Original DS with the cross-topic scenario no.{}". - format(10)), - train_folder="World", valid_folder="Politics", test_folder="Society,UK"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(11), - version=nlp.Version("{}.0.0".format(11), - description="The Original DS with the cross-topic scenario no.{}". 
- format(11)), - train_folder="World", valid_folder="Society", test_folder="Politics,UK"), - - GuardianAuthorshipConfig(name="cross_topic_{}".format(12), - version=nlp.Version("{}.0.0".format(12), - description="The Original DS with the cross-topic scenario no.{}". - format( - 12)), - train_folder="World", valid_folder="UK", test_folder="Politics,Society"), - + GuardianAuthorshipConfig( + name="cross_topic_{}".format(1), + version=nlp.Version( + "{}.0.0".format(1), description="The Original DS with the cross-topic scenario no.{}".format(1) + ), + train_folder="Politics", + valid_folder="Society", + test_folder="UK,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(2), + version=nlp.Version( + "{}.0.0".format(2), description="The Original DS with the cross-topic scenario no.{}".format(2) + ), + train_folder="Politics", + valid_folder="UK", + test_folder="Society,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(3), + version=nlp.Version( + "{}.0.0".format(3), description="The Original DS with the cross-topic scenario no.{}".format(3) + ), + train_folder="Politics", + valid_folder="World", + test_folder="Society,UK", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(4), + version=nlp.Version( + "{}.0.0".format(4), description="The Original DS with the cross-topic scenario no.{}".format(4) + ), + train_folder="Society", + valid_folder="Politics", + test_folder="UK,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(5), + version=nlp.Version( + "{}.0.0".format(5), description="The Original DS with the cross-topic scenario no.{}".format(5) + ), + train_folder="Society", + valid_folder="UK", + test_folder="Politics,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(6), + version=nlp.Version( + "{}.0.0".format(6), description="The Original DS with the cross-topic scenario no.{}".format(6) + ), + train_folder="Society", + valid_folder="World", + test_folder="Politics,UK", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(7), + version=nlp.Version( + "{}.0.0".format(7), description="The Original DS with the cross-topic scenario no.{}".format(7) + ), + train_folder="UK", + valid_folder="Politics", + test_folder="Society,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(8), + version=nlp.Version( + "{}.0.0".format(8), description="The Original DS with the cross-topic scenario no.{}".format(8) + ), + train_folder="UK", + valid_folder="Society", + test_folder="Politics,World", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(9), + version=nlp.Version( + "{}.0.0".format(9), description="The Original DS with the cross-topic scenario no.{}".format(9) + ), + train_folder="UK", + valid_folder="World", + test_folder="Politics,Society", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(10), + version=nlp.Version( + "{}.0.0".format(10), description="The Original DS with the cross-topic scenario no.{}".format(10) + ), + train_folder="World", + valid_folder="Politics", + test_folder="Society,UK", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(11), + version=nlp.Version( + "{}.0.0".format(11), description="The Original DS with the cross-topic scenario no.{}".format(11) + ), + train_folder="World", + valid_folder="Society", + test_folder="Politics,UK", + ), + GuardianAuthorshipConfig( + name="cross_topic_{}".format(12), + version=nlp.Version( + "{}.0.0".format(12), description="The Original DS with the cross-topic scenario 
no.{}".format(12) + ), + train_folder="World", + valid_folder="UK", + test_folder="Politics,Society", + ), # # cross-genre - GuardianAuthorshipConfig(name="cross_genre_{}".format(1), - version=nlp.Version("{}.0.0".format(13), - description="The Original DS with the cross-genre scenario no.{}". - format(1)), - train_folder="Books", valid_folder="Politics", test_folder="Society,UK,World"), - - GuardianAuthorshipConfig(name="cross_genre_{}".format(2), - version=nlp.Version("{}.0.0".format(14), - description="The Original DS with the cross-genre scenario no.{}". - format(2)), - train_folder="Books", valid_folder="Society", test_folder="Politics,UK,World"), - - GuardianAuthorshipConfig(name="cross_genre_{}".format(3), - version=nlp.Version("{}.0.0".format(15), - description="The Original DS with the cross-genre scenario no.{}". - format(3)), - - train_folder="Books", valid_folder="UK", test_folder="Politics,Society,World"), - - GuardianAuthorshipConfig(name="cross_genre_{}".format(4), - version=nlp.Version("{}.0.0".format(16), - description="The Original DS with the cross-genre scenario no.{}". - format(4)), - train_folder="Books", valid_folder="World", test_folder="Politics,Society,UK"), + GuardianAuthorshipConfig( + name="cross_genre_{}".format(1), + version=nlp.Version( + "{}.0.0".format(13), description="The Original DS with the cross-genre scenario no.{}".format(1) + ), + train_folder="Books", + valid_folder="Politics", + test_folder="Society,UK,World", + ), + GuardianAuthorshipConfig( + name="cross_genre_{}".format(2), + version=nlp.Version( + "{}.0.0".format(14), description="The Original DS with the cross-genre scenario no.{}".format(2) + ), + train_folder="Books", + valid_folder="Society", + test_folder="Politics,UK,World", + ), + GuardianAuthorshipConfig( + name="cross_genre_{}".format(3), + version=nlp.Version( + "{}.0.0".format(15), description="The Original DS with the cross-genre scenario no.{}".format(3) + ), + train_folder="Books", + valid_folder="UK", + test_folder="Politics,Society,World", + ), + GuardianAuthorshipConfig( + name="cross_genre_{}".format(4), + version=nlp.Version( + "{}.0.0".format(16), description="The Original DS with the cross-genre scenario no.{}".format(4) + ), + train_folder="Books", + valid_folder="World", + test_folder="Politics,Society,UK", + ), ] def _info(self): @@ -198,12 +246,23 @@ def _info(self): { # These are the features of your dataset like images, labels ... # There are 13 authors in this dataset - "author": nlp.features.ClassLabel(names=["catherinebennett", "georgemonbiot", "hugoyoung", - "jonathanfreedland", "martinkettle", "maryriddell", - "nickcohen", "peterpreston", "pollytoynbee", - "royhattersley", "simonhoggart", "willhutton", - "zoewilliams"]), - + "author": nlp.features.ClassLabel( + names=[ + "catherinebennett", + "georgemonbiot", + "hugoyoung", + "jonathanfreedland", + "martinkettle", + "maryriddell", + "nickcohen", + "peterpreston", + "pollytoynbee", + "royhattersley", + "simonhoggart", + "willhutton", + "zoewilliams", + ] + ), # There are book reviews, and articles on the following four topics "topic": nlp.features.ClassLabel(names=["Politics", "Society", "UK", "World", "Books"]), "article": nlp.Value("string"), @@ -213,7 +272,6 @@ def _info(self): # specify them here. They'll be used if as_supervised=True in # builder.as_dataset. 
supervised_keys=[("article", "author")], - # Homepage of the dataset for documentation homepage="http://www.icsd.aegean.gr/lecturers/stamatatos/papers/JLP2013.pdf", citation=_CITATION, @@ -232,29 +290,17 @@ def _split_generators(self, dl_manager): nlp.SplitGenerator( name=nlp.Split.TRAIN, # These kwargs will be passed to _generate_examples - gen_kwargs={ - "data_dir": data_dir, - "samples_folders": self.config.train_folder, - "split": "train" - }, + gen_kwargs={"data_dir": data_dir, "samples_folders": self.config.train_folder, "split": "train"}, ), nlp.SplitGenerator( name=nlp.Split.TEST, # These kwargs will be passed to _generate_examples - gen_kwargs={ - "data_dir": data_dir, - "samples_folders": self.config.test_folder, - "split": "test" - }, + gen_kwargs={"data_dir": data_dir, "samples_folders": self.config.test_folder, "split": "test"}, ), nlp.SplitGenerator( name=nlp.Split.VALIDATION, # These kwargs will be passed to _generate_examples - gen_kwargs={ - "data_dir": data_dir, - "samples_folders": self.config.valid_folder, - "split": "valid" - }, + gen_kwargs={"data_dir": data_dir, "samples_folders": self.config.valid_folder, "split": "valid"}, ), ] @@ -264,10 +310,10 @@ def _generate_examples(self, data_dir, samples_folders, split): # Training and validation are on 1 topic/genre, while testing is on multiple topics # We convert the sample folders into list (from string) - if samples_folders.count(',') == 0: + if samples_folders.count(",") == 0: samples_folders = [samples_folders] else: - samples_folders = samples_folders.split(',') + samples_folders = samples_folders.split(",") # the dataset is structured as: # |-Topic1 @@ -294,7 +340,7 @@ def _generate_examples(self, data_dir, samples_folders, split): path_2_author = os.path.join(full_path, author) path_2_article = os.path.join(path_2_author, article) - with open(path_2_article, 'r', encoding='utf8', errors='ignore') as f: + with open(path_2_article, "r", encoding="utf8", errors="ignore") as f: art = f.readlines() # The whole article is stored as one line. We access the 1st element of the list @@ -304,4 +350,3 @@ def _generate_examples(self, data_dir, samples_folders, split): "author": author, "topic": topic, } - diff --git a/docs/source/features.rst b/docs/source/features.rst index a15b602e7c9..a79a85d136b 100644 --- a/docs/source/features.rst +++ b/docs/source/features.rst @@ -16,6 +16,5 @@ Here is a brief presentation of the various types of features which can be used - a :class:`nlp.ClassLabel` feature specifies a field with a predefined set of classes which can have labels associated to them and will be stored as integers in the dataset. This field will be stored and retrieved as an integer value and two conversion methodes, :func:`nlp.ClassLabel.str2int` and :func:`nlp.ClassLabel.int2str` can be used to convert from the label names to the associate integer value and vice-versa. - a :class:`nlp.Value` feature specifies a single typed value, e.g. ``int64`` or ``string``. The types supported are all the `non-nested types of Apache Arrow `__ among which the most commonly used ones are ``int64``, ``float32`` and ``string``. -- :class:`nlp.Tensor` is mostly supported to have a compatibility layer with the TensorFlow Datasets library and can host a 0D or 1D array. A 0D array is equivalent to a :class:`nlp.Value` of the same dtype while a 1D array is equivalent to a :class:`nlp.Sequence` of the same dtype and fixed length. 
- eventually, two features are specific to Machine Translation: :class:`nlp.Translation` and :class:`nlp.TranslationVariableLanguages`. We refere to the package reference for more details on these features. diff --git a/docs/source/package_reference/main_classes.rst b/docs/source/package_reference/main_classes.rst index d999365b2da..93efab3d1f6 100644 --- a/docs/source/package_reference/main_classes.rst +++ b/docs/source/package_reference/main_classes.rst @@ -53,15 +53,23 @@ It also has dataset transform methods like map or filter, to process all the spl .. autoclass:: nlp.Value :members: -.. autoclass:: nlp.Tensor - :members: - .. autoclass:: nlp.Translation :members: .. autoclass:: nlp.TranslationVariableLanguages :members: +.. autoclass:: nlp.Array2D + :members: + +.. autoclass:: nlp.Array3D + :members: + +.. autoclass:: nlp.Array4D + :members: + +.. autoclass:: nlp.Array5D + :members: ``MetricInfo`` ~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/nlp/__init__.py b/src/nlp/__init__.py index de89d78fdf5..c0664ab27c0 100644 --- a/src/nlp/__init__.py +++ b/src/nlp/__init__.py @@ -28,7 +28,18 @@ from .arrow_reader import ReadInstruction from .builder import ArrowBasedBuilder, BeamBasedBuilder, BuilderConfig, DatasetBuilder, GeneratorBasedBuilder from .dataset_dict import DatasetDict -from .features import ClassLabel, Features, Sequence, Tensor, Translation, TranslationVariableLanguages, Value +from .features import ( + Array2D, + Array3D, + Array4D, + Array5D, + ClassLabel, + Features, + Sequence, + Translation, + TranslationVariableLanguages, + Value, +) from .info import DatasetInfo, MetricInfo from .inspect import inspect_dataset, inspect_metric, list_datasets, list_metrics from .load import concatenate_datasets, import_main_class, load_dataset, load_metric, prepare_module diff --git a/src/nlp/arrow_dataset.py b/src/nlp/arrow_dataset.py index 2d0d9602626..83c07796201 100644 --- a/src/nlp/arrow_dataset.py +++ b/src/nlp/arrow_dataset.py @@ -36,8 +36,8 @@ from nlp.utils.py_utils import dumps -from .arrow_writer import ArrowWriter -from .features import Features, cast_to_python_objects +from .arrow_writer import ArrowWriter, TypedSequence +from .features import Features, cast_to_python_objects, pandas_types_mapper from .info import DatasetInfo from .search import IndexableMixin from .splits import NamedSplit @@ -258,9 +258,11 @@ def from_dict( mapping = features.encode_batch(mapping) else: mapping = cast_to_python_objects(mapping) - pa_table: pa.Table = pa.Table.from_pydict( - mapping=mapping, schema=pa.schema(features.type) if features is not None else None - ) + mapping = { + col: TypedSequence(data, type=features.type[col].type if features is not None else None) + for col, data in mapping.items() + } + pa_table: pa.Table = pa.Table.from_pydict(mapping=mapping) return cls(pa_table, info=info, split=split) @property @@ -624,12 +626,14 @@ def identity(x): return x command = identity - if isinstance(outputs, (list, tuple, np.ndarray)): + if isinstance(outputs, (list, tuple, np.ndarray, pd.Series)): return command(outputs) elif isinstance(outputs, pd.DataFrame): if format_columns is not None and not output_all_columns: to_remove_columns = [col for col in self.column_names if col not in format_columns] output_dict = outputs.drop(to_remove_columns, axis=1) + else: + output_dict = outputs else: output_dict = {} for k, v in outputs.items(): @@ -661,7 +665,8 @@ def _getitem( """ # In the following, to convert data from the arrow table to dicts or lists, # we use .to_pandas().to_dict() or 
.to_pandas().to_list() as they are - # significantly faster than .to_pydict() thanks to zero-copy + # significantly faster than .to_pydict() thanks to zero-copy and because it doesn't + # call `list()` on every object in sequences of sequences of objects for example if isinstance(key, int): if key < 0: key = self._data.num_rows + key @@ -669,9 +674,11 @@ def _getitem( raise IndexError(f"Index ({key}) outside of table length ({self._data.num_rows}).") if format_type is not None: if format_type == "pandas": - outputs = self._data.slice(key, 1).to_pandas() + outputs = self._data.slice(key, 1).to_pandas(types_mapper=pandas_types_mapper) else: - outputs = self._unnest(self._data.slice(key, 1).to_pandas().to_dict("list")) + outputs = self._unnest( + self._data.slice(key, 1).to_pandas(types_mapper=pandas_types_mapper).to_dict("list") + ) else: outputs = self._unnest(self._data.slice(key, 1).to_pydict()) elif isinstance(key, slice): @@ -681,12 +688,12 @@ def _getitem( if format_type is not None: if format_type == "pandas": outputs = self._data.slice(key_indices[0], key_indices[1] - key_indices[0]).to_pandas( - split_blocks=True + types_mapper=pandas_types_mapper ) else: outputs = ( self._data.slice(key_indices[0], key_indices[1] - key_indices[0]) - .to_pandas(split_blocks=True) + .to_pandas(types_mapper=pandas_types_mapper) .to_dict("list") ) else: @@ -695,15 +702,21 @@ def _getitem( if key not in self._data.column_names: raise ValueError(f"Column ({key}) not in table columns ({self._data.column_names}).") if format_type is not None: + # We should use + # outputs = self._data[key].to_pandas(types_mapper=pandas_types_mapper) + # but there is a bug in pyarrow that makes ignores the types_mapper in that case + # see https://issues.apache.org/jira/browse/ARROW-9664 + # We build a table with one column and call to_pandas on it instead + one_column_table = pa.Table.from_arrays( + [self._data[key]], schema=pa.schema([self._data.schema.field(key)]) + ) if format_columns is None or key in format_columns: if format_type == "pandas": - outputs = self._data[key].to_pandas(split_blocks=True) - elif format_type in ("numpy", "torch", "tensorflow"): - outputs = self._data.to_pandas(split_blocks=True).to_dict("list")[key] + outputs = one_column_table.to_pandas(types_mapper=pandas_types_mapper)[key] else: - outputs = self._data[key].to_pylist() + outputs = one_column_table.to_pandas(types_mapper=pandas_types_mapper)[key].to_list() else: - outputs = self._data[key].to_pylist() + outputs = one_column_table.to_pandas(types_mapper=pandas_types_mapper)[key].to_list() else: outputs = self._data[key].to_pylist() elif isinstance(key, Iterable): @@ -718,9 +731,9 @@ def _getitem( data_subset = pa.concat_tables(self._data.slice(int(i), 1) for i in indices) if format_type is not None: if format_type == "pandas": - outputs = data_subset.to_pandas(split_blocks=True) + outputs = data_subset.to_pandas(types_mapper=pandas_types_mapper) else: - outputs = data_subset.to_pandas(split_blocks=True).to_dict("list") + outputs = data_subset.to_pandas(types_mapper=pandas_types_mapper).to_dict("list") else: outputs = data_subset.to_pydict() @@ -805,7 +818,7 @@ def map( cache_file_name: Optional[str] = None, writer_batch_size: Optional[int] = 1000, features: Optional[Features] = None, - disable_nullable: bool = True, + disable_nullable: bool = False, verbose: bool = True, fn_kwargs: Optional[dict] = None, ) -> "Dataset": @@ -836,7 +849,7 @@ def map( Higher value gives smaller cache files, lower value consume less temporary memory while 
running `.map()`. `features` (`Optional[nlp.Features]`, default: `None`): Use a specific Features to store the cache file instead of the automatically generated one. - `disable_nullable` (`bool`, default: `True`): Allow null values in the table. + `disable_nullable` (`bool`, default: `False`): Disallow null values in the table. `verbose` (`bool`, default: `True`): Set to `False` to deactivate the tqdm progress bar and informations. `fn_kwargs` (`Optional[Dict]`, default: `None`): Keyword arguments to be passed to `function` """ diff --git a/src/nlp/arrow_writer.py b/src/nlp/arrow_writer.py index 701c8acbd6a..443c4f3450e 100644 --- a/src/nlp/arrow_writer.py +++ b/src/nlp/arrow_writer.py @@ -5,8 +5,6 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 -# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,7 +24,7 @@ import pyarrow as pa from tqdm.auto import tqdm -from .features import Features +from .features import Features, _ArrayXDExtensionType from .info import DatasetInfo from .utils.file_utils import HF_DATASETS_CACHE, hash_url_to_filename @@ -36,6 +34,90 @@ # Batch size constants. For more info, see: # https://github.com/apache/arrow/blob/master/docs/source/cpp/arrays.rst#size-limitations-and-recommendations) DEFAULT_MAX_BATCH_SIZE = 10_000 # hopefully it doesn't write too much at once (max is 2GB) +type_ = type # keep python's type function + + +class TypedSequence: + """ + This data container generalizes the typing when instantiating pyarrow arrays, tabels or batches. + + More specifically it add several features: + - Support extension types like ``nlp.features.Array2DExtensionType``: + By default pyarrow arrays don't return extension arrays. One has to call + ``pa.ExtensionArray.from_storage(type, pa.array(data, type.storage_type_name))`` + in order to get an extension array. + - Support for ``try_type`` parameter that can be used instead of ``type``: + When an array is transformed, we like to keep the same type as before if possible. + For example when calling :func:`nlp.Dataset.map`, we don't want to change the type + of each column by default. + - Better error message when a pyarrow array overflows. 
+ + Example:: + + from nlp.features import Array2DExtensionType + from nlp.arrow_writer import TypedSequence + import pyarrow as pa + + arr = pa.array(TypedSequence([1, 2, 3], type=pa.int32())) + assert arr.type == pa.int32() + + arr = pa.array(TypedSequence([1, 2, 3], try_type=pa.int32())) + assert arr.type == pa.int32() + + arr = pa.array(TypedSequence(["foo", "bar"], try_type=pa.int32())) + assert arr.type == pa.string() + + arr = pa.array(TypedSequence([[[1, 2, 3]]], type=Array2DExtensionType((1, 3), "int64"))) + assert arr.type == Array2DExtensionType((1, 3), "int64") + + table = pa.Table.from_pydict({ + "image": TypedSequence([[[1, 2, 3]]], type=Array2DExtensionType((1, 3), "int64")) + }) + assert table["image"].type == Array2DExtensionType((1, 3), "int64") + + """ + + def __init__(self, data, type=None, try_type=None): + assert type is None or try_type is None, "You cannot specify both type and try_type" + self.data = data + self.type = type + self.try_type = try_type # is ignored if it doesn't match the data + + def __arrow_array__(self, type=None): + """This function is called when calling pa.array(typed_sequence)""" + assert type is None, "TypedSequence is supposed to be used with pa.array(typed_sequence, type=None)" + trying_type = False + if type is None and self.try_type: + type = self.try_type + trying_type = True + else: + type = self.type + try: + if isinstance(type, _ArrayXDExtensionType): + return pa.ExtensionArray.from_storage(type, pa.array(self.data, type.storage_dtype)) + else: + return pa.array(self.data, type=type) + except (TypeError, pa.lib.ArrowInvalid) as e: # handle type errors and overflows + if trying_type: + try: + return pa.array(self.data, type=None) # second chance + except pa.lib.ArrowInvalid as e: + if "overflow" in str(e): + raise OverflowError( + "There was an overflow with type {}. Try to reduce writer_batch_size to have batches smaller than 2GB.\n({})".format( + type_(self.data), e + ) + ) + else: + raise + elif "overflow" in str(e): + raise OverflowError( + "There was an overflow with type {}. 
Try to reduce writer_batch_size to have batches smaller than 2GB.\n({})".format( + type_(self.data), e + ) + ) + else: + raise class ArrowWriter(object): @@ -44,13 +126,12 @@ class ArrowWriter(object): def __init__( self, - data_type: Optional[pa.DataType] = None, schema: Optional[pa.Schema] = None, features: Optional[Features] = None, path: Optional[str] = None, stream: Optional[pa.NativeFile] = None, writer_batch_size: Optional[int] = None, - disable_nullable: bool = True, + disable_nullable: bool = False, update_features: bool = False, with_metadata: bool = True, ): @@ -59,23 +140,15 @@ def __init__( if features is not None: self._features = features self._schema = pa.schema(features.type) - self._type: pa.DataType = pa.struct(field for field in self._schema) - elif data_type is not None: - self._type: pa.DataType = data_type - self._schema: pa.Schema = pa.schema(field for field in self._type) - self._features = Features.from_arrow_schema(self._schema) elif schema is not None: self._schema: pa.Schema = schema - self._type: pa.DataType = pa.struct(field for field in self._schema) self._features = Features.from_arrow_schema(self._schema) else: self._features = None self._schema = None - self._type = None if disable_nullable and self._schema is not None: - self._schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in self._type) - self._type = pa.struct(pa.field(field.name, field.type, nullable=False) for field in self._type) + self._schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in self._schema) self._path = path if stream is None: @@ -83,6 +156,7 @@ def __init__( else: self.stream = stream + self.disable_nullable = disable_nullable self.writer_batch_size = writer_batch_size or DEFAULT_MAX_BATCH_SIZE self.update_features = update_features self.with_metadata = with_metadata @@ -95,7 +169,7 @@ def __init__( def _build_writer(self, inferred_schema: pa.Schema): inferred_features = Features.from_arrow_schema(inferred_schema) if self._features is not None: - if self.update_features: + if self.update_features: # keep original features it they match, or update them fields = {field.name: field for field in self._features.type} for inferred_field in inferred_features.type: name = inferred_field.name @@ -104,11 +178,11 @@ def _build_writer(self, inferred_schema: pa.Schema): inferred_features[name] = self._features[name] self._features = inferred_features self._schema: pa.Schema = inferred_schema - self._type: pa.DataType = pa.struct(field for field in self._schema) else: self._features = inferred_features self._schema: pa.Schema = inferred_schema - self._type: pa.DataType = pa.struct(field for field in self._schema) + if self.disable_nullable: + self._schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in self._schema) if self.with_metadata: self._schema = self._schema.with_metadata(self._build_metadata(DatasetInfo(features=self._features))) self.pa_writer = pa.RecordBatchStreamWriter(self.stream, self._schema) @@ -122,46 +196,37 @@ def _build_metadata(self, info) -> Dict[str, str]: info_as_dict = asdict(info) return {"huggingface": json.dumps({key: info_as_dict[key] for key in keys})} - def _write_array_on_file(self, pa_array): - """Write a PyArrow Array""" - pa_batch = pa.RecordBatch.from_struct_array(pa_array) - self._num_bytes += pa_array.nbytes - if self.pa_writer is None: - pa_table = pa.Table.from_batches([pa_batch]) - self._build_writer(inferred_schema=pa_table.schema) - self.pa_writer.write_batch(pa_batch) - 
def write_on_file(self): """ Write stored examples """ - # infer type on first write - type = None if self.update_features and self.pa_writer is None else self._type - if self.current_rows: - pa_array = pa.array(self.current_rows, type=type) + if not self.current_rows: + return + cols = sorted(self.current_rows[0].keys()) + schema = None if self.pa_writer is None and self.update_features else self._schema + try_schema = self._schema if self.pa_writer is None and self.update_features else None + arrays = [] + inferred_types = [] + for col in cols: + col_type = schema.field(col).type if schema is not None else None + col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None + typed_sequence = TypedSequence( + [row[col] for row in self.current_rows], type=col_type, try_type=col_try_type + ) + pa_array = pa.array(typed_sequence) inferred_type = pa_array.type - first_example = pa.array(self.current_rows[0:1], type=inferred_type)[0] - # Sanity check - if pa_array[0] != first_example: - # There was an Overflow in StructArray. Let's reduce the batch_size - new_batch_size = self.writer_batch_size - while pa_array[0] != first_example: - if new_batch_size < 2: - raise RuntimeError("The given example is too big (>2GB) to fit in an array.") - new_batch_size = self.writer_batch_size // 2 - pa_array = pa.array(self.current_rows[:new_batch_size], type=inferred_type) - logger.warning( - "Batch size is too big (>2GB). Reducing it from {} to {}".format( - self.writer_batch_size, new_batch_size + first_example = pa.array(TypedSequence(typed_sequence.data[:1], type=inferred_type))[0] + if pa_array[0] != first_example: # Sanity check (check for overflow in StructArray or ListArray) + raise OverflowError( + "There was an overflow in the {}. Try to reduce writer_batch_size to have batches smaller than 2GB".format( + type(pa_array) ) ) - self.writer_batch_size = new_batch_size - for i in range(0, len(self.current_rows), new_batch_size): - rows_batch = self.current_rows[i, i + new_batch_size] - self._write_array_on_file(pa.array(rows_batch, type=inferred_type)) - else: - # All good - self._write_array_on_file(pa_array) - self.current_rows = [] + arrays.append(pa_array) + inferred_types.append(inferred_type) + schema = pa.schema(zip(cols, inferred_types)) if self.pa_writer is None else self._schema + table = pa.Table.from_arrays(arrays, schema=schema) + self.write_table(table) + self.current_rows = [] def write(self, example: Dict[str, Any], writer_batch_size: Optional[int] = None): """ Add a given Example to the write-pool which is written to file. @@ -170,7 +235,6 @@ def write(self, example: Dict[str, Any], writer_batch_size: Optional[int] = None example: the Example to add. """ self.current_rows.append(example) - self._num_examples += 1 if writer_batch_size is None: writer_batch_size = self.writer_batch_size if writer_batch_size is not None and len(self.current_rows) >= writer_batch_size: @@ -184,16 +248,16 @@ def write_batch( Args: example: the Example to add. 
""" - if self.pa_writer is None: - self._build_writer(inferred_schema=pa.Table.from_pydict(batch_examples).schema) - pa_table: pa.Table = pa.Table.from_pydict(batch_examples, schema=self._schema) - if writer_batch_size is None: - writer_batch_size = self.writer_batch_size - batches: List[pa.RecordBatch] = pa_table.to_batches(max_chunksize=writer_batch_size) - self._num_bytes += sum(batch.nbytes for batch in batches) - self._num_examples += pa_table.num_rows - for batch in batches: - self.pa_writer.write_batch(batch) + schema = None if self.pa_writer is None and self.update_features else self._schema + try_schema = self._schema if self.pa_writer is None and self.update_features else None + typed_sequence_examples = {} + for col in sorted(batch_examples.keys()): + col_type = schema.field(col).type if schema is not None else None + col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None + typed_sequence = TypedSequence(batch_examples[col], type=col_type, try_type=col_try_type) + typed_sequence_examples[col] = typed_sequence + pa_table = pa.Table.from_pydict(typed_sequence_examples) + self.write_table(pa_table) def write_table(self, pa_table: pa.Table, writer_batch_size: Optional[int] = None): """ Write a batch of Example to file. @@ -205,6 +269,7 @@ def write_table(self, pa_table: pa.Table, writer_batch_size: Optional[int] = Non writer_batch_size = self.writer_batch_size if self.pa_writer is None: self._build_writer(inferred_schema=pa_table.schema) + pa_table = pa_table.cast(self._schema) batches: List[pa.RecordBatch] = pa_table.to_batches(max_chunksize=writer_batch_size) self._num_bytes += sum(batch.nbytes for batch in batches) self._num_examples += pa_table.num_rows @@ -233,23 +298,23 @@ class BeamWriter(object): def __init__( self, - data_type: Optional[pa.DataType] = None, + features: Optional[Features] = None, schema: Optional[pa.Schema] = None, path: Optional[str] = None, namespace: Optional[str] = None, cache_dir: Optional[str] = None, ): - if data_type is None and schema is None: - raise ValueError("At least one of data_type and schema must be provided.") + if features is None and schema is None: + raise ValueError("At least one of features and schema must be provided.") if path is None: raise ValueError("Path must be provided.") - if data_type is not None: - self._type: pa.DataType = data_type - self._schema: pa.Schema = pa.schema(field for field in self._type) + if features is not None: + self._features: Features = features + self._schema: pa.Schema = pa.schema(features.type) else: self._schema: pa.Schema = schema - self._type: pa.DataType = pa.struct(field for field in self._schema) + self._features: Features = Features.from_arrow_schema(schema) self._path = path self._parquet_path = os.path.splitext(path)[0] # remove extension diff --git a/src/nlp/builder.py b/src/nlp/builder.py index f55de66373a..4e99747122e 100644 --- a/src/nlp/builder.py +++ b/src/nlp/builder.py @@ -804,8 +804,7 @@ def _prepare_split(self, split_generator): fname = "{}-{}.arrow".format(self.name, split_generator.name) fpath = os.path.join(self._cache_dir, fname) - examples_type = self.info.features.type - writer = ArrowWriter(data_type=examples_type, path=fpath, writer_batch_size=self._writer_batch_size) + writer = ArrowWriter(features=self.info.features, path=fpath, writer_batch_size=self._writer_batch_size) generator = self._generate_examples(**split_generator.gen_kwargs) for key, record in utils.tqdm(generator, unit=" examples", total=split_info.num_examples, 
leave=False): @@ -1001,8 +1000,9 @@ def _prepare_split(self, split_generator, pipeline): # To write examples to disk: fname = "{}-{}.arrow".format(self.name, split_name) fpath = os.path.join(self._cache_dir, fname) - examples_type = self.info.features.type - beam_writer = BeamWriter(examples_type, path=fpath, namespace=split_name, cache_dir=self._cache_dir) + beam_writer = BeamWriter( + features=self.info.features, path=fpath, namespace=split_name, cache_dir=self._cache_dir + ) self._beam_writers[split_name] = beam_writer encode_example = self.info.features.encode_example diff --git a/src/nlp/dataset_dict.py b/src/nlp/dataset_dict.py index 459a4454d4e..0b8a2aaa3e1 100644 --- a/src/nlp/dataset_dict.py +++ b/src/nlp/dataset_dict.py @@ -145,7 +145,7 @@ def map( cache_file_names: Optional[Dict[str, str]] = None, writer_batch_size: Optional[int] = 1000, features: Optional[Features] = None, - disable_nullable: bool = True, + disable_nullable: bool = False, verbose: bool = True, fn_kwargs: Optional[dict] = None, ) -> "DatasetDict": @@ -178,7 +178,7 @@ def map( Higher value gives smaller cache files, lower value consume less temporary memory while running `.map()`. `features` (`Optional[nlp.Features]`, default: `None`): Use a specific Features to store the cache file instead of the automatically generated one. - `disable_nullable` (`bool`, default: `True`): Allow null values in the table. + `disable_nullable` (`bool`, default: `False`): Disallow null values in the table. `verbose` (`bool`, default: `True`): Set to `False` to deactivate the tqdm progress bar and informations. `fn_kwargs` (`Optional[Dict]`, default: `None`): Keyword arguments to be passed to `function` """ diff --git a/src/nlp/features.py b/src/nlp/features.py index 497a740e73c..47b2f14732f 100644 --- a/src/nlp/features.py +++ b/src/nlp/features.py @@ -17,12 +17,14 @@ """ This class handle features definition in datasets and some utilities to display table type.""" import logging from collections.abc import Iterable -from dataclasses import dataclass, field -from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union +from dataclasses import dataclass, field, fields +from typing import Any, ClassVar, Dict, List, Optional, Sequence, Tuple, Union import numpy as np import pandas as pd import pyarrow as pa +from pandas.api.extensions import ExtensionArray as PandasExtensionArray +from pandas.api.extensions import ExtensionDtype as PandasExtensionDtype from . import utils from .utils.file_utils import _tf_available, _torch_available @@ -156,29 +158,236 @@ def encode_example(self, value): return value +class _ArrayXD: + def __post_init__(self): + self.shape = tuple(self.shape) + + def __call__(self): + pa_type = globals()[self.__class__.__name__ + "ExtensionType"](self.shape, self.dtype) + return pa_type + + def encode_example(self, value): + if isinstance(value, np.ndarray): + value = value.tolist() + return value + + @dataclass -class Tensor: - """ Construct a 0D or 1D Tensor feature. - If 0D, the Tensor is an dtype element, if 1D it will be a fixed length list or dtype elements. - Mostly here for compatiblity with tfds. 
- """ +class Array2D(_ArrayXD): + shape: tuple + dtype: str + id: Optional[str] = None + # Automatically constructed + _type: str = field(default="Array2D", init=False, repr=False) + - shape: Union[Tuple[int], List[int]] +@dataclass +class Array3D(_ArrayXD): + shape: tuple dtype: str id: Optional[str] = None # Automatically constructed - pa_type: ClassVar[Any] = None - _type: str = field(default="Tensor", init=False, repr=False) + _type: str = field(default="Array3D", init=False, repr=False) - def __post_init__(self): - assert len(self.shape) < 2, "Tensor can only take 0 or 1 dimensional shapes ." - if len(self.shape) == 1: - self.pa_type = pa.list_(string_to_arrow(self.dtype), self.shape[0]) + +@dataclass +class Array4D(_ArrayXD): + shape: tuple + dtype: str + id: Optional[str] = None + # Automatically constructed + _type: str = field(default="Array4D", init=False, repr=False) + + +@dataclass +class Array5D(_ArrayXD): + shape: tuple + dtype: str + id: Optional[str] = None + # Automatically constructed + _type: str = field(default="Array5D", init=False, repr=False) + + +class _ArrayXDExtensionType(pa.PyExtensionType): + + ndims: int = None + + def __init__(self, shape: tuple, dtype: str): + assert ( + self.ndims is not None and self.ndims > 1 + ), "You must instantiate an array type with a value for dim that is > 1" + assert len(shape) == self.ndims, "shape={} and ndims={} dom't match".format(shape, self.ndims) + self.shape = tuple(shape) + self.value_type = dtype + self.storage_dtype = self._generate_dtype(self.value_type) + pa.PyExtensionType.__init__(self, self.storage_dtype) + + def __reduce__(self): + return self.__class__, (self.shape, self.value_type,) + + def __arrow_ext_class__(self): + return ArrayExtensionArray + + def _generate_dtype(self, dtype): + dtype = string_to_arrow(dtype) + for d in reversed(self.shape): + dtype = pa.list_(dtype, d) + return dtype + + def to_pandas_dtype(self): + return PandasArrayExtensionDtype(self.value_type) + + +class Array2DExtensionType(_ArrayXDExtensionType): + ndims = 2 + + +class Array3DExtensionType(_ArrayXDExtensionType): + ndims = 3 + + +class Array4DExtensionType(_ArrayXDExtensionType): + ndims = 4 + + +class Array5DExtensionType(_ArrayXDExtensionType): + ndims = 5 + + +class ArrayExtensionArray(pa.ExtensionArray): + def __array__(self): + return self.to_numpy() + + def __getitem__(self, i): + # ExtensionScalar is the python list object + if isinstance(i, int): + return super().__getitem__(slice(i, i + 1)).to_pylist()[0] + return super().__getitem__(i) + + def to_numpy(self): + storage: pa.FixedSizeListArray = self.storage + size = 1 + for i in range(self.type.ndims): + size *= self.type.shape[i] + storage = storage.flatten() + numpy_arr = storage[self.offset * size : (self.offset + len(self)) * size].to_numpy() + numpy_arr = numpy_arr.reshape(len(self), *self.type.shape) + return numpy_arr + + def to_pylist(self): + return self.to_numpy().tolist() + + +class PandasArrayExtensionDtype(PandasExtensionDtype): + _metadata = "value_type" + + def __init__(self, value_type: Union["PandasArrayExtensionDtype", np.dtype]): + self._value_type = value_type + + def __from_arrow__(self, array): + if isinstance(array, pa.ChunkedArray): + numpy_arr = np.vstack([chunk.to_numpy() for chunk in array.chunks]) else: - self.pa_type = string_to_arrow(self.dtype) + numpy_arr = array.to_numpy() + return PandasArrayExtensionArray(numpy_arr) - def __call__(self): - return self.pa_type + @classmethod + def construct_array_type(cls): + return 
PandasArrayExtensionArray + + @property + def type(self) -> type: + return np.ndarray + + @property + def kind(self) -> str: + return "O" + + @property + def name(self) -> str: + return f"array[{self.value_type}]" + + @property + def value_type(self) -> np.dtype: + return self._value_type + + +class PandasArrayExtensionArray(PandasExtensionArray): + def __init__(self, data: np.ndarray, copy: bool = False): + self._data = data if not copy else np.array(data) + self._dtype = PandasArrayExtensionDtype(data.dtype) + + def copy(self, deep: bool = False) -> "PandasArrayExtensionArray": + return PandasArrayExtensionArray(self._data, copy=True) + + @classmethod + def _from_sequence( + cls, scalars, dtype: Optional[PandasArrayExtensionDtype] = None, copy: bool = False + ) -> "PandasArrayExtensionArray": + data = np.array(scalars, dtype=dtype if dtype is None else dtype.value_type, copy=copy) + return PandasArrayExtensionArray(data, dtype=dtype, copy=copy) + + @classmethod + def _concat_same_type(cls, to_concat: Sequence["PandasArrayExtensionArray"]) -> "PandasArrayExtensionArray": + data = np.vstack([va._data for va in to_concat]) + return cls(data, copy=False) + + @property + def dtype(self) -> PandasArrayExtensionDtype: + return self._dtype + + @property + def nbytes(self) -> int: + return self._data.nbytes + + def isna(self) -> np.ndarray: + if np.issubdtype(self.dtype.value_type, np.floating): + return np.array(np.isnan(arr).any() for arr in self._data) + return np.array((arr < 0).any() for arr in self._data) + + def __setitem__(self, key: Union[int, slice, np.ndarray], value: Any) -> None: + raise NotImplementedError() + + def __getitem__(self, item: Union[int, slice, np.ndarray]) -> Union[np.ndarray, "PandasArrayExtensionArray"]: + if isinstance(item, int): + return self._data[item] + return PandasArrayExtensionArray(self._data[item], copy=False) + + def take( + self, indices: Sequence[int], allow_fill: bool = False, fill_value: bool = None + ) -> "PandasArrayExtensionArray": + indices = np.asarray(indices, dtype="int") + if allow_fill: + fill_value = ( + self.dtype.na_value if fill_value is None else np.asarray(fill_value, dtype=self.dtype.value_type) + ) + mask = indices == -1 + if (indices < -1).any(): + raise ValueError("Invalid value in `indices`, must be all >= -1 for `allow_fill` is True") + elif len(self) > 0: + pass + elif not np.all(mask): + raise IndexError("Invalid take for empty PandasArrayExtensionArray, must be all -1.") + else: + data = np.array([fill_value] * len(indices), dtype=self.dtype.value_type) + return PandasArrayExtensionArray(data, copy=False) + took = self._data.take(indices, axis=0) + if allow_fill and mask.any(): + took[mask] = [fill_value] * np.sum(mask) + return PandasArrayExtensionArray(took, copy=False) + + def __len__(self) -> int: + return len(self._data) + + def __eq__(self, other) -> np.ndarray: + if not isinstance(other, PandasArrayExtensionArray): + raise NotImplementedError("Invalid type to compare to: {}".format(type(other))) + return (self._data == other._data).all() + + +def pandas_types_mapper(dtype): + if isinstance(dtype, _ArrayXDExtensionType): + return PandasArrayExtensionDtype(dtype.value_type) @dataclass @@ -452,7 +661,20 @@ class Sequence: _type: str = field(default="Sequence", init=False, repr=False) -FeatureType = Union[dict, list, tuple, Value, Tensor, ClassLabel, Translation, TranslationVariableLanguages, Sequence] +FeatureType = Union[ + dict, + list, + tuple, + Value, + ClassLabel, + Translation, + TranslationVariableLanguages, + 
Sequence, + Array2D, + Array3D, + Array4D, + Array5D, +] def get_nested_type(schema: FeatureType) -> pa.DataType: @@ -464,16 +686,16 @@ def get_nested_type(schema: FeatureType) -> pa.DataType: ) # sort to make the type deterministic elif isinstance(schema, (list, tuple)): assert len(schema) == 1, "We defining list feature, you should just provide one example of the inner type" - inner_type = get_nested_type(schema[0]) - return pa.list_(inner_type) + value_type = get_nested_type(schema[0]) + return pa.list_(value_type) elif isinstance(schema, Sequence): - inner_type = get_nested_type(schema.feature) + value_type = get_nested_type(schema.feature) # We allow to reverse list of dict => dict of list for compatiblity with tfds - if isinstance(inner_type, pa.StructType): - return pa.struct(dict(sorted((f.name, pa.list_(f.type, schema.length)) for f in inner_type))) - return pa.list_(inner_type, schema.length) + if isinstance(value_type, pa.StructType): + return pa.struct(dict(sorted((f.name, pa.list_(f.type, schema.length)) for f in value_type))) + return pa.list_(value_type, schema.length) - # Other objects are callable which returns their data type (ClassLabel, Tensor, Translation, Arrow datatype creation methods) + # Other objects are callable which returns their data type (ClassLabel, Array2D, Translation, Arrow datatype creation methods) return schema() @@ -508,12 +730,10 @@ def encode_nested_example(schema, obj): if isinstance(obj, str): # don't interpret a string as a list raise ValueError("Got a string but expected a list instead: '{}'".format(obj)) return [encode_nested_example(schema.feature, o) for o in obj] - # Object with special encoding: # ClassLabel will convert from string to int, TranslationVariableLanguages does some checks - elif isinstance(schema, (ClassLabel, TranslationVariableLanguages, Value)): + elif isinstance(schema, (ClassLabel, TranslationVariableLanguages, Value, _ArrayXD)): return schema.encode_example(obj) - # Other object should be directly convertible to a native Arrow type (like Translation and Translation) return obj @@ -532,7 +752,9 @@ def generate_from_dict(obj: Any): if class_type == Sequence: return Sequence(feature=generate_from_dict(obj["feature"]), length=obj["length"]) - return class_type(**obj) + + field_names = set(f.name for f in fields(class_type)) + return class_type(**{k: v for k, v in obj.items() if k in field_names}) def generate_from_arrow_type(pa_type: pa.DataType): @@ -545,6 +767,9 @@ def generate_from_arrow_type(pa_type: pa.DataType): if isinstance(feature, (dict, tuple, list)): return [feature] return Sequence(feature=feature) + elif isinstance(pa_type, _ArrayXDExtensionType): + array_feature = [None, None, Array2D, Array3D, Array4D, Array5D][pa_type.ndims] + return array_feature(shape=pa_type.shape, dtype=pa_type.value_type) elif isinstance(pa_type, pa.DictionaryType): raise NotImplementedError # TODO(thom) this will need access to the dictionary as well (for labels). I.e. 
to the py_table elif isinstance(pa_type, pa.DataType): @@ -577,5 +802,9 @@ def encode_batch(self, batch): if set(batch) != set(self): raise ValueError("Column mismatch between batch {} and features {}".format(set(batch), set(self))) for key, column in batch.items(): - encoded_batch[key] = [encode_nested_example(self[key], cast_to_python_objects(obj)) for obj in column] + column = cast_to_python_objects(column) + encoded_batch[key] = [encode_nested_example(self[key], obj) for obj in column] return encoded_batch + + def copy(self): + return Features(super().copy()) diff --git a/tests/test_array_xd.py b/tests/test_array_xd.py new file mode 100644 index 00000000000..7a9c7b67101 --- /dev/null +++ b/tests/test_array_xd.py @@ -0,0 +1,367 @@ +import os +import tempfile +import timeit +import unittest +from warnings import warn + +import numpy as np +import pandas as pd +from absl.testing import parameterized + +import nlp +from nlp.arrow_writer import ArrowWriter +from nlp.features import Array2D, Array3D, Array4D, Array5D, Value, _ArrayXD + + +SHAPE_TEST_1 = (30, 487) +SHAPE_TEST_2 = (36, 1024) +SPEED_TEST_SHAPE = (100, 100) +SPEED_TEST_N_EXAMPLES = 100 + +DEFAULT_FEATURES = nlp.Features( + {"text": Array2D(SHAPE_TEST_1, dtype="float32"), "image": Array2D(SHAPE_TEST_2, dtype="float32")} +) + + +def get_duration(func): + def wrapper(*args, **kwargs): + starttime = timeit.default_timer() + _ = func(*args, **kwargs) + delta = timeit.default_timer() - starttime + return delta + + wrapper.__name__ = func.__name__ + + return wrapper + + +def generate_examples(features: dict, num_examples=100): + dummy_data = [] + for i in range(num_examples): + example = {} + for col_id, (k, v) in enumerate(features.items()): + if isinstance(v, _ArrayXD): + data = np.random.rand(*v.shape).astype(v.dtype) + elif isinstance(v, nlp.Value): + data = "foo" + elif isinstance(v, nlp.Sequence): + shape = [] + while isinstance(v, nlp.Sequence): + shape.append(v.length) + v = v.feature + data = np.random.rand(*shape).astype(v.dtype) + example[k] = data + dummy_data.append((i, example)) + + return dummy_data + + +@get_duration +def write_array2d(feats, dummy_data, tmp_dir): + my_features = nlp.Features(feats) + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in dummy_data: + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + + +@get_duration +def write_nested_sequence(feats, dummy_data, tmp_dir): + my_features = nlp.Features(feats) + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in dummy_data: + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + + +@get_duration +def write_flattened_sequence(feats, dummy_data, tmp_dir): + my_features = nlp.Features(feats) + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in dummy_data: + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + + +@get_duration +def read_unformated(feats, tmp_dir): + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + for _ in dataset: + pass + + +@get_duration +def read_formatted_as_numpy(feats, tmp_dir): + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + 
dataset.set_format("numpy") + for _ in dataset: + pass + + +@get_duration +def read_batch_unformated(feats, tmp_dir): + batch_size = 10 + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + for i in range(0, len(dataset), batch_size): + _ = dataset[i : i + batch_size] + + +@get_duration +def read_batch_formatted_as_numpy(feats, tmp_dir): + batch_size = 10 + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + dataset.set_format("numpy") + for i in range(0, len(dataset), batch_size): + _ = dataset[i : i + batch_size] + + +@get_duration +def read_col_unformated(feats, tmp_dir): + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + for col in feats: + _ = dataset[col] + + +@get_duration +def read_col_formatted_as_numpy(feats, tmp_dir): + dataset = nlp.Dataset.from_file(filename=os.path.join(tmp_dir, "beta.arrow"), info=nlp.DatasetInfo(features=feats)) + dataset.set_format("numpy") + for col in feats: + _ = dataset[col] + + +class ExtensionTypeCompatibilityTest(unittest.TestCase): + def test_array2d_nonspecific_shape(self): + with tempfile.TemporaryDirectory() as tmp_dir: + my_features = DEFAULT_FEATURES.copy() + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in generate_examples(features=my_features, num_examples=1,): + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow")) + dataset.set_format("numpy") + row = dataset[0] + first_shape = row["image"].shape + second_shape = row["text"].shape + self.assertTrue(first_shape is not None and second_shape is not None, "need atleast 2 different shapes") + self.assertEqual(len(first_shape), len(second_shape), "both shapes are supposed to be equal length") + self.assertNotEqual(first_shape, second_shape, "shapes must not be the same") + + def test_multiple_extensions_same_row(self): + with tempfile.TemporaryDirectory() as tmp_dir: + my_features = DEFAULT_FEATURES.copy() + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in generate_examples(features=my_features, num_examples=1): + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow")) + dataset.set_format("numpy") + row = dataset[0] + first_len = len(row["image"].shape) + second_len = len(row["text"].shape) + self.assertEqual(first_len, 2, "use a sequence type if dim is < 2") + self.assertEqual(second_len, 2, "use a sequence type if dim is < 2") + + def test_compatability_with_string_values(self): + with tempfile.TemporaryDirectory() as tmp_dir: + my_features = DEFAULT_FEATURES.copy() + my_features["image_id"] = nlp.Value("string") + writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow")) + for key, record in generate_examples(features=my_features, num_examples=1): + example = my_features.encode_example(record) + writer.write(example) + num_examples, num_bytes = writer.finalize() + dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow")) + self.assertTrue(isinstance(dataset[0]["image_id"], str), "image id must be of type string") + + def test_extension_indexing(self): + with tempfile.TemporaryDirectory() 
as tmp_dir:
+            my_features = DEFAULT_FEATURES.copy()
+            my_features["explicit_ext"] = Array2D((3, 3), dtype="float32")
+            writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow"))
+            for key, record in generate_examples(features=my_features, num_examples=1):
+                example = my_features.encode_example(record)
+                writer.write(example)
+            num_examples, num_bytes = writer.finalize()
+            dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow"))
+            dataset.set_format("numpy")
+            data = dataset[0]["explicit_ext"]
+            self.assertIsInstance(data, np.ndarray, "indexed extension must return numpy.ndarray")
+
+
+class SpeedBenchmarkTest(unittest.TestCase):
+    def test_benchmark_speed(self):
+        times = {}
+        read_functions = (
+            read_unformated,
+            read_formatted_as_numpy,
+            read_batch_unformated,
+            read_batch_formatted_as_numpy,
+            read_col_unformated,
+            read_col_formatted_as_numpy,
+        )
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            feats = nlp.Features({"image": Array2D(SPEED_TEST_SHAPE, dtype="float32")})
+            data = generate_examples(features=feats, num_examples=SPEED_TEST_N_EXAMPLES)
+            write_func = write_array2d
+            times[write_func.__name__] = write_func(feats, data, tmp_dir)
+            for read_func in read_functions:
+                times[read_func.__name__ + " after " + write_func.__name__] = read_func(feats, tmp_dir)
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            feats = nlp.Features(
+                {"image": nlp.Sequence(nlp.Sequence(nlp.Value("float32"), SPEED_TEST_SHAPE[1]), SPEED_TEST_SHAPE[0])}
+            )
+            data = generate_examples(features=feats, num_examples=SPEED_TEST_N_EXAMPLES)
+            write_func = write_nested_sequence
+            times[write_func.__name__] = write_func(feats, data, tmp_dir)
+            for read_func in read_functions:
+                times[read_func.__name__ + " after " + write_func.__name__] = read_func(feats, tmp_dir)
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            feats = nlp.Features(
+                {"image": nlp.Sequence(nlp.Value("float32"), SPEED_TEST_SHAPE[0] * SPEED_TEST_SHAPE[1])}
+            )
+            data = generate_examples(features=feats, num_examples=SPEED_TEST_N_EXAMPLES)
+            write_func = write_flattened_sequence
+            times[write_func.__name__] = write_func(feats, data, tmp_dir)
+            for read_func in read_functions:
+                times[read_func.__name__ + " after " + write_func.__name__] = read_func(feats, tmp_dir)
+
+        benchmark_df = pd.DataFrame.from_dict(times, orient="index", columns=["time"]).sort_index()
+        warn("Speed benchmark:\n" + str(benchmark_df))
+        self.assertGreater(
+            times["write_nested_sequence"], times["write_array2d"] * 10
+        )  # At least 10 times faster (it is supposed to be ~25 times faster)
+        self.assertGreater(
+            times["read_batch_formatted_as_numpy after write_nested_sequence"],
+            times["read_batch_formatted_as_numpy after write_array2d"],
+        )  # At least faster (it is supposed to be ~2 times faster)
+        self.assertGreater(
+            times["read_batch_unformated after write_nested_sequence"],
+            times["read_batch_formatted_as_numpy after write_array2d"] * 5,
+        )  # At least 5 times faster (it is supposed to be ~10 times faster)
+
+
+def get_array_feature_types():
+    shape_1 = [3] * 5
+    shape_2 = [3, 4, 5, 6, 7]
+    return [
+        {
+            "testcase_name": "{}d".format(d),
+            "array_feature": array_feature,
+            "shape_1": tuple(shape_1[:d]),
+            "shape_2": tuple(shape_2[:d]),
+        }
+        for d, array_feature in zip(range(2, 6), [Array2D, Array3D, Array4D, Array5D])
+    ]
+
+
+@parameterized.named_parameters(get_array_feature_types())
+class ArrayXDTest(unittest.TestCase):
+    def get_features(self, array_feature, shape_1, shape_2):
+        return nlp.Features(
+            {
+                "image": array_feature(shape_1, dtype="float32"),
+                "source": Value("string"),
+                "matrix": array_feature(shape_2, dtype="float32"),
+            }
+        )
+
+    def get_dict_example_0(self, shape_1, shape_2):
+        return {
+            "image": np.random.rand(*shape_1).astype("float32"),
+            "source": "foo",
+            "matrix": np.random.rand(*shape_2).astype("float32"),
+        }
+
+    def get_dict_example_1(self, shape_1, shape_2):
+        return {
+            "image": np.random.rand(*shape_1).astype("float32"),
+            "matrix": np.random.rand(*shape_2).astype("float32"),
+            "source": "bar",
+        }
+
+    def get_dict_examples(self, shape_1, shape_2):
+        return {
+            "image": np.random.rand(2, *shape_1).astype("float32").tolist(),
+            "source": ["foo", "bar"],
+            "matrix": np.random.rand(2, *shape_2).astype("float32").tolist(),
+        }
+
+    def _check_getitem_output_type(self, dataset, shape_1, shape_2, first_matrix):
+        matrix_column = dataset["matrix"]
+        self.assertIsInstance(matrix_column, list)
+        self.assertIsInstance(matrix_column[0], list)
+        self.assertIsInstance(matrix_column[0][0], list)
+        self.assertEqual(np.array(matrix_column).shape, (2, *shape_2))
+
+        matrix_field_of_first_example = dataset[0]["matrix"]
+        self.assertIsInstance(matrix_field_of_first_example, list)
+        self.assertIsInstance(matrix_field_of_first_example[0], list)
+        self.assertEqual(np.array(matrix_field_of_first_example).shape, shape_2)
+        np.testing.assert_array_equal(np.array(matrix_field_of_first_example), np.array(first_matrix))
+
+        matrix_field_of_first_two_examples = dataset[:2]["matrix"]
+        self.assertIsInstance(matrix_field_of_first_two_examples, list)
+        self.assertIsInstance(matrix_field_of_first_two_examples[0], list)
+        self.assertIsInstance(matrix_field_of_first_two_examples[0][0], list)
+        self.assertEqual(np.array(matrix_field_of_first_two_examples).shape, (2, *shape_2))
+
+        with dataset.formatted_as("numpy"):
+            self.assertEqual(dataset["matrix"].shape, (2, *shape_2))
+            self.assertEqual(dataset[0]["matrix"].shape, shape_2)
+            self.assertEqual(dataset[:2]["matrix"].shape, (2, *shape_2))
+
+        with dataset.formatted_as("pandas"):
+            self.assertIsInstance(dataset["matrix"], pd.Series)
+            self.assertIsInstance(dataset[0]["matrix"], pd.Series)
+            self.assertIsInstance(dataset[:2]["matrix"], pd.Series)
+            self.assertEqual(dataset["matrix"].to_numpy().shape, (2, *shape_2))
+            self.assertEqual(dataset[0]["matrix"].to_numpy().shape, (1, *shape_2))
+            self.assertEqual(dataset[:2]["matrix"].to_numpy().shape, (2, *shape_2))
+
+    def test_write(self, array_feature, shape_1, shape_2):
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+
+            my_features = self.get_features(array_feature, shape_1, shape_2)
+            writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow"))
+            my_examples = [
+                (0, self.get_dict_example_0(shape_1, shape_2)),
+                (1, self.get_dict_example_1(shape_1, shape_2)),
+            ]
+            for key, record in my_examples:
+                example = my_features.encode_example(record)
+                writer.write(example)
+            num_examples, num_bytes = writer.finalize()
+            dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow"))
+            self._check_getitem_output_type(dataset, shape_1, shape_2, my_examples[0][1]["matrix"])
+
+    def test_write_batch(self, array_feature, shape_1, shape_2):
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+
+            my_features = self.get_features(array_feature, shape_1, shape_2)
+            writer = ArrowWriter(features=my_features, path=os.path.join(tmp_dir, "beta.arrow"))
+
+            dict_examples = self.get_dict_examples(shape_1, shape_2)
+            dict_examples = my_features.encode_batch(dict_examples)
+            writer.write_batch(dict_examples)
+            num_examples, num_bytes = writer.finalize()
+            dataset = nlp.Dataset.from_file(os.path.join(tmp_dir, "beta.arrow"))
+            self._check_getitem_output_type(dataset, shape_1, shape_2, dict_examples["matrix"][0])
+
+    def test_from_dict(self, array_feature, shape_1, shape_2):
+        dict_examples = self.get_dict_examples(shape_1, shape_2)
+        dataset = nlp.Dataset.from_dict(dict_examples, features=self.get_features(array_feature, shape_1, shape_2))
+        self._check_getitem_output_type(dataset, shape_1, shape_2, dict_examples["matrix"][0])
+
+
+if __name__ == "__main__":  # useful to run the profiler
+    SpeedBenchmarkTest().test_benchmark_speed()
diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py
index f24ddaaf37a..f0bb2466e8b 100644
--- a/tests/test_arrow_dataset.py
+++ b/tests/test_arrow_dataset.py
@@ -902,3 +902,12 @@ def test_format_nested(self):
         self.assertIsNotNone(dset[:2])
         self.assertIsInstance(dset[:2]["nested"][0]["foo"], torch.Tensor)
         self.assertIsInstance(dset["nested"][0]["foo"], torch.Tensor)
+
+    def test_format_pandas(self):
+        dset = self._create_dummy_dataset(multiple_columns=True)
+        import pandas as pd
+
+        dset.set_format("pandas")
+        self.assertIsInstance(dset[0], pd.DataFrame)
+        self.assertIsInstance(dset[:2], pd.DataFrame)
+        self.assertIsInstance(dset["col_1"], pd.Series)
diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py
new file mode 100644
index 00000000000..71381c68f8b
--- /dev/null
+++ b/tests/test_arrow_writer.py
@@ -0,0 +1,124 @@
+import os
+import tempfile
+from unittest import TestCase
+
+import pyarrow as pa
+
+from nlp.arrow_writer import ArrowWriter, TypedSequence
+from nlp.features import Array2DExtensionType
+
+
+class TypedSequenceTest(TestCase):
+    def test_no_type(self):
+        arr = pa.array(TypedSequence([1, 2, 3]))
+        self.assertEqual(arr.type, pa.int64())
+
+    def test_array_type_forbidden(self):
+        with self.assertRaises(AssertionError):
+            _ = pa.array(TypedSequence([1, 2, 3]), type=pa.int64())
+
+    def test_try_type_and_type_forbidden(self):
+        with self.assertRaises(AssertionError):
+            _ = pa.array(TypedSequence([1, 2, 3], try_type=pa.bool_(), type=pa.int64()))
+
+    def test_compatible_type(self):
+        arr = pa.array(TypedSequence([1, 2, 3], type=pa.int32()))
+        self.assertEqual(arr.type, pa.int32())
+
+    def test_incompatible_type(self):
+        with self.assertRaises((TypeError, pa.lib.ArrowInvalid)):
+            _ = pa.array(TypedSequence(["foo", "bar"], type=pa.int64()))
+
+    def test_try_compatible_type(self):
+        arr = pa.array(TypedSequence([1, 2, 3], try_type=pa.int32()))
+        self.assertEqual(arr.type, pa.int32())
+
+    def test_try_incompatible_type(self):
+        arr = pa.array(TypedSequence(["foo", "bar"], try_type=pa.int64()))
+        self.assertEqual(arr.type, pa.string())
+
+    def test_compatible_extension_type(self):
+        arr = pa.array(TypedSequence([[[1, 2, 3]]], type=Array2DExtensionType((1, 3), "int64")))
+        self.assertEqual(arr.type, Array2DExtensionType((1, 3), "int64"))
+
+    def test_incompatible_extension_type(self):
+        with self.assertRaises((TypeError, pa.lib.ArrowInvalid)):
+            _ = pa.array(TypedSequence(["foo", "bar"], type=Array2DExtensionType((1, 3), "int64")))
+
+    def test_try_compatible_extension_type(self):
+        arr = pa.array(TypedSequence([[[1, 2, 3]]], try_type=Array2DExtensionType((1, 3), "int64")))
+        self.assertEqual(arr.type, Array2DExtensionType((1, 3), "int64"))
+
+    def test_try_incompatible_extension_type(self):
+        arr = pa.array(TypedSequence(["foo", "bar"], try_type=Array2DExtensionType((1, 3), "int64")))
+        self.assertEqual(arr.type, pa.string())
+
+    def test_catch_overflow(self):
+        with self.assertRaises(OverflowError):
+            _ = pa.array(TypedSequence([["x" * 1024]] * ((2 << 20) + 1)))  # ListArray with a bit more than 2GB
+
+
+class ArrowWriterTest(TestCase):
+    def _check_output(self, output):
+        mmap = pa.BufferReader(output) if isinstance(output, pa.Buffer) else pa.memory_map(output)
+        f = pa.ipc.open_stream(mmap)
+        pa_table: pa.Table = f.read_all()
+        self.assertDictEqual(pa_table.to_pydict(), {"col_1": ["foo", "bar"], "col_2": [1, 2]})
+
+    def test_write_no_schema(self):
+        output = pa.BufferOutputStream()
+        writer = ArrowWriter(stream=output)
+        writer.write({"col_1": "foo", "col_2": 1})
+        writer.write({"col_1": "bar", "col_2": 2})
+        num_examples, num_bytes = writer.finalize()
+        self.assertEqual(num_examples, 2)
+        self.assertGreater(num_bytes, 0)
+        fields = {"col_1": pa.string(), "col_2": pa.int64()}
+        self.assertEqual(writer._schema, pa.schema(fields))
+        self._check_output(output.getvalue())
+
+    def test_write_schema(self):
+        fields = {"col_1": pa.string(), "col_2": pa.int64()}
+        output = pa.BufferOutputStream()
+        writer = ArrowWriter(stream=output, schema=pa.schema(fields))
+        writer.write({"col_1": "foo", "col_2": 1})
+        writer.write({"col_1": "bar", "col_2": 2})
+        num_examples, num_bytes = writer.finalize()
+        self.assertEqual(num_examples, 2)
+        self.assertGreater(num_bytes, 0)
+        self.assertEqual(writer._schema, pa.schema(fields))
+        self._check_output(output.getvalue())
+
+    def test_write_batch_no_schema(self):
+        output = pa.BufferOutputStream()
+        writer = ArrowWriter(stream=output)
+        writer.write_batch({"col_1": ["foo", "bar"], "col_2": [1, 2]})
+        num_examples, num_bytes = writer.finalize()
+        self.assertEqual(num_examples, 2)
+        self.assertGreater(num_bytes, 0)
+        fields = {"col_1": pa.string(), "col_2": pa.int64()}
+        self.assertEqual(writer._schema, pa.schema(fields))
+        self._check_output(output.getvalue())
+
+    def test_write_batch_schema(self):
+        fields = {"col_1": pa.string(), "col_2": pa.int64()}
+        output = pa.BufferOutputStream()
+        writer = ArrowWriter(stream=output, schema=pa.schema(fields))
+        writer.write_batch({"col_1": ["foo", "bar"], "col_2": [1, 2]})
+        num_examples, num_bytes = writer.finalize()
+        self.assertEqual(num_examples, 2)
+        self.assertGreater(num_bytes, 0)
+        self.assertEqual(writer._schema, pa.schema(fields))
+        self._check_output(output.getvalue())
+
+    def test_write_file(self):
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            fields = {"col_1": pa.string(), "col_2": pa.int64()}
+            output = os.path.join(tmp_dir, "test.arrow")
+            writer = ArrowWriter(path=output, schema=pa.schema(fields))
+            writer.write_batch({"col_1": ["foo", "bar"], "col_2": [1, 2]})
+            num_examples, num_bytes = writer.finalize()
+            self.assertEqual(num_examples, 2)
+            self.assertGreater(num_bytes, 0)
+            self.assertEqual(writer._schema, pa.schema(fields))
+            self._check_output(output)
diff --git a/tests/test_dataset_dict.py b/tests/test_dataset_dict.py
index 2e640b84442..8cbbcccfc1d 100644
--- a/tests/test_dataset_dict.py
+++ b/tests/test_dataset_dict.py
@@ -166,7 +166,7 @@ def test_map(self):
             lambda ex: {"bar": ["foo"] * len(ex["filename"])}, batched=True, cache_file_names=cache_file_names
         )
         self.assertListEqual(list(dsets.keys()), list(mapped_dsets_2.keys()))
-        self.assertListEqual(mapped_dsets_2["train"].column_names, ["filename", "foo", "bar"])
+        self.assertListEqual(sorted(mapped_dsets_2["train"].column_names), sorted(["filename", "foo", "bar"]))
 
     def test_filter(self):
         with tempfile.TemporaryDirectory() as tmp_dir:
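
Aside (not part of the patch): a minimal sketch of how the pieces exercised by these tests fit together, writing one Array2D example with ArrowWriter and reading it back through nlp.Dataset. It assumes Array2D is importable from nlp.features alongside Array2DExtensionType, and the file name example.arrow is illustrative only.

import os
import tempfile

import numpy as np

import nlp
from nlp.arrow_writer import ArrowWriter
from nlp.features import Array2D  # assumed import path, mirroring the test modules above

# A schema with one fixed-shape 2D float column and one string column.
features = nlp.Features({"matrix": Array2D((3, 3), dtype="float32"), "source": nlp.Value("string")})

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.arrow")  # illustrative file name
    writer = ArrowWriter(features=features, path=path)
    record = {"matrix": np.random.rand(3, 3).astype("float32"), "source": "foo"}
    writer.write(features.encode_example(record))  # encode, then append one example
    num_examples, num_bytes = writer.finalize()

    dataset = nlp.Dataset.from_file(path)
    dataset.set_format("numpy")  # extension columns now come back as np.ndarray
    assert dataset[0]["matrix"].shape == (3, 3)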