#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import io as py_io
import itertools
import logging
import math
import operator
import os
import threading
import warnings
from collections import defaultdict
from uuid import uuid4
from packaging.version import Version
try:
    import cudf
    import dask_cudf
    from cudf.io.parquet import ParquetWriter as pwriter_cudf
    from dask_cudf.io.parquet import CudfEngine
except ImportError:
    cudf = None

import dask
import dask.dataframe as dd
import fsspec
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pa_ds
import toolz as tlz
from dask.base import tokenize
from dask.dataframe.core import _concat, new_dd_object
from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine
from dask.dataframe.io.parquet.core import apply_filters
from dask.dataframe.io.parquet.utils import _analyze_paths
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from dask.utils import natural_sort_key, parse_bytes
from fsspec.core import get_fs_token_paths
from pyarrow import parquet as pq
from pyarrow.parquet import ParquetWriter as pwriter_pyarrow
if Version(dask.__version__) >= Version("2021.07.1"):
    from dask.dataframe.io.parquet.core import aggregate_row_groups
else:
    aggregate_row_groups = None

from merlin.core.utils import run_on_worker
from merlin.io.dataset_engine import DatasetEngine
from merlin.io.fsspec_utils import _optimized_read_partition_remote, _optimized_read_remote
from merlin.io.shuffle import Shuffle, shuffle_df
from merlin.io.writer import ThreadedWriter
LOG = logging.getLogger("merlin")

class CPUParquetEngine(ArrowDatasetEngine):
    @staticmethod
    def read_metadata(*args, **kwargs):
        return _override_read_metadata(ArrowDatasetEngine.read_metadata, *args, **kwargs)

    @classmethod
    def multi_support(cls):
        return hasattr(ArrowDatasetEngine, "multi_support") and ArrowDatasetEngine.multi_support()

    @classmethod
    def read_partition(cls, *args, **kwargs):
        part = ArrowDatasetEngine.read_partition(*args, **kwargs)

        # NVTabular does NOT currently support nullable pandas dtypes.
        # Convert everything to non-nullable dtypes instead:
        # (TODO: Fix issues in Merlin/NVTabular for nullable dtypes)
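        # A hypothetical illustration of the conversions performed below
        # (column dtypes are examples, not taken from any particular dataset):
        #   "Int64"   -> "int64"
        #   "Float32" -> "float32"
        #   "string"  -> "O" (object)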
        for k, v in part.dtypes.items():
            type_name = str(v)
            if type_name.startswith("Int"):
                part[k] = part[k].astype(type_name.replace("Int", "int"))
            elif type_name.startswith("Float"):
                part[k] = part[k].astype(type_name.replace("Float", "float"))
            elif type_name.startswith("string"):
                # Converts pd.StringDtype to "Object"
                part[k] = part[k].astype("O")
        return part

# Define GPUParquetEngine if cudf is available
if cudf is not None:

    class GPUParquetEngine(CudfEngine):
        @staticmethod
        def read_metadata(*args, **kwargs):
            cudf_version = Version(cudf.__version__)
            if (cudf_version.major == 21 and cudf_version.minor == 10) or (
                cudf_version.major == 0 and cudf_version.minor == 0
            ):
                # We only need this work-around for cudf-21.10
                return _override_read_metadata(_cudf_read_metadata, *args, **kwargs)
            return _override_read_metadata(CudfEngine.read_metadata, *args, **kwargs)

        @classmethod
        def multi_support(cls):
            return hasattr(CudfEngine, "multi_support") and CudfEngine.multi_support()

        @classmethod
        def read_partition(cls, fs, pieces, *args, **kwargs):
            cudf_version = Version(cudf.__version__)
            cudf_optimized_remote = (cudf_version.major, cudf_version.minor) >= (22, 2)
            if (
                cudf_optimized_remote
                or cudf.utils.ioutils._is_local_filesystem(fs)
                or (isinstance(pieces, list) and len(pieces) > 1)
            ):
                # Use dask_cudf version if this is a local file system,
                # or if the version of cudf is optimized for remote storage.
                # We also fall back to cudf for multi-file aggregation.
                return CudfEngine.read_partition(fs, pieces, *args, **kwargs)

            # This version of cudf does not include optimized
            # fsspec usage for remote storage - Use custom code path.
            # TODO: Remove `_optimized_read_partition_remote` once the
            # earliest supported cudf version is 22.02
            return _optimized_read_partition_remote(fs, pieces, *args, **kwargs)

def _cudf_read_metadata(*args, **kwargs):
    #
    # NOTE: This function was copied directly from
    #       cudf:branch-21.12 to avoid bugs in
    #       cudf:branch-21.10. This work-around can
    #       be removed when NVTabular is pinned
    #       to cudf>=21.12
    #
    meta, stats, parts, index = ArrowDatasetEngine.read_metadata(*args, **kwargs)
    new_meta = cudf.from_pandas(meta)
    if parts:
        # Re-set "object" dtypes to align with the pa schema
        set_object_dtypes_from_pa_schema(
            new_meta,
            parts[0].get("common_kwargs", {}).get("schema", None),
        )

    # If `strings_to_categorical==True`, convert objects to int32
    strings_to_cats = kwargs.get("strings_to_categorical", False)
    for col in new_meta._data.names:
        if isinstance(new_meta._data[col], cudf.core.column.StringColumn) and strings_to_cats:
            new_meta._data[col] = new_meta._data[col].astype("int32")

    return (new_meta, stats, parts, index)

def set_object_dtypes_from_pa_schema(df, schema):
    #
    # NOTE: This utility was copied directly from
    #       cudf:branch-21.12 to avoid bugs in
    #       cudf:branch-21.10. This work-around can
    #       be removed when NVTabular is pinned
    #       to cudf>=21.12
    #
    # Simple utility to modify cudf DataFrame
    # "object" dtypes to agree with a specific
    # pyarrow schema
    if schema:
        for col_name, col in df._data.items():
            if col_name is None:
                # Pyarrow cannot handle `None` as a field name.
                # However, this should be a simple range index that
                # we can ignore anyway
                continue
            typ = cudf.utils.dtypes.cudf_dtype_from_pa_type(schema.field(col_name).type)
            if (
                col_name in schema.names
                and not isinstance(typ, (cudf.ListDtype, cudf.StructDtype))
                and isinstance(col, cudf.core.column.StringColumn)
            ):
                df._data[col_name] = col.astype(typ)

def _override_read_metadata(
    parent_read_metadata,
    fs,
    paths,
    index=None,
    gather_statistics=None,
    split_row_groups=None,
    filters=None,
    aggregate_files=None,
    dataset=None,
    chunksize=None,
    **global_kwargs,
):
    # This function is used by both CPU and GPU-backed
    # ParquetDatasetEngine instances to override the `read_metadata`
    # component of the upstream `read_parquet` logic. This provides
    # NVTabular with direct access to the final partitioning behavior.

    # For now, disallow the user from setting `chunksize`
    if chunksize:
        raise ValueError(
            "NVTabular does not yet support the explicit use "
            "of Dask's `chunksize` argument."
        )

    # Extract metadata_collector from the dataset "container"
    dataset = dataset or {}
    metadata_collector = dataset.pop("metadata_collector", None)

    # Gather statistics by default.
    # This enables optimized length calculations
    if gather_statistics is None:
        gather_statistics = True

    # Use a local_kwargs dictionary to make it easier to exclude
    # `aggregate_files` for older Dask versions
    local_kwargs = {
        "index": index,
        "filters": filters,
        # Use chunksize=1 to "ensure" statistics are gathered
        # if `gather_statistics=True`. Note that Dask will bail
        # from statistics gathering if it does not expect statistics
        # to be "used" after `read_metadata` returns.
        "chunksize": 1 if gather_statistics else None,
        "gather_statistics": gather_statistics,
        "split_row_groups": split_row_groups,
    }
    if aggregate_row_groups is not None:
        # File aggregation is only available for Dask>=2021.07.1
        local_kwargs["aggregate_files"] = aggregate_files
    elif aggregate_files:
        raise ValueError("This version of Dask does not support the `aggregate_files` argument.")

    # Start with "super-class" read_metadata logic
    read_metadata_result = parent_read_metadata(
        fs,
        paths,
        **local_kwargs,
        **global_kwargs,
    )
    parts = read_metadata_result[2].copy()
    statistics = read_metadata_result[1].copy()

    # Process the statistics.
    # Note that these steps are usually performed after
    # `engine.read_metadata` returns (in Dask), but we are doing
    # it ourselves in NVTabular (to capture the expected output
    # partitioning plan)
    if statistics:
        result = list(
            zip(*[(part, stats) for part, stats in zip(parts, statistics) if stats["num-rows"] > 0])
        )
        parts, statistics = result or [[], []]

        # Apply filters
        if filters:
            parts, statistics = apply_filters(parts, statistics, filters)

        # Apply file aggregation
        if aggregate_row_groups is not None:
            # Convert `aggregate_files` to an integer `aggregation_depth`
            aggregation_depth = False
            if len(parts) and aggregate_files:
                aggregation_depth = parts[0].get("aggregation_depth", aggregation_depth)

            # Aggregate parts/statistics if we are splitting by row-group
            if chunksize or (split_row_groups and int(split_row_groups) > 1):
                parts, statistics = aggregate_row_groups(
                    parts, statistics, chunksize, split_row_groups, fs, aggregation_depth
                )

    # Update `metadata_collector` and return the "original" `read_metadata_result`
    metadata_collector["stats"] = statistics
    metadata_collector["parts"] = parts
    return read_metadata_result

class ParquetDatasetEngine(DatasetEngine):
    """ParquetDatasetEngine is a Dask-based version of cudf.read_parquet."""

    def __init__(
        self,
        paths,
        part_size,
        storage_options,
        row_groups_per_part=None,
        legacy=False,
        batch_size=None,  # Ignored
        cpu=False,
        **kwargs,
    ):
        super().__init__(paths, part_size, cpu=cpu, storage_options=storage_options)
        self._pp_map = None
        self._pp_nrows = None
        self._pp_metadata = None
        self._real_meta = None

        # Process `kwargs`
        self.read_parquet_kwargs = kwargs.copy()
        self.aggregate_files = self.read_parquet_kwargs.pop("aggregate_files", False)
        self.filters = self.read_parquet_kwargs.pop("filters", None)
        self.dataset_kwargs = self.read_parquet_kwargs.pop("dataset", {})

        if row_groups_per_part is None:
            self._real_meta, rg_byte_size_0 = run_on_worker(
                _sample_row_group,
                self._path0,
                self.fs,
                cpu=self.cpu,
                memory_usage=True,
                **self.read_parquet_kwargs,
            )
            row_groups_per_part = self.part_size / rg_byte_size_0
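            # For example (hypothetical sizes, not measured from any real
            # dataset): a sampled row-group of ~128 MB with a part_size of
            # 1 GB gives row_groups_per_part of roughly 8.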
            if row_groups_per_part < 1.0:
                warnings.warn(
                    f"Row group memory size ({rg_byte_size_0}) (bytes) of parquet file is bigger"
                    f" than requested part_size ({self.part_size}) for the NVTabular dataset."
                    f" A row group memory size of 128 MB is generally recommended. You can find"
                    f" info on how to set the row group size of parquet files in "
                    f"https://nvidia-merlin.github.io/NVTabular/main/resources/troubleshooting.html"
                    f"#setting-the-row-group-size-for-the-parquet-files"
                )
                row_groups_per_part = 1.0

        self.row_groups_per_part = int(row_groups_per_part)
        assert self.row_groups_per_part > 0

    @property  # type: ignore
    @functools.lru_cache(1)
    def _path0(self):
        return next(self._dataset.get_fragments()).path

    @property  # type: ignore
    @functools.lru_cache(1)
    def _legacy_dataset(self):
        # TODO: Remove this after finding a way to avoid
        # the use of `ParquetDataset` in `validate_dataset`
        paths = self.paths
        fs = self.fs
        if len(paths) > 1:
            # This is a list of files
            dataset = pq.ParquetDataset(paths, filesystem=fs, validate_schema=False)
        elif fs.isdir(paths[0]):
            # This is a directory
            dataset = pq.ParquetDataset(paths[0], filesystem=fs, validate_schema=False)
        else:
            # This is a single file
            dataset = pq.ParquetDataset(paths[0], filesystem=fs)
        return dataset

    @property  # type: ignore
    @functools.lru_cache(1)
    def _dataset(self):
        paths = self.stripped_paths
        fs = self.fs
        if len(paths) > 1:
            # This is a list of files
            dataset = pa_ds.dataset(paths, filesystem=fs)
        else:
            # This is a directory or a single file
            dataset = pa_ds.dataset(paths[0], filesystem=fs)
        return dataset

    @property
    def _file_partition_map(self):
        if self._pp_map is None:
            self._process_parquet_metadata()
        return self._pp_map

    @property
    def _partition_lens(self):
        if self._pp_nrows is None:
            self._process_parquet_metadata()
        return self._pp_nrows

    @property
    def num_rows(self):
        # TODO: Avoid parsing metadata once upstream dask
        # can get the length efficiently (in all practical cases)
        if self._partition_lens:
            return sum(self._partition_lens)
        return len(self.to_ddf().index)

    def _process_parquet_metadata(self):
        # Utility shared by `_file_partition_map` and `_partition_lens`
        # to collect useful information from the parquet metadata

        # First, we need to populate `self._pp_metadata`
        if self._pp_metadata is None:
            _ = self.to_ddf()

        # Second, we can use the path and num-rows information
        # in parts and stats
        parts = self._pp_metadata["parts"]
        stats = self._pp_metadata["stats"]
        _pp_map = {}
        _pp_nrows = []
        distinct_files = True
        for i, (part, stat) in enumerate(zip(parts, stats)):
            if distinct_files:
                if isinstance(part, list):
                    if len(part) > 1:
                        distinct_files = False
                    else:
                        path = part[0]["piece"][0]
                        _pp_map[path] = i
                else:
                    path = part["piece"][0]
                    _pp_map[path] = i
            _pp_nrows.append(stat["num-rows"])
        self._pp_nrows = _pp_nrows
        self._pp_map = _pp_map
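        # After this runs, `self._pp_map` maps each (distinct) file path to a
        # partition index, e.g. hypothetically {"/data/part.0.parquet": 0, ...},
        # and `self._pp_nrows` holds the row count of every output partition.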

    def to_ddf(self, columns=None, cpu=None):
        # Check if we are using cpu or gpu backend
        cpu = self.cpu if cpu is None else cpu
        backend_engine = CPUParquetEngine if cpu else GPUParquetEngine

        # Use dask-dataframe with appropriate engine
        metadata_collector = {"stats": [], "parts": []}
        dataset_kwargs = {"metadata_collector": metadata_collector}
        dataset_kwargs.update(self.dataset_kwargs)
        ddf = dd.read_parquet(
            self.paths,
            columns=columns,
            engine=backend_engine,
            index=False,
            aggregate_files=self.aggregate_files,
            filters=self.filters,
            split_row_groups=self.row_groups_per_part,
            storage_options=self.storage_options,
            dataset=dataset_kwargs,
            **self.read_parquet_kwargs,
        )
        self._pp_metadata = metadata_collector
        return ddf

    def to_cpu(self):
        self.cpu = True

    def to_gpu(self):
        self.cpu = False

    def sample_data(self, n=1):
        """Return a real data sample from the Dataset"""
        if self._real_meta is not None:
            # First, check if we already cached a data sample
            # while calculating `row_groups_per_part`
            _len = len(self._real_meta)
            if _len >= n:
                if _len == n:
                    _real_meta = self._real_meta
                else:
                    _real_meta = self._real_meta.take(list(range(n)))

                # We can clear self._real_meta, because the data
                # will still be cached at the Dataset level
                self._real_meta = None
                return _real_meta

        # A real metadata sample is not cached - sample from
        # the first row-group in the Dataset
        return run_on_worker(
            _sample_row_group,
            self._path0,
            self.fs,
            cpu=self.cpu,
            n=n,
            memory_usage=False,
            **self.read_parquet_kwargs,
        ).take(list(range(n)))

    def validate_dataset(
        self,
        add_metadata_file=False,
        require_metadata_file=True,
        row_group_max_size=None,
        file_min_size=None,
    ):
        """Validate ParquetDatasetEngine object for efficient processing.

        The purpose of this method is to validate that the raw dataset
        meets the minimal requirements for efficient NVTabular processing.
        Warnings are raised if any of the following conditions are not met:

        - The raw dataset directory should contain a global "_metadata"
          file. If this file is missing, ``add_metadata_file=True`` can
          be passed to generate a new one.
        - If there is no _metadata file, the parquet schema must be
          consistent for all row-groups/files in the raw dataset.
          Otherwise, a new _metadata file must be generated to avoid
          errors at IO time.
        - The row-groups should be no larger than the maximum size limit
          (``row_group_max_size``).
        - For multi-file datasets, the files should be no smaller than
          the minimum size limit (``file_min_size``).

        Parameters
        ----------
        add_metadata_file : bool, default False
            Whether to add a global _metadata file to the dataset if one
            is missing.
        require_metadata_file : bool, default True
            Whether to require the existence of a _metadata file to pass
            the dataset validation.
        row_group_max_size : int or str, default None
            Maximum size (in bytes) of each parquet row-group in the
            dataset. If None, the minimum of ``self.part_size`` and 500MB
            will be used.
        file_min_size : int or str, default None
            Minimum size (in bytes) of each parquet file in the dataset. This
            limit is only applied if there are >1 file in the dataset. If None,
            ``self.part_size`` will be used.

        Returns
        -------
        valid : bool
            `True` if the input dataset is valid for efficient NVTabular
            processing.
        """
        meta_valid = True  # Parquet format and _metadata exists
        size_valid = False  # Row-group sizes are appropriate

        # Check for user-specified row-group size limit.
        # Otherwise we use the smaller of the dataset partition
        # size and 500MB.
        if row_group_max_size is None:
            row_group_max_size = min(self.part_size, 500_000_000)
        else:
            row_group_max_size = parse_bytes(row_group_max_size)

        # Check for user-specified file size limit.
        # Otherwise we use the dataset partition size.
        if file_min_size is None:
            file_min_size = self.part_size
        else:
            file_min_size = parse_bytes(file_min_size)

        # Get dataset and path list
        pa_dataset = self._legacy_dataset
        paths = [p.path for p in pa_dataset.pieces]
        root_dir, fns = _analyze_paths(paths, self.fs)

        # Collect dataset metadata
        metadata_file_exists = bool(pa_dataset.metadata)
        schema_errors = defaultdict(set)
        if metadata_file_exists:
            # We have a metadata file
            metadata = pa_dataset.metadata
        else:
            # No metadata file - Collect manually
            metadata = None
            for piece, fn in zip(pa_dataset.pieces, fns):
                md = piece.get_metadata()
                md.set_file_path(fn)
                if metadata:
                    _append_row_groups(metadata, md, schema_errors, piece.path)
                else:
                    metadata = md

            # Check for inconsistent schemas.
            # This is not a problem if a _metadata file exists
            for field in schema_errors:
                msg = f"Schema mismatch detected in column: '{field}'."
                warnings.warn(msg)
                for item in schema_errors[field]:
                    msg = f"[{item[0]}] Expected {item[1]}, got {item[2]}."
                    warnings.warn(msg)

            # If there is a schema mismatch, urge the user to add a _metadata file
            if len(schema_errors):
                meta_valid = False  # There are schema-mismatch errors

                # Check that the Dask version supports `create_metadata_file`
                if Version(dask.__version__) <= Version("2.30.0"):
                    msg = (
                        "\nThe installed version of Dask is too old to handle "
                        "schema mismatch. Try installing the latest version."
                    )
                    warnings.warn(msg)
                    return meta_valid and size_valid  # Early return

                # Collect the metadata with dask_cudf and then convert to pyarrow
                metadata_bytes = dask_cudf.io.parquet.create_metadata_file(
                    paths,
                    out_dir=False,
                )
                with py_io.BytesIO() as myio:
                    myio.write(memoryview(metadata_bytes))
                    myio.seek(0)
                    metadata = pq.ParquetFile(myio).metadata

                if not add_metadata_file:
                    msg = (
                        "\nPlease pass add_metadata_file=True to add a global "
                        "_metadata file, or use the regenerate_dataset utility to "
                        "rewrite your dataset. Without a _metadata file, the schema "
                        "mismatch may cause errors at read time."
                    )
                    warnings.warn(msg)

        # Record the total byte size of all row groups and files
        max_rg_size = 0
        max_rg_size_path = None
        file_sizes = defaultdict(int)
        for rg in range(metadata.num_row_groups):
            row_group = metadata.row_group(rg)
            path = row_group.column(0).file_path
            total_byte_size = row_group.total_byte_size
            if total_byte_size > max_rg_size:
                max_rg_size = total_byte_size
                max_rg_size_path = path
            file_sizes[path] += total_byte_size

        # Check if any row groups are prohibitively large.
        # Also check if any row groups are larger than recommended.
        if max_rg_size > row_group_max_size:
            # One or more row-groups are above the "required" limit
            msg = (
                f"Excessive row_group size ({max_rg_size}) detected in file "
                f"{max_rg_size_path}. Please use the regenerate_dataset utility "
                f"to rewrite your dataset."
            )
            warnings.warn(msg)
        else:
            # The only way size_valid==True is if we get here
            size_valid = True

        # Check if any files are smaller than the desired size.
        # We only warn if there are >1 files in the dataset.
        for path, size in file_sizes.items():
            if size < file_min_size and len(pa_dataset.pieces) > 1:
                msg = (
                    f"File {path} is smaller than the desired dataset "
                    f"partition size ({self.part_size}). Consider using the "
                    f"regenerate_dataset utility to rewrite your dataset with a smaller "
                    f"number of (larger) files."
                )
                warnings.warn(msg)
                size_valid = False

        # If the _metadata file is missing, we need to write
        # it (or inform the user that it is missing)
        if not metadata_file_exists:
            if add_metadata_file:
                # Write the missing _metadata file
                fs = self.fs
                metadata_path = fs.sep.join([root_dir, "_metadata"])
                with fs.open(metadata_path, "wb") as fil:
                    metadata.write_metadata_file(fil)
                meta_valid = True
            else:
                # Inform the user that the _metadata file is missing
                msg = (
                    "For best performance with NVTabular, there should be a "
                    "global _metadata file located in the root directory of the "
                    "dataset. Please pass add_metadata_file=True to add the "
                    "missing file."
                )
                warnings.warn(msg)
                if require_metadata_file:
                    meta_valid = False

        # Return True if we have a parquet dataset with a _metadata file (meta_valid)
        # and the row-groups and files are appropriate sizes (size_valid)
        return meta_valid and size_valid

    @classmethod
    def regenerate_dataset(
        cls,
        dataset,
        output_path,
        columns=None,
        file_size=None,
        part_size=None,
        cats=None,
        conts=None,
        labels=None,
        storage_options=None,
    ):
        """Regenerate an NVTabular Dataset for efficient processing.

        Example Usage::

            dataset = Dataset("/path/to/data_pq", engine="parquet")
            dataset.regenerate_dataset(
                out_path, part_size="1MiB", file_size="10MiB"
            )

        Parameters
        ----------
        dataset : Dataset
            Input `Dataset` object (to be regenerated).
        output_path : string
            Root directory path to use for the new (regenerated) dataset.
        columns : list[string], optional
            Subset of columns to include in the regenerated dataset.
        file_size : int or string, optional
            Desired size of each output file.
        part_size : int or string, optional
            Desired partition size to use within the regeneration algorithm.
            Note that this is effectively the size of each contiguous write
            operation in cudf.
        cats : list[string], optional
            Categorical column list.
        conts : list[string], optional
            Continuous column list.
        labels : list[string], optional
            Label column list.
        storage_options : dict, optional
            Storage-option kwargs to pass through to the `fsspec` file-system
            interface.

        Returns
        -------
        result : int or Delayed
            If `compute=True` (default), the return value will be an integer
            corresponding to the number of generated data files. If `False`,
            the returned value will be a `Delayed` object.
        """
        # Specify ideal file size and partition size
        row_group_size = 128_000_000
        file_size = parse_bytes(file_size) or row_group_size * 100
        part_size = parse_bytes(part_size) or row_group_size * 10
        part_size = min(part_size, file_size)
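        # With these defaults (assuming no user-provided sizes), file_size is
        # roughly 12.8 GB and part_size roughly 1.28 GB per contiguous write.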
        fs, _, _ = get_fs_token_paths(output_path, mode="wb", storage_options=storage_options)

        # Start by converting the original dataset to a Dask-DataFrame
        # object in CPU memory. We avoid GPU memory in case the original
        # dataset is prone to OOM errors.
        _ddf = dataset.engine.to_ddf(columns=columns, cpu=True)

        # Prepare general metadata (gmd)
        gmd = {}
        cats = cats or []
        conts = conts or []
        labels = labels or []
        if not len(cats + conts + labels):
            warnings.warn(
                "General-metadata information not detected! "
                "Please pass lists for `cats`, `conts`, and `labels` as "
                "arguments to `regenerate_dataset` to ensure a complete "
                "and correct _metadata.json file."
            )
        col_idx = {str(name): i for i, name in enumerate(_ddf.columns)}
        gmd["cats"] = [{"col_name": c, "index": col_idx[c]} for c in cats]
        gmd["conts"] = [{"col_name": c, "index": col_idx[c]} for c in conts]
        gmd["labels"] = [{"col_name": c, "index": col_idx[c]} for c in labels]

        # Get list of partition lengths
        token = tokenize(
            dataset,
            output_path,
            columns,
            part_size,
            file_size,
            cats,
            conts,
            labels,
            storage_options,
        )
        getlen_name = "getlen-" + token
        name = "all-" + getlen_name
        dsk = {(getlen_name, i): (len, (_ddf._name, i)) for i in range(_ddf.npartitions)}
        dsk[name] = [(getlen_name, i) for i in range(_ddf.npartitions)]
        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[_ddf])
        size_list = Delayed(name, graph).compute()

        # Get memory usage per row using the first partition
        p0_mem_size = _ddf.partitions[0].memory_usage(deep=True, index=True).sum().compute()
        mem_per_row = int(float(p0_mem_size) / float(size_list[0]))

        # Determine the number of rows to assign to each output partition
        # and the number of output partitions to assign to each output file
        rows_per_part = int(part_size / mem_per_row)
        parts_per_file = int(file_size / part_size)
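        # For example (hypothetical numbers): with mem_per_row=100 bytes,
        # part_size=1.28 GB, and file_size=12.8 GB, rows_per_part would be
        # 12_800_000 and parts_per_file would be 10.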

        # Construct re-partition graph
        dsk2 = {}
        repartition_name = "repartition-" + token
        split_name = "split-" + repartition_name
        getitem_name = "getitem-" + repartition_name

        gets = defaultdict(list)
        out_parts = 0
        remaining_out_part_rows = rows_per_part
        for i, in_part_size in enumerate(size_list):

            # The `split` dictionary will be passed to this input
            # partition to dictate how that partition will be split
            # into different output partitions/files. The "key" of
            # this dict is the output partition, and the value is a
            # tuple specifying the (start, end) row range.
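            # For example (hypothetical sizes, assuming no rows are carried
            # over from a previous partition): an input partition of 10 rows
            # with rows_per_part=4 would produce
            # split = {0: (0, 4), 1: (4, 8), 2: (8, 10)}.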
            split = {}
            last = 0
            while in_part_size >= remaining_out_part_rows:
                gets[out_parts].append(i)
                split[out_parts] = (last, last + remaining_out_part_rows)
                last += remaining_out_part_rows
                in_part_size = in_part_size - remaining_out_part_rows
                remaining_out_part_rows = rows_per_part
                out_parts += 1

            if in_part_size:
                gets[out_parts].append(i)
                split[out_parts] = (last, last + in_part_size)
                remaining_out_part_rows -= in_part_size
                if remaining_out_part_rows == 0:
                    remaining_out_part_rows = rows_per_part
                    out_parts += 1

            dsk2[(split_name, i)] = (_split_part, (_ddf._name, i), split)

        npartitions = max(gets) + 1
        for k, v_list in gets.items():
            last = None
            _concat_list = []
            for v in v_list:
                key = (getitem_name, v, k)
                _concat_list.append(key)
                dsk2[key] = (operator.getitem, (split_name, v), k)
            ignore_index = True
            dsk2[(repartition_name, k)] = (_concat, _concat_list, ignore_index)

        graph2 = HighLevelGraph.from_collections(repartition_name, dsk2, dependencies=[_ddf])
        divisions = [None] * (npartitions + 1)
        _ddf2 = new_dd_object(graph2, repartition_name, _ddf._meta, divisions)

        # Make sure the root directory exists
        fs.mkdirs(output_path, exist_ok=True)

        # Construct rewrite graph
        dsk3 = {}
        rewrite_name = "rewrite-" + token
        write_data_name = "write-data-" + rewrite_name
        write_metadata_name = "write-metadata-" + rewrite_name
        inputs = []
        final_inputs = []
        for i in range(_ddf2.npartitions):
            index = i // parts_per_file
            next_index = (i + 1) // parts_per_file
            package_task = (index != next_index) or (i == (_ddf2.npartitions - 1))
            fn = f"part.{index}.parquet"
            inputs.append((repartition_name, i))
            if package_task:
                final_inputs.append((write_data_name, i))
                dsk3[(write_data_name, i)] = (
                    _write_data,
                    inputs,
                    output_path,
                    fs,
                    fn,
                )
                inputs = []

        # Final task collects and writes all metadata
        dsk3[write_metadata_name] = (
            _write_metadata_file,
            final_inputs,
            fs,
            output_path,
            gmd,
        )
        graph3 = HighLevelGraph.from_collections(write_metadata_name, dsk3, dependencies=[_ddf2])

        return Delayed(write_metadata_name, graph3)

def _write_metadata_file(md_list, fs, output_path, gmd_base):
    # Prepare both "general" and parquet metadata
    gmd = gmd_base.copy()
    pmd = {}
    data_paths = []
    file_stats = []
    for m in md_list:
        for path in m.keys():
            md = m[path]["md"]
            rows = m[path]["rows"]
            pmd[path] = md
            data_paths.append(path)
            fn = path.split(fs.sep)[-1]
            file_stats.append({"file_name": fn, "num_rows": rows})
    gmd["data_paths"] = data_paths
    gmd["file_stats"] = file_stats

    # Write general metadata file
    GPUParquetWriter.write_general_metadata(gmd, fs, output_path)

    # Write specialized parquet metadata file
    GPUParquetWriter.write_special_metadata(pmd, fs, output_path)

    # Return total file count (sanity check)
    return len(data_paths)

def _write_data(data_list, output_path, fs, fn):
    # Initialize chunked writer
    path = fs.sep.join([output_path, fn])
    writer = pwriter_cudf(path, compression=None)
    rows = 0

    # Loop over the data_list, convert to cudf,
    # and append to the file
    for data in data_list:
        rows += len(data)
        writer.write_table(cudf.from_pandas(data))

    # Return metadata and row-count in dict
    return {fn: {"md": writer.close(metadata_file_path=fn), "rows": rows}}

class BaseParquetWriter(ThreadedWriter):
    def __init__(self, out_dir, suffix=".parquet", **kwargs):
        super().__init__(out_dir, **kwargs)
        self.data_files = []
        self.data_bios = []
        self._lock = threading.RLock()
        self.pwriter = self._pwriter
        self.pwriter_kwargs = {}
        self.suffix = suffix

    @property
    def _pwriter(self):
        """Returns ParquetWriter Backend Class"""
        raise NotImplementedError

    def _read_parquet(self, source):
        """Read parquet data from source"""
        raise NotImplementedError

    def _to_parquet(self, df, sink):
        """Write data to parquet and return pq metadata"""
        raise NotImplementedError

    def _get_filename(self, i):
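        # Builds the output file name: with no explicit `self.fns`, names look
        # like "0.parquet", or "0.<guid>.parquet" when `use_guid` is set
        # (hypothetical examples), joined under `self.out_dir`.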
        if self.fns:
            fn = self.fns[i]
        elif self.use_guid:
            fn = f"{i}.{guid()}{self.suffix}"
        else:
            fn = f"{i}{self.suffix}"
        return os.path.join(self.out_dir, fn)

    def _append_writer(self, path, schema=None, add_args=None, add_kwargs=None):
        # Add additional args and kwargs
        _args = add_args or []
        _kwargs = tlz.merge(self.pwriter_kwargs, add_kwargs or {})

        if self.bytes_io:
            bio = py_io.BytesIO()
            self.data_bios.append(bio)
            self.data_writers.append(self.pwriter(bio, *_args, **_kwargs))
        else:
            f = fsspec.open(path, mode="wb").open()
            self.data_files.append(f)
            self.data_writers.append(self.pwriter(f, *_args, **_kwargs))

    def _get_or_create_writer(self, idx, schema=None):
        # Lazily initializes a writer for the given index
        with self._lock:
            while len(self.data_writers) <= idx:
                # Append writer
                path = self._get_filename(len(self.data_writers))
                self.data_paths.append(path)
                self._append_writer(path, schema=schema)
            return self.data_writers[idx]

    def _write_table(self, idx, data):
        """Write data"""
        raise NotImplementedError

    def _write_thread(self):
        while True:
            item = self.queue.get()
            try:
                if item is self._eod:
                    break
                idx, data = item
                with self.write_locks[idx]:
                    self._write_table(idx, data)
            finally: