/
table.py
2126 lines (1767 loc) · 81.9 KB
/
table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import copy
import os
import tempfile
import warnings
from functools import partial
from itertools import groupby
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, TypeVar, Union
import numpy as np
import pyarrow as pa
from . import config
from .utils.logging import get_logger
if TYPE_CHECKING:
    # Only evaluated by static type checkers (TYPE_CHECKING is False at runtime),
    # so this import adds no runtime dependency on the features module.
    from .features.features import Features, FeatureType

# Module-level logger used by the table classes and helpers in this file.
logger = get_logger(__name__)
def inject_arrow_table_documentation(arrow_table_method):
    """
    Build a decorator that copies the documentation of a pyarrow Table method onto ``fn``.

    The decorated function's docstring becomes the pyarrow method's docstring followed by the
    function's own docstring (if any), with every "pyarrow.Table" replaced by "Table". If the
    pyarrow method has ``__annotations__``, they are copied onto the decorated function too.

    Args:
        arrow_table_method: the pyarrow callable whose documentation should be reused.

    Returns:
        A decorator that mutates and returns the very same function object.
    """

    def wrapper(fn):
        # Either docstring may be missing (None); treat a missing docstring as empty instead of
        # crashing with ``None + str``.
        base_doc = arrow_table_method.__doc__ or ""
        own_doc = fn.__doc__ or ""
        fn.__doc__ = (base_doc + own_doc).replace("pyarrow.Table", "Table")
        if hasattr(arrow_table_method, "__annotations__"):
            fn.__annotations__ = arrow_table_method.__annotations__
        return fn

    return wrapper
def _in_memory_arrow_table_from_file(filename: str) -> pa.Table:
    """
    Read a whole Arrow IPC *stream* file through a regular (non memory-mapped) input stream.

    Args:
        filename (:obj:`str`): path of the Arrow stream file to read.

    Returns:
        :obj:`pyarrow.Table`: the table made of every record batch in the stream.
    """
    # Close the file handle deterministically (also when reading fails) instead of relying on
    # garbage collection. Reads through a non memory-mapped file copy the data into Arrow
    # buffers, so the returned table does not depend on the handle staying open.
    with pa.input_stream(filename) as in_memory_stream:
        opened_stream = pa.ipc.open_stream(in_memory_stream)
        pa_table = opened_stream.read_all()
    return pa_table
def _in_memory_arrow_table_from_buffer(buffer: pa.Buffer) -> pa.Table:
    """
    Deserialize an Arrow IPC stream held in an in-memory buffer.

    Args:
        buffer (:obj:`pyarrow.Buffer`): bytes of a serialized Arrow IPC stream.

    Returns:
        :obj:`pyarrow.Table`: every record batch of the stream, as one table.
    """
    reader = pa.ipc.open_stream(pa.BufferReader(buffer))
    return reader.read_all()
def _memory_mapped_arrow_table_from_file(filename: str) -> pa.Table:
    """
    Read an Arrow IPC stream file through a memory map.

    Args:
        filename (:obj:`str`): path of the Arrow stream file to map.

    Returns:
        :obj:`pyarrow.Table`: every record batch of the stream, as one table.
    """
    # NOTE(review): the memory map is intentionally not closed here; presumably the returned
    # table's buffers point into the mapped region — confirm before adding a `with`.
    mapped_source = pa.memory_map(filename)
    return pa.ipc.open_stream(mapped_source).read_all()
def _write_table_to_file(table: pa.Table, filename: str) -> int:
    """
    Serialize ``table`` to ``filename`` in the Arrow IPC stream format, batch by batch.

    Args:
        table (:obj:`pyarrow.Table`): the table to write.
        filename (:obj:`str`): destination path; the file is created or truncated.

    Returns:
        :obj:`int`: the sum of ``nbytes`` over the written record batches.
    """
    with open(filename, "wb") as sink:
        stream_writer = pa.RecordBatchStreamWriter(sink=sink, schema=table.schema)
        record_batches: List[pa.RecordBatch] = table.to_batches()
        for record_batch in record_batches:
            stream_writer.write_batch(record_batch)
        stream_writer.close()
    written_nbytes = 0
    for record_batch in record_batches:
        written_nbytes += record_batch.nbytes
    return written_nbytes
def _deepcopy(x, memo: dict):
"""deepcopy a regular class instance"""
cls = x.__class__
result = cls.__new__(cls)
memo[id(x)] = result
for k, v in x.__dict__.items():
setattr(result, k, copy.deepcopy(v, memo))
return result
def _interpolation_search(arr: List[int], x: int) -> int:
"""
Return the position i of a sorted array so that arr[i] <= x < arr[i+1]
Args:
arr (:obj:`List[int]`): non-empty sorted list of integers
x (:obj:`int`): query
Returns:
`int`: the position i so that arr[i] <= x < arr[i+1]
Raises:
`IndexError`: if the array is empty or if the query is outside the array values
"""
i, j = 0, len(arr) - 1
while i < j and arr[i] <= x < arr[j]:
k = i + ((j - i) * (x - arr[i]) // (arr[j] - arr[i]))
if arr[k] <= x < arr[k + 1]:
return k
elif arr[k] < x:
i, j = k + 1, j
else:
i, j = i, k
raise IndexError(f"Invalid query '{x}' for size {arr[-1] if len(arr) else 'none'}.")
class IndexedTableMixin:
    """
    Index the non-empty record batches of a pyarrow Table for fast positional access.

    Attributes set by ``__init__``:
        _schema: schema of the indexed table, reused for every table built from the index.
        _batches: the table's record batches, with zero-length batches dropped.
        _offsets: int64 cumulative row counts; ``_offsets[k]`` is the global index of the first
            row of ``_batches[k]`` and ``_offsets[-1]`` is the total number of rows.
    """

    def __init__(self, table: pa.Table):
        self._schema = table.schema
        # `filter(len, ...)` keeps exactly the batches whose length is non-zero.
        self._batches = list(filter(len, table.to_batches()))
        batch_lengths = [len(batch) for batch in self._batches]
        self._offsets: np.ndarray = np.cumsum([0, *batch_lengths], dtype=np.int64)

    def fast_gather(self, indices: Union[List[int], np.ndarray]) -> pa.Table:
        """
        Create a pa.Table made of the records at the given indices, in the given order.

        Should be faster than
        ``pa.concat_tables(table.fast_slice(int(i) % table.num_rows, 1) for i in indices)``
        because NumPy performs all the batch lookups (binary searches) in one vectorized call.

        Raises:
            ValueError: if ``indices`` is empty.
        """
        if len(indices) == 0:
            raise ValueError("Indices must be non-empty")
        # side="right" then -1 selects the batch k such that _offsets[k] <= i < _offsets[k + 1].
        owning_batches = np.searchsorted(self._offsets, indices, side="right") - 1
        single_rows = []
        for batch_idx, row in zip(owning_batches, indices):
            local_row = row - self._offsets[batch_idx]
            single_rows.append(self._batches[batch_idx].slice(local_row, 1))
        return pa.Table.from_batches(single_rows, schema=self._schema)

    def fast_slice(self, offset=0, length=None) -> pa.Table:
        """
        Slice the Table using interpolation search.

        Same behavior as :obj:`pyarrow.Table.slice`, but significantly faster: interpolation
        search finds the first and last batches to keep, only those are trimmed, and the kept
        batches are assembled into the sliced Table.

        Raises:
            IndexError: if ``offset`` is negative.
        """
        if offset < 0:
            raise IndexError("Offset must be non-negative")
        total_rows = self._offsets[-1]
        if offset >= total_rows or (length is not None and length <= 0):
            return pa.Table.from_batches([], schema=self._schema)
        first = _interpolation_search(self._offsets, offset)
        if length is None or length + offset >= total_rows:
            # The slice runs to the end of the table: keep every batch from `first` onward.
            kept = self._batches[first:]
            kept[0] = kept[0].slice(offset - self._offsets[first])
            return pa.Table.from_batches(kept, schema=self._schema)
        last = _interpolation_search(self._offsets, offset + length - 1)
        kept = self._batches[first : last + 1]
        # Trim the tail before the head: when first == last both trims hit the same batch, and
        # the tail length is computed relative to that batch's untrimmed start.
        kept[-1] = kept[-1].slice(0, offset + length - self._offsets[last])
        kept[0] = kept[0].slice(offset - self._offsets[first])
        return pa.Table.from_batches(kept, schema=self._schema)
class Table(IndexedTableMixin):
    """
    Wraps a pyarrow Table by using composition.
    This is the base class for InMemoryTable, MemoryMappedTable and ConcatenationTable.

    It implements all the basic attributes/methods of the pyarrow Table class except
    the Table transforms: slice, filter, flatten, combine_chunks, cast, replace_schema_metadata,
    add_column, append_column, remove_column, set_column, rename_columns and drop.

    The implementation of these methods differs for the subclasses.
    """

    def __init__(self, table: pa.Table):
        super().__init__(table)
        self.table = table

    def __deepcopy__(self, memo: dict):
        # arrow tables are immutable, so there's no need to copy self.table
        # moreover calling deepcopy on a pyarrow table seems to make pa.total_allocated_bytes() decrease for some reason
        # by adding it to the memo, self.table won't be copied
        memo[id(self.table)] = self.table
        # same for the recordbatches used by the index
        memo[id(self._batches)] = list(self._batches)
        return _deepcopy(self, memo)

    def __getstate__(self):
        # We can't pickle objects that are bigger than 4GiB, or it causes OverflowError
        # So we write the table on disk instead
        if self.table.nbytes >= config.MAX_TABLE_NBYTES_FOR_PICKLING:
            table = self.table
            # Only reserve a unique path here and close the handle before writing: reopening a
            # NamedTemporaryFile by name while it is still open is platform-dependent.
            with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".arrow") as tmp_file:
                filename = tmp_file.name
            logger.debug(
                f"Attempting to pickle a table bigger than 4GiB. Writing it on the disk instead at {filename}"
            )
            _write_table_to_file(table=table, filename=filename)
            return {"path": filename}
        else:
            return {"table": self.table}

    def __setstate__(self, state):
        if "path" in state:
            filename = state["path"]
            logger.debug(f"Unpickling a big table from the disk at {filename}")
            table = _in_memory_arrow_table_from_file(filename)
            logger.debug(f"Removing temporary table file at {filename}")
            os.remove(filename)
        else:
            table = state["table"]
        Table.__init__(self, table)

    def validate(self, *args, **kwargs):
        """
        Perform validation checks. An exception is raised if validation fails.

        By default only cheap validation checks are run. Pass `full=True`
        for thorough validation checks (potentially O(n)).

        Args:
            full (:obj:`bool`, defaults to :obj:`False`):
                If True, run expensive checks, otherwise cheap checks only.

        Raises:
            pa.lib.ArrowInvalid: if validation fails
        """
        return self.table.validate(*args, **kwargs)

    def equals(self, *args, **kwargs):
        """
        Check if contents of two tables are equal.

        Args:
            other (:class:`datasets.table.Table`):
                Table to compare against.
            check_metadata (:obj:`bool`, defaults to :obj:`False`):
                Whether schema metadata equality should be checked as well.

        Returns:
            :obj:`bool`
        """
        # Unwrap our Table wrappers so pyarrow compares the underlying pyarrow tables.
        args = tuple(arg.table if isinstance(arg, Table) else arg for arg in args)
        # `.items()` is required: iterating a dict yields only its keys.
        kwargs = {k: v.table if isinstance(v, Table) else v for k, v in kwargs.items()}
        return self.table.equals(*args, **kwargs)

    def to_batches(self, *args, **kwargs):
        """
        Convert Table to list of (contiguous) RecordBatch objects.

        Args:
            max_chunksize (:obj:`int`, defaults to `None`):
                Maximum size for RecordBatch chunks. Individual chunks may be
                smaller depending on the chunk layout of individual columns.

        Returns:
            :obj:`List[pyarrow.RecordBatch]`:
        """
        return self.table.to_batches(*args, **kwargs)

    def to_pydict(self, *args, **kwargs):
        """
        Convert the Table to a dict or OrderedDict.

        Returns:
            :obj:`dict`
        """
        return self.table.to_pydict(*args, **kwargs)

    def to_pylist(self, *args, **kwargs):
        """
        Convert the Table to a list of rows, one dict per row.

        Returns:
            :obj:`list`
        """
        if hasattr(self.table, "to_pylist"):
            return self.table.to_pylist(*args, **kwargs)
        # pyarrow <7 does not have to_pylist, so we transpose the columns returned by to_pydict
        pydict = self.table.to_pydict(*args, **kwargs)
        return [{k: pydict[k][i] for k in pydict} for i in range(len(self.table))]

    def to_pandas(self, *args, **kwargs):
        """
        Convert to a pandas-compatible NumPy array or DataFrame, as appropriate

        Args:
            memory_pool (:obj:`MemoryPool`, defaults to :obj:`None`):
                Arrow MemoryPool to use for allocations. Uses the default memory
                pool is not passed.
            strings_to_categorical (:obj:`bool`, defaults to :obj:`False`):
                Encode string (UTF8) and binary types to pandas.Categorical.
            categories (:obj:`list`, defaults to :obj:`empty`):
                List of fields that should be returned as pandas.Categorical. Only
                applies to table-like data structures.
            zero_copy_only (:obj:`bool`, defaults to :obj:`False`):
                Raise an ArrowException if this function call would require copying
                the underlying data.
            integer_object_nulls (:obj:`bool`, defaults to :obj:`False`):
                Cast integers with nulls to objects
            date_as_object (:obj:`bool`, defaults to :obj:`True`):
                Cast dates to objects. If False, convert to datetime64[ns] dtype.
            timestamp_as_object (:obj:`bool`, defaults to :obj:`False`):
                Cast non-nanosecond timestamps (np.datetime64) to objects. This is
                useful if you have timestamps that don't fit in the normal date
                range of nanosecond timestamps (1678 CE-2262 CE).
                If False, all timestamps are converted to datetime64[ns] dtype.
            use_threads (:obj:`bool`, defaults to :obj:`True`):
                Whether to parallelize the conversion using multiple threads.
            deduplicate_objects (:obj:`bool`, defaults to :obj:`False`):
                Do not create multiple copies Python objects when created, to save
                on memory use. Conversion will be slower.
            ignore_metadata (:obj:`bool`, defaults to :obj:`False`):
                If True, do not use the 'pandas' metadata to reconstruct the
                DataFrame index, if present
            safe (:obj:`bool`, defaults to :obj:`True`):
                For certain data types, a cast is needed in order to store the
                data in a pandas DataFrame or Series (e.g. timestamps are always
                stored as nanoseconds in pandas). This option controls whether it
                is a safe cast or not.
            split_blocks (:obj:`bool`, defaults to :obj:`False`):
                If True, generate one internal "block" for each column when
                creating a pandas.DataFrame from a RecordBatch or Table. While this
                can temporarily reduce memory note that various pandas operations
                can trigger "consolidation" which may balloon memory use.
            self_destruct (:obj:`bool`, defaults to :obj:`False`):
                EXPERIMENTAL: If True, attempt to deallocate the originating Arrow
                memory while converting the Arrow object to pandas. If you use the
                object after calling to_pandas with this option it will crash your
                program.
            types_mapper (:obj:`function`, defaults to :obj:`None`):
                A function mapping a pyarrow DataType to a pandas ExtensionDtype.
                This can be used to override the default pandas type for conversion
                of built-in pyarrow types or in absence of pandas_metadata in the
                Table schema. The function receives a pyarrow DataType and is
                expected to return a pandas ExtensionDtype or ``None`` if the
                default conversion should be used for that type. If you have
                a dictionary mapping, you can pass ``dict.get`` as function.

        Returns:
            :obj:`pandas.Series` or :obj:`pandas.DataFrame`: :obj:`pandas.Series` or :obj:`pandas.DataFrame` depending on type of object
        """
        return self.table.to_pandas(*args, **kwargs)

    def to_string(self, *args, **kwargs):
        """
        Return a string representation of the table (delegates to ``pyarrow.Table.to_string``).

        Returns:
            :obj:`str`
        """
        return self.table.to_string(*args, **kwargs)

    def field(self, *args, **kwargs):
        """
        Select a schema field by its column name or numeric index.

        Args:
            i (:obj:`Union[int, str]`):
                The index or name of the field to retrieve.

        Returns:
            :obj:`pyarrow.Field`:
        """
        return self.table.field(*args, **kwargs)

    def column(self, *args, **kwargs):
        """
        Select a column by its column name, or numeric index.

        Args:
            i (:obj:`Union[int, str]`):
                The index or name of the column to retrieve.

        Returns:
            :obj:`pyarrow.ChunkedArray`:
        """
        return self.table.column(*args, **kwargs)

    def itercolumns(self, *args, **kwargs):
        """
        Iterator over all columns in their numerical order.

        Yields:
            :obj:`pyarrow.ChunkedArray`:
        """
        return self.table.itercolumns(*args, **kwargs)

    @property
    def schema(self):
        """
        Schema of the table and its columns.

        Returns:
            :obj:`pyarrow.Schema`:
        """
        return self.table.schema

    @property
    def columns(self):
        """
        List of all columns in numerical order.

        Returns:
            :obj:`List[pa.ChunkedArray]`:
        """
        return self.table.columns

    @property
    def num_columns(self):
        """
        Number of columns in this table.

        Returns:
            int:
        """
        return self.table.num_columns

    @property
    def num_rows(self):
        """
        Number of rows in this table.

        Due to the definition of a table, all columns have the same number of
        rows.

        Returns:
            int:
        """
        return self.table.num_rows

    @property
    def shape(self):
        """
        Dimensions of the table: (#rows, #columns).

        Returns:
            :obj:`(int, int)`: Number of rows and number of columns.
        """
        return self.table.shape

    @property
    def nbytes(self):
        """
        Total number of bytes consumed by the elements of the table.
        """
        return self.table.nbytes

    @property
    def column_names(self):
        """
        Names of the table's columns
        """
        return self.table.column_names

    def __eq__(self, other):
        return self.equals(other)

    def __getitem__(self, i):
        return self.table[i]

    def __len__(self):
        return len(self.table)

    def __repr__(self):
        return self.table.__repr__().replace("pyarrow.Table", self.__class__.__name__)

    def __str__(self):
        return self.table.__str__().replace("pyarrow.Table", self.__class__.__name__)

    def slice(self, *args, **kwargs):
        """
        Compute zero-copy slice of this Table

        Args:
            offset (:obj:`int`, defaults to :obj:`0`):
                Offset from start of table to slice
            length (:obj:`int`, defaults to :obj:`None`):
                Length of slice (default is until end of table starting from
                offset)

        Returns:
            :class:`datasets.table.Table`:
        """
        raise NotImplementedError()

    def filter(self, *args, **kwargs):
        """
        Select records from a Table. See pyarrow.compute.filter for full usage.
        """
        raise NotImplementedError()

    def flatten(self, *args, **kwargs):
        """
        Flatten this Table. Each column with a struct type is flattened
        into one column per struct field. Other columns are left unchanged.

        Args:
            memory_pool (:obj:`MemoryPool`, defaults to :obj:`None`):
                For memory allocations, if required, otherwise use default pool

        Returns:
            :class:`datasets.table.Table`:
        """
        raise NotImplementedError()

    def combine_chunks(self, *args, **kwargs):
        """
        Make a new table by combining the chunks this table has.

        All the underlying chunks in the ChunkedArray of each column are
        concatenated into zero or one chunk.

        Args:
            memory_pool (:obj:`MemoryPool`, defaults to :obj:`None`):
                For memory allocations, if required, otherwise use default pool

        Returns:
            :class:`datasets.table.Table`:
        """
        raise NotImplementedError()

    def cast(self, *args, **kwargs):
        """
        Cast table values to another schema

        Args:
            target_schema (:obj:`Schema`):
                Schema to cast to, the names and order of fields must match
            safe (:obj:`bool`, defaults to :obj:`True`):
                Check for overflows or other unsafe conversions

        Returns:
            :class:`datasets.table.Table`:
        """
        raise NotImplementedError()

    def replace_schema_metadata(self, *args, **kwargs):
        """
        EXPERIMENTAL: Create shallow copy of table by replacing schema
        key-value metadata with the indicated new metadata (which may be None,
        which deletes any existing metadata)

        Args:
            metadata (:obj:`dict`, defaults to :obj:`None`):

        Returns:
            :class:`datasets.table.Table`: shallow_copy
        """
        raise NotImplementedError()

    def add_column(self, *args, **kwargs):
        """
        Add column to Table at position.

        A new table is returned with the column added, the original table
        object is left unchanged.

        Args:
            i (:obj:`int`):
                Index to place the column at.
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`: New table with the passed column added.
        """
        raise NotImplementedError()

    def append_column(self, *args, **kwargs):
        """
        Append column at end of columns.

        Args:
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`: New table with the passed column added.
        """
        raise NotImplementedError()

    def remove_column(self, *args, **kwargs):
        """
        Create new Table with the indicated column removed.

        Args:
            i (:obj:`int`):
                Index of column to remove.

        Returns:
            :class:`datasets.table.Table`: New table without the column.
        """
        raise NotImplementedError()

    def set_column(self, *args, **kwargs):
        """
        Replace column in Table at position.

        Args:
            i (:obj:`int`):
                Index to place the column at.
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`: New table with the passed column set.
        """
        raise NotImplementedError()

    def rename_columns(self, *args, **kwargs):
        """
        Create new table with columns renamed to provided names.
        """
        raise NotImplementedError()

    def drop(self, *args, **kwargs):
        """
        Drop one or more columns and return a new table.

        Args:
            columns (:obj:`List[str]`):
                List of field names referencing existing columns.

        Raises:
            KeyError : if any of the passed columns name are not existing.

        Returns:
            :class:`datasets.table.Table`: New table without the columns.
        """
        raise NotImplementedError()

    # Additional methods that are based on the PyArrow Table methods

    def select_columns(self, columns: List[int]) -> "Table":
        """Return the table by keeping only the requested columns

        Args:
            columns (:obj:`List[int]`): indices of the columns to keep. The kept columns stay in
                their original order in the table (not in the order given here).

        Returns:
            :class:`datasets.table.Table`: table with only a subset of the columns
        """
        table = self
        # Remove from the highest index down: removing a column shifts every later column one
        # position to the left, so removing in ascending order would drop the wrong columns.
        columns_to_remove = sorted(set(range(len(self.column_names))) - set(columns), reverse=True)
        for column_to_remove in columns_to_remove:
            table = table.remove_column(column_to_remove)
        return table
class TableBlock(Table):
    """
    Marker base class for the tables that may be stored inside a ConcatenationTable.

    Only MemoryMappedTable and InMemoryTable subclass TableBlock. This prevents a
    ConcatenationTable from being built out of other ConcatenationTables.
    """
class InMemoryTable(TableBlock):
    """
    The table is said in-memory when it is loaded into the user's RAM.

    Pickling it does copy all the data using memory.
    Its implementation is simple and uses the underlying pyarrow Table methods directly.

    This is different from the MemoryMapped table, for which pickling doesn't copy all the
    data in memory. For a MemoryMapped, unpickling instead reloads the table from the disk.

    InMemoryTable must be used when data fit in memory, while MemoryMapped are reserved for
    data bigger than memory or when you want the memory footprint of your application to
    stay low.
    """

    @classmethod
    def from_file(cls, filename: str):
        """
        Read an Arrow IPC stream file into an in-memory table.

        Args:
            filename (:obj:`str`): path of the Arrow stream file.

        Returns:
            :class:`datasets.table.Table`:
        """
        table = _in_memory_arrow_table_from_file(filename)
        return cls(table)

    @classmethod
    def from_buffer(cls, buffer: pa.Buffer):
        """
        Deserialize an Arrow IPC stream held in a buffer into an in-memory table.

        Args:
            buffer (:obj:`pyarrow.Buffer`): bytes of a serialized Arrow IPC stream.

        Returns:
            :class:`datasets.table.Table`:
        """
        table = _in_memory_arrow_table_from_buffer(buffer)
        return cls(table)

    @classmethod
    def from_pandas(cls, *args, **kwargs):
        """
        Convert pandas.DataFrame to an Arrow Table.

        The column types in the resulting Arrow Table are inferred from the
        dtypes of the pandas.Series in the DataFrame. In the case of non-object
        Series, the NumPy dtype is translated to its Arrow equivalent. In the
        case of `object`, we need to guess the datatype by looking at the
        Python objects in this Series.

        Be aware that Series of the `object` dtype don't carry enough
        information to always lead to a meaningful Arrow type. In the case that
        we cannot infer a type, e.g. because the DataFrame is of length 0 or
        the Series only contains None/nan objects, the type is set to
        null. This behavior can be avoided by constructing an explicit schema
        and passing it to this function.

        Args:
            df (:obj:`pandas.DataFrame`):
            schema (:obj:`pyarrow.Schema`, optional):
                The expected schema of the Arrow Table. This can be used to
                indicate the type of columns if we cannot infer it automatically.
                If passed, the output will have exactly this schema. Columns
                specified in the schema that are not found in the DataFrame columns
                or its index will raise an error. Additional columns or index
                levels in the DataFrame which are not specified in the schema will
                be ignored.
            preserve_index (:obj:`bool`, optional):
                Whether to store the index as an additional column in the resulting
                ``Table``. The default of None will store the index as a column,
                except for RangeIndex which is stored as metadata only. Use
                ``preserve_index=True`` to force it to be stored as a column.
            nthreads (:obj:`int`, defaults to :obj:`None` (may use up to system CPU count threads))
                If greater than 1, convert columns to Arrow in parallel using
                indicated number of threads
            columns (:obj:`List[str]`, optional):
               List of column to be converted. If None, use all columns.
            safe (:obj:`bool`, defaults to :obj:`True`):
               Check for overflows or other unsafe conversions

        Returns:
            :class:`datasets.table.Table`:

        Examples:
        ```python
        >>> import pandas as pd
        >>> df = pd.DataFrame({
            ...     'int': [1, 2],
            ...     'str': ['a', 'b']
            ... })
        >>> table = InMemoryTable.from_pandas(df)
        >>> table.column_names
        ['int', 'str']
        ```
        """
        return cls(pa.Table.from_pandas(*args, **kwargs))

    @classmethod
    def from_arrays(cls, *args, **kwargs):
        """
        Construct a Table from Arrow arrays

        Args:
            arrays (:obj:`List[Union[pyarrow.Array, pyarrow.ChunkedArray]]`):
                Equal-length arrays that should form the table.
            names (:obj:`List[str]`, optional):
                Names for the table columns. If not passed, schema must be passed
            schema (:obj:`Schema`, defaults to :obj:`None`):
                Schema for the created table. If not passed, names must be passed
            metadata (:obj:`Union[dict, Mapping]`, default None):
                Optional metadata for the schema (if inferred).

        Returns:
            :class:`datasets.table.Table`:
        """
        return cls(pa.Table.from_arrays(*args, **kwargs))

    @classmethod
    def from_pydict(cls, *args, **kwargs):
        """
        Construct a Table from Arrow arrays or columns

        Args:
            mapping (:obj:`Union[dict, Mapping]`):
                A mapping of strings to Arrays or Python lists.
            schema (:obj:`Schema`, defaults to :obj:`None`):
                If not passed, will be inferred from the Mapping values
            metadata (:obj:`Union[dict, Mapping]`, default None):
                Optional metadata for the schema (if inferred).

        Returns:
            :class:`datasets.table.Table`:
        """
        return cls(pa.Table.from_pydict(*args, **kwargs))

    @classmethod
    def from_pylist(cls, mapping, *args, **kwargs):
        """
        Construct a Table from list of rows / dictionaries.

        Args:
            mapping (:obj:`List[dict]`):
                A mapping of strings to row values.
            schema (:obj:`Schema`, defaults to :obj:`None`):
                If not passed, will be inferred from the Mapping values
            metadata (:obj:`Union[dict, Mapping]`, default None):
                Optional metadata for the schema (if inferred).

        Returns:
            :class:`datasets.table.Table`:
        """
        # Feature check instead of `except AttributeError`, so AttributeErrors raised while
        # building the table are not silently swallowed and retried through the fallback.
        if hasattr(pa.Table, "from_pylist"):
            return cls(pa.Table.from_pylist(mapping, *args, **kwargs))
        # pyarrow <7 does not have from_pylist, so we transpose the rows (using the first row's
        # keys as column names) and use from_pydict
        mapping = {k: [r.get(k) for r in mapping] for k in mapping[0]} if mapping else {}
        return cls(pa.Table.from_pydict(mapping, *args, **kwargs))

    @classmethod
    def from_batches(cls, *args, **kwargs):
        """
        Construct a Table from a sequence or iterator of Arrow RecordBatches.

        Args:
            batches (:obj:`Union[Sequence[pyarrow.RecordBatch], Iterator[pyarrow.RecordBatch]]`):
                Sequence of RecordBatch to be converted, all schemas must be equal.
            schema (:obj:`Schema`, defaults to :obj:`None`):
                If not passed, will be inferred from the first RecordBatch.

        Returns:
            :class:`datasets.table.Table`:
        """
        return cls(pa.Table.from_batches(*args, **kwargs))

    def slice(self, offset=0, length=None):
        """
        Compute zero-copy slice of this Table

        Args:
            offset (:obj:`int`, defaults to :obj:`0`):
                Offset from start of table to slice
            length (:obj:`int`, defaults to :obj:`None`):
                Length of slice (default is until end of table starting from
                offset)

        Returns:
            :class:`datasets.table.Table`:
        """
        # Use fast slicing here
        return InMemoryTable(self.fast_slice(offset=offset, length=length))

    def filter(self, *args, **kwargs):
        """
        Select records from a Table. See pyarrow.compute.filter for full usage.
        """
        return InMemoryTable(self.table.filter(*args, **kwargs))

    def flatten(self, *args, **kwargs):
        """
        Flatten this Table. Each column with a struct type is flattened
        into one column per struct field. Other columns are left unchanged.

        Args:
            memory_pool (:obj:`MemoryPool`, defaults to :obj:`None`):
                For memory allocations, if required, otherwise use default pool

        Returns:
            :class:`datasets.table.Table`:
        """
        return InMemoryTable(table_flatten(self.table, *args, **kwargs))

    def combine_chunks(self, *args, **kwargs):
        """
        Make a new table by combining the chunks this table has.

        All the underlying chunks in the ChunkedArray of each column are
        concatenated into zero or one chunk.

        Args:
            memory_pool (:obj:`MemoryPool`, defaults to :obj:`None`):
                For memory allocations, if required, otherwise use default pool

        Returns:
            :class:`datasets.table.Table`:
        """
        return InMemoryTable(self.table.combine_chunks(*args, **kwargs))

    def cast(self, *args, **kwargs):
        """
        Cast table values to another schema

        Args:
            target_schema (:obj:`Schema`):
                Schema to cast to, the names and order of fields must match
            safe (:obj:`bool`, defaults to :obj:`True`):
                Check for overflows or other unsafe conversions

        Returns:
            :class:`datasets.table.Table`:
        """
        return InMemoryTable(table_cast(self.table, *args, **kwargs))

    def replace_schema_metadata(self, *args, **kwargs):
        """
        EXPERIMENTAL: Create shallow copy of table by replacing schema
        key-value metadata with the indicated new metadata (which may be None,
        which deletes any existing metadata)

        Args:
            metadata (:obj:`dict`, defaults to :obj:`None`):

        Returns:
            :class:`datasets.table.Table`: shallow_copy
        """
        return InMemoryTable(self.table.replace_schema_metadata(*args, **kwargs))

    def add_column(self, *args, **kwargs):
        """
        Add column to Table at position.

        A new table is returned with the column added, the original table
        object is left unchanged.

        Args:
            i (:obj:`int`):
                Index to place the column at.
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`: New table with the passed column added.
        """
        return InMemoryTable(self.table.add_column(*args, **kwargs))

    def append_column(self, *args, **kwargs):
        """
        Append column at end of columns.

        Args:
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`:
                New table with the passed column added.
        """
        return InMemoryTable(self.table.append_column(*args, **kwargs))

    def remove_column(self, *args, **kwargs):
        """
        Create new Table with the indicated column removed.

        Args:
            i (:obj:`int`):
                Index of column to remove.

        Returns:
            :class:`datasets.table.Table`:
                New table without the column.
        """
        return InMemoryTable(self.table.remove_column(*args, **kwargs))

    def set_column(self, *args, **kwargs):
        """
        Replace column in Table at position.

        Args:
            i (:obj:`int`):
                Index to place the column at.
            field_ (:obj:`Union[str, pyarrow.Field]`):
                If a string is passed then the type is deduced from the column
                data.
            column (:obj:`Union[pyarrow.Array, List[pyarrow.Array]]`):
                Column data.

        Returns:
            :class:`datasets.table.Table`:
                New table with the passed column set.
        """
        return InMemoryTable(self.table.set_column(*args, **kwargs))

    def rename_columns(self, *args, **kwargs):
        """
        Create new table with columns renamed to provided names.
        """
        return InMemoryTable(self.table.rename_columns(*args, **kwargs))

    def drop(self, *args, **kwargs):
        """
        Drop one or more columns and return a new table.

        Args:
            columns (:obj:`List[str]`):
                List of field names referencing existing columns.

        Raises:
            KeyError : if any of the passed columns name are not existing.

        Returns:
            :class:`datasets.table.Table`:
                New table without the columns.
        """
        return InMemoryTable(self.table.drop(*args, **kwargs))
# The MemoryMappedTable needs replays to properly reload tables from the disk.
# A replay is a 3-tuple (str, tuple, dict): presumably a transform's method name with the
# positional and keyword arguments it was called with — TODO confirm against MemoryMappedTable.
Replay = Tuple[str, tuple, dict]
class MemoryMappedTable(TableBlock):
"""
The table is said memory mapped when it doesn't use the user's RAM but loads the data
from the disk instead.
Pickling it doesn't copy the data into memory.
Instead, only the path to the memory mapped arrow file is pickled, as well as the list
of transforms to "replay" when reloading the table from the disk.
Its implementation requires to store an history of all the transforms that were applied
to the underlying pyarrow Table, so that they can be "replayed" when reloading the Table
from the disk.
This is different from the InMemoryTable table, for which pickling does copy all the
data in memory.
InMemoryTable must be used when data fit in memory, while MemoryMapped are reserved for