Core: Add v4 TrackedFileAdapters to bridge Data/Delete Files#16100
Core: Add v4 TrackedFileAdapters to bridge Data/Delete Files#16100anoopj wants to merge 7 commits intoapache:mainfrom
Conversation
…leteFile APIs This adapter would allow to minimize the v4 related code changes during scan planning and commits.
| return new TrackedDeleteFile(file, spec); | ||
| } | ||
|
|
||
| // TODO: TrackedFile will likely get an explicit partition tuple field (using a union partition |
There was a problem hiding this comment.
This will change after the approach to store partition tuple is settled.
| return result.isEmpty() ? null : result; | ||
| } | ||
|
|
||
| static Map<Integer, Long> nullValueCounts(ContentStats stats) { |
There was a problem hiding this comment.
An open question is whether it's worth caching the stats (lazy/eager). I don't see a lot of repeated reads, so may not be worth it.
There was a problem hiding this comment.
It's probably fine. I have a comment about this below.
|
|
||
| private TrackedFileAdapters() {} | ||
|
|
||
| static DataFile asDataFile(TrackedFile file, PartitionSpec spec) { |
There was a problem hiding this comment.
What was your reason not to make this a method of TrackedFile?
Also, PartitionSpec is tracked by the file itself, so is very strange to pass it in here. At a minimum, I would expect this to have a validation that the spec's ID matches the file's spec ID. But it would also be better to have some way to look up spec by ID instead of forcing the caller to do it and then validate that the caller did it correctly.
There was a problem hiding this comment.
I intentionally kept the adapters outside of TrackedFile to avoid coupling it with Data/DeleteFile. It seemed like an adapter concern.
The tracked file keeps track of the spec ID, but not the spec itself. I saw some v3 code paths that followed a similar pattern of passing around specsById. Pretty much open to changing it if you have suggestions.
There was a problem hiding this comment.
I changed the methods to take in specsById map instead.
|
|
||
| @Override | ||
| public Integer sortOrderId() { | ||
| return null; |
There was a problem hiding this comment.
Good context from the spec for a comment:
Position deletes are required to be sorted by file and position, not a table order, and should set sort order id to null
There was a problem hiding this comment.
Maybe note that this is from the spec.
| // each reported the full file size). | ||
| @Override | ||
| public long fileSizeInBytes() { | ||
| return dv.sizeInBytes(); |
There was a problem hiding this comment.
I appreciate the comment. The decision here seems reasonable to me since we don't know the total Puffin file size.
We should also consider whether we want to have a field in the tracked_file struct for this. We originally wanted to use file_size_in_bytes for the Puffin file size so that we could determine when DVs should be compacted. But variance in the footer was a problem that prevented it from being used as intended.
@aokolnychyi should we revisit this?
There was a problem hiding this comment.
even if we know the total Puffin file size, dv.sizeInBytes still seems the correct value here. A puffin file may contain thousands of DVs. A logical DeleteFile should only contain one DV. The DeleteFile size should be the DV size.
|
|
||
| @Override | ||
| public Long firstRowId() { | ||
| return tracking != null ? tracking.firstRowId() : null; |
There was a problem hiding this comment.
From the spec:
The value of
first_row_idfor delete manifests is alwaysnull.
The value of
first_row_idfor delete files is alwaysnull.
This should just be null.
I also wonder if there are some implementations that can be refactored out, to avoid duplication between this and the other adapters? It seems like many of these should be the same as for data file and for v2 delete files that haven't been compacted into DVs yet.
There was a problem hiding this comment.
This should just be null.
Great callout. Fixed.
There was a problem hiding this comment.
I'll add an abstract based class to reduce the duplication.
|
|
||
| @Override | ||
| public Map<Integer, ByteBuffer> lowerBounds() { | ||
| return TrackedFileAdapters.lowerBounds(file.contentStats()); |
There was a problem hiding this comment.
This is okay, but a little concerning because the helper method here creates a new map every time it is called. Creating the map is an expensive operation because it allocates buffers to hold each column bound and serializes into that buffer. Then the map is thrown away rather than reused.
The evaluators themselves (for example, InclusiveMetricsEvaluator) only call these methods once to evaluate for the data file, but if multiple evaluators are used then we should expect a performance degredation.
On the other hand, we want to have evaluators that work directly with ContentStats instead of going through these methods. I think I'm fine with leaving this as-is for now, but we need to make sure that we use the right evaluators everywhere.
There was a problem hiding this comment.
Agree. Will build a new evaluator that operates on content stats, as a followup. cc @nastra
There was a problem hiding this comment.
Let's hold off on doing it right now. I think the ContentStats API is going to change.
| return new TrackedDataFile(file, spec); | ||
| } | ||
|
|
||
| static DeleteFile asDVDeleteFile(TrackedFile file, PartitionSpec spec) { |
There was a problem hiding this comment.
We will also need a way to wrap TrackedFile as a v2 DeleteFile for position deletes.
There was a problem hiding this comment.
Ack. I will create a followup PR for this so that the size is manageable.
| private final Tracking tracking; | ||
| private final PartitionSpec spec; | ||
|
|
||
| private AbstractTrackedContentFile(TrackedFile file, PartitionSpec spec) { |
There was a problem hiding this comment.
Rather than Abstract, we typically use Base for the prefix. For example, BaseAction or BaseContentStats.
There was a problem hiding this comment.
I think we can even drop the Base here with just TrackedContentFile, because ContentFile is a base class. This is also consistent with other classes like TrackedDataFile, TrackedDVDeleteFile.
|
|
||
| @Override | ||
| public int specId() { | ||
| return spec.specId(); |
There was a problem hiding this comment.
If the spec ID is set on file then it should be returned.
The last version was this:
return file.specId() != null ? file.specId() : 0;The problem wasn't that it was returning file.specId(). When that spec ID is set, it is canonical and is the ID that was used to look up spec. The problem was that it was guessing ID 0 for the unpartitioned spec, which is not correct. The updated version should be this:
return file.specId() != null ? file.specId() : spec.specId();| * <p>Subclasses provide {@code content()}, {@code firstRowId()}, {@code equalityFieldIds()}, and | ||
| * the copy methods. | ||
| */ | ||
| private abstract static class AbstractTrackedContentFile<F extends ContentFile<F>> |
There was a problem hiding this comment.
It would be helpful to implement the version for v2 position deletes here as well. That would make it easier to evaluate the implementations here, although I suspect it will be fine.
|
|
||
| @Override | ||
| public Long pos() { | ||
| return tracking != null ? tracking.manifestPos() : null; |
There was a problem hiding this comment.
I wonder if the methods that delegate to Tracking could be shared through another base class.
|
|
||
| @Override | ||
| public int specId() { | ||
| return spec.specId(); |
There was a problem hiding this comment.
Needs to delegate to file.specId() as well.
| private TrackedFileAdapters() {} | ||
|
|
||
| static DataFile asDataFile(TrackedFile file, Map<Integer, PartitionSpec> specsById) { | ||
| Preconditions.checkState( |
There was a problem hiding this comment.
nit: checkState is for invariants on internal state; checkArgument is the right call here since file.contentType() is essentially validating the input. Same applies to asDVDeleteFile (the two checkState calls on L54/L58) and asEqualityDeleteFile (L64).
|
|
||
| @Override | ||
| public DeleteFile copy() { | ||
| return new TrackedDVDeleteFile(file.copy(), spec); |
There was a problem hiding this comment.
The DV adapter never exposes the underlying TrackedFile's content stats — valueCounts() / lowerBounds() / etc. all return null directly. So file.copy() here retains content stats that will never be read through this adapter.
Using file.copyWithoutStats() would let copy callers drop them at the source and avoid retaining the dead weight when many DVs are held in memory. Same applies to the other copy variants below.
| tracking.set(3, 11L); | ||
| tracking.set(5, 1000L); | ||
| tracking.setManifestLocation("s3://bucket/manifest.avro"); | ||
| tracking.set(8, 7L); |
There was a problem hiding this comment.
switch to the builder added recently?
| private final Tracking tracking; | ||
| private final PartitionSpec spec; | ||
|
|
||
| private AbstractTrackedContentFile(TrackedFile file, PartitionSpec spec) { |
There was a problem hiding this comment.
I think we can even drop the Base here with just TrackedContentFile, because ContentFile is a base class. This is also consistent with other classes like TrackedDataFile, TrackedDVDeleteFile.
| } | ||
| } | ||
|
|
||
| /** Adapts a TrackedFile EQUALITY_DELETES entry to the {@link DeleteFile} interface. */ |
There was a problem hiding this comment.
Is this class only for EQUALITY_DELETES, as the content() method override indicates? if yes, it can probably names as TrackedEqualityDeleteFile
Just to confirm my understanding. This is for new V4 manifest files, where v2 position delete file entries won't exist.
| // each reported the full file size). | ||
| @Override | ||
| public long fileSizeInBytes() { | ||
| return dv.sizeInBytes(); |
There was a problem hiding this comment.
even if we know the total Puffin file size, dv.sizeInBytes still seems the correct value here. A puffin file may contain thousands of DVs. A logical DeleteFile should only contain one DV. The DeleteFile size should be the DV size.
The adapter bridges TrackedFile to existing DataFile/DeleteFile APIs and would allow to minimize the v4 related code changes during scan planning and commits.