New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Webdataset reader operator implementation #3306
Conversation
std::unordered_map<std::string, std::vector<size_t>> | ||
ext_map_; // maps an extension to sample indicies | ||
MissingExt missing_component_behavior_; | ||
std::vector<DALIDataType> dtype_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::vector<DALIDataType> dtype_; | |
std::vector<DALIDataType> dtypes_; |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright, wasn't sure about naming it
"Invalid value for missing_component_behavior"); | ||
|
||
for (auto& component_dtype : dtype_) { | ||
DALI_ENFORCE(kSupportedTypes.find(component_dtype) != kSupportedTypes.end(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DALI_ENFORCE(kSupportedTypes.find(component_dtype) != kSupportedTypes.end(), | |
DALI_ENFORCE(kSupportedTypes.count(component_dtype), |
This pull request introduces 3 alerts when merging ffae5a2482858e95f9907eba273db68621a2dc2a into 5403edd - view on LGTM.com new alerts:
|
std::cout << "TEST" << std::endl; | ||
std::cout << "TEST" << std::endl; | ||
std::cout << "TEST" << std::endl; | ||
std::cout << "TEST" << std::endl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some tests, stuff, that's why it's still WIP
tools/wds2idx.py
Outdated
#if member.type != tarfile.REGTYPE or member.name.startswith('.'): | ||
# last_skipped = member.offset | ||
# continue | ||
#last_skipped = self.farchive.fileobj.tell() | ||
#basename, extension = IndexCreator.split_name(member.name) | ||
#offset = member.offset | ||
#if not data or data[-1][0] != basename: | ||
# data.append((offset, [extension])) | ||
#else: | ||
# data[-1][1].append(extension) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also debugging stuff
tools/wds2idx.py
Outdated
creator.close() | ||
|
||
if __name__ == '__main__': | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a newline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright
std::unordered_map<std::string, std::vector<size_t>> | ||
ext_map_; // maps an extension to sample indicies |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::unordered_map<std::string, std::vector<size_t>> | |
ext_map_; // maps an extension to sample indicies | |
std::unordered_map<std::string, std::vector<size_t>> ext_map_; // maps an extension to sample indicies |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100 character limit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then how about:
std::unordered_map<std::string, std::vector<size_t>> | |
ext_map_; // maps an extension to sample indicies | |
// maps an extension to sample indicies | |
std::unordered_map<std::string, std::vector<size_t>> ext_map_; |
?
struct SampleConfig { | ||
int64_t start_offset; | ||
int64_t end_offset; | ||
std::set<std::string> extensions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can each sample have different set of extensions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
|
||
DALI_ENFORCE(uris_.size() == configs_.size(), | ||
"Number of uris does not match the number of config files"); | ||
DALI_ENFORCE(uris_.size() == dtype_.size(), "Number of uris does not match the number of types"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand uris are index files. So why you need to specify dtype for each index file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that is a bug, should be ext
dtype_ = spec.HasArgument("dtype") ? spec.GetRepeatedArgument<DALIDataType>("dtype") : | ||
std::vector<DALIDataType>(uris_.size(), DALI_UINT8); | ||
|
||
DALI_ENFORCE(uris_.size() == configs_.size(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What the config file is for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's the index file with offsets for all the samples and the extensions of components in that specific sample
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you have index and config file side by side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, the index file is the config file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just thought that "configs" variable would sound better than "indexes"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for indexes
(indices
?), file_maps
or something along these lines. "Config" sounds more like some kind of setting that's there beside the contents of the file and which can be changed independently, whereas the index is determined exactly by the file contents.
for (int data_idx = 0; data_idx < num_samples; data_idx++) { | ||
auto& sample = GetSample(data_idx); | ||
for (int output_idx = 0; output_idx < num_outputs; output_idx++) { | ||
ws.OutputRef<CPUBackend>(output_idx)[data_idx].ShareData(&sample[output_idx]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if it is safe.
ReadOne
from the loader.h returns LoadTargetSharedPtr
, which is stored in prefetched_batch_queue_
.
GetSample
returns a *LoadTargetSharedPtr
.
When you continue reading prefetched_batch_queue_
is trashed, LoadTargetSharedPtr
calls its custom deleter which moves underlying tensor to empty_tensors_
(see).
So sharing memory with something what can be trashed doesn't sound like a safe thing to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's how all the other readers fetch their data, I just used it analogously
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think other readers do parsing and do not directly share the tensors they get from GetSample
(at least I cannot recall nay example now). But I can be wrong...
shared_tensor_data, size, {size / static_cast<int64_t>(component_dtype_info.size())}, | ||
component_dtype_info); | ||
} | ||
sample_was_set[component_index] = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it break
here or keep looping and share over and over the same shared_tensor_data
with different sample[component_index]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add it for the readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add what? break would mean a behavior that we would not want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mhh, so why we share the same memory with different sample[component_index]
? I would expect to get a different piece every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, because what ext_map_ does is it maps a specific extension to which indicies of samples it should go to, so those components are actually meant to share the same data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm having second thought about that. How often we want components to share the underlying memory?
Would be something like:
["a.a;a.b;a.a;a.b"],
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well that's just one output so no it wouldn't
// Check in case of encountering an unneeded entry | ||
const std::string extension = GetExtension(current_wds_shard.GetFileName()); | ||
if (ext_map_.find(extension) == ext_map_.end()) { | ||
DALI_ENFORCE(current_wds_shard.NextFile(), "Index file reporting a file longer than actual"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please adjust the error message. It would be good to print filename and offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright
while (current_wds_shard.TellArchive() < current_sample.end_offset) { | ||
// Check in case of encountering a tar entry that is not a file | ||
if (current_wds_shard.GetFileType() != detail::TarArchive::ENTRY_FILE) { | ||
DALI_ENFORCE(current_wds_shard.NextFile(), "Index file reporting a file longer than actual"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to print filename and offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright
|
||
// initializing all the readers | ||
for (auto& uri : uris_) { | ||
wds_shards_.emplace_back(FileStream::Open(uri, read_ahead_, !dont_use_mmap_)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to keep all files open? In some edge cases I'm afraid we can run over the open file descriptors limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, because technically the way things are implemented does allow for the loader interface to change the argument for reset (even though it's not implemented like that at the moment), so yes that does necessitate keeping all of them open. What I could do would be to open them only when I use them, but that would also mean getting rid of potential caching implemented in that specific file stream later on (for example in the case of the web accessed tar files)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFRecordReader and MXNet reader opens on demand.
Do you have any particular caching in mind?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the one that the implementation of the web reader will probably do. It's just that I don't want to assume anything about the underlying implementation of FileStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more thing:
wds_shards_.emplace_back(FileStream::Open(uri, read_ahead_, !dont_use_mmap_)); | |
wds_shards_.emplace_back(FileStream::Open(uri, read_ahead_, !copy_read_data_);)); |
if (stick_to_shard_) { | ||
current_wds_shard_index_ = first_wds_shard_index_; | ||
current_sample_index_ = first_sample_index_; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (stick_to_shard_) { | |
current_wds_shard_index_ = first_wds_shard_index_; | |
current_sample_index_ = first_sample_index_; | |
} | |
current_wds_shard_index_ = first_wds_shard_index_; | |
current_sample_index_ = first_sample_index_; |
stick_to_shard_
matters only when Reset
is called. Here it should be irrelevant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't since reset is not called after the first looping over. Other implementations solve that like that as well.
} | ||
|
||
// initializing the first reader | ||
if (stick_to_shard_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
for (detail::TarArchive& wds_shard : wds_shards_) { | ||
wds_shard.SeekArchive(0); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
L184 handles that. This is not needed here I guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed in the current changes, will push it tomorrow
This pull request introduces 2 alerts when merging fb46bf71278f21e65f940714c9e1cf97e7919cb5 into f05e931 - view on LGTM.com new alerts:
|
This pull request introduces 2 alerts when merging 53fa3a0c7a2cfeb8c24be21271d75be3d585d89c into f1a61b6 - view on LGTM.com new alerts:
|
53fa3a0
to
aeaa897
Compare
This pull request introduces 2 alerts when merging aeaa897219dd007e7173b9308919c1c5eb829271 into a49640d - view on LGTM.com new alerts:
|
This pull request introduces 2 alerts when merging 6bf084fb591809517f2b49cc15da47cc06e9578c into 948ccb7 - view on LGTM.com new alerts:
|
This pull request introduces 2 alerts when merging 7301b1a733fb84ddb8811de24278f0e6c58b133d into 948ccb7 - view on LGTM.com new alerts:
|
This pull request introduces 2 alerts when merging cc4cf9456bf53d44185a0f8c597fa71121fc408d into 948ccb7 - view on LGTM.com new alerts:
|
CI MESSAGE: [2990240]: BUILD STARTED |
CI MESSAGE: [2987769]: BUILD FAILED |
CI MESSAGE: [2990240]: BUILD FAILED |
CI MESSAGE: [2990240]: BUILD PASSED |
Signed-off-by: Bartłomiej Cieślar <bcieslar2001@gmail.com>
!build |
CI MESSAGE: [2993471]: BUILD STARTED |
CI MESSAGE: [2993471]: BUILD PASSED |
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Implementation of nvidia.dali.fn.readers.webdataset
Description
What happened in this PR
This PR adds a reader operator for loading tar-based webdatasets.
Additional information
Checklist
Tests
Documentation
DALI team only
Requirements
REQ IDs: RDWDS.01, RDWDS.02, RDWDS.03, RDWDS.04, RDWDS.05, RDWDS.06, RDWDS.07, RDWDS.08, RDWDS.09, RDWDS.10, RDWDS.11, RDWDS.12, RDWDS.13, RDWDS.20
JIRA TASK: DALI-2231