Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-add iceberg bounded source; test splitting #30805

Merged
merged 3 commits into from Apr 12, 2024

Conversation

kennknowles
Copy link
Member

Currently we have failing in the exhaustive splitting, but I would also like feedback early.

This is exactly #30797 plus one commit, so if you click on just the last commit you should be able to see just the read/source diff


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@kennknowles
Copy link
Member Author

R: @chamikaramj

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@kennknowles
Copy link
Member Author

The failure is in the basic reading of data - none is read. I'll try to grok things and see about that. Still interested in commentary on the approaches here.

@chamikaramj chamikaramj self-requested a review April 1, 2024 17:48
Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

};
}

public static class Write<ElementT, DestinationT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that this also has Write stuff but I'm ignoring those for this review.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing. I will revert any added write stuff in this PR.

import org.apache.iceberg.data.Record;
import org.checkerframework.checker.nullness.qual.NonNull;

public class IcebergIO {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we are missing the Read transform ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do soon. I realize all the tests are a vanilla Read.from(IcebergBoundedSource)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergBoundedSource extends BoundedSource<Row> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should use SDF but I think it would suffice to add a TODO to convert this to an SDF in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

TableScan tableScan = table().newScan();

if (desiredBundleSizeBytes > 0) {
tableScan = tableScan.option(TableProperties.SPLIT_SIZE, "" + desiredBundleSizeBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String.valueOf(desiredBundleSizeBytes)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
break;
case BATCH:
// TODO: Add batch scan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fail here to prevent data loss.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

InputFilesDecryptor decryptor =
checkStateNotNull(this.decryptor, "decryptor null in adance() - did you call start()?");

// This is a lie, but the most expedient way to work with IcebergIO's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update/remove comment ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I added this comment to explain why the type was fake. I reworded it.

@Nullable FileScanTask fileTask = null;
while (!files.isEmpty() && fileTask == null) {
fileTask = files.remove();
if (fileTask.isDataTask()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we skipping data tasks ? Was this supposed to be if (!fileTask.isDataTask()) ?

Seems like DataTasks contain actual data: https://iceberg.apache.org/javadoc/0.11.0/org/apache/iceberg/DataTask.html

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was obsolete code. The issue is that ScanTask is a oneof and this is checking a oneof that it is not.

private Row convert(Record record) {
Row.Builder b = Row.withSchema(schema);
for (int i = 0; i < schema.getFieldCount(); i++) {
// TODO: A lot obviously
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha it means that this was a fake implementation. I added conversions.

@codecov-commenter
Copy link

codecov-commenter commented Apr 9, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 71.47%. Comparing base (b018c25) to head (21e0a47).
Report is 19 commits behind head on master.

❗ Current head 21e0a47 differs from pull request most recent head a169e6a. Consider uploading reports for the commit a169e6a to get more accurate results

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #30805      +/-   ##
============================================
+ Coverage     70.96%   71.47%   +0.51%     
============================================
  Files          1257      710     -547     
  Lines        140931   104815   -36116     
  Branches       4307        0    -4307     
============================================
- Hits         100007    74915   -25092     
+ Misses        37444    28268    -9176     
+ Partials       3480     1632    -1848     
Flag Coverage Δ
java ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@kennknowles kennknowles marked this pull request as ready for review April 10, 2024 01:03
@github-actions github-actions bot added the build label Apr 10, 2024
@kennknowles kennknowles force-pushed the iceberg-read branch 2 times, most recently from b53e022 to 0b5e391 Compare April 10, 2024 01:10
@kennknowles
Copy link
Member Author

Looks like there is something broke in the GHA workflow FYI just so you don't wait for that to go green. I pushed some possible fixes up for that too but I'm not sure if in this case the workflow comes from master or the PR.

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

case BATCH:
throw new UnsupportedOperationException("BATCH scan not supported");
}
return splits;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the sources produced at line 104 above get re-split by the runner ?

If such sources cannot be re-split, we should have a trivial case where we just return the original source to prevent data loss.

}

@Override
public BoundedSource<Row> getCurrentSource() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to also implement getFractionConsumed to support progress reporting in a meaningful way ?

I think it will be very useful for autoscaling when using this with Dataflow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be in a future PR since we want to get this in by release cut.


testPipeline.run();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add a test for splitting using SourceTestUtils.assertSourcesEqualReferenceSource.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@kennknowles kennknowles force-pushed the iceberg-read branch 4 times, most recently from 19037fb to a5b995a Compare April 11, 2024 18:05
@github-actions github-actions bot removed the build label Apr 11, 2024
}
break;
case BATCH:
throw new UnsupportedOperationException("BATCH scan not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fail for the default path prevent dataloss (or just return the original source if that can be read directly).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@chamikaramj
Copy link
Contributor

LGTM other than handling the re-splitting case above.

Copy link
Member Author

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for double-splitting. To make splitting behavior clearer, I factored into two different kinds of sources.

}
break;
case BATCH:
throw new UnsupportedOperationException("BATCH scan not supported");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. LGTM.

@kennknowles kennknowles merged commit 96dc16a into apache:master Apr 12, 2024
23 checks passed
@kennknowles kennknowles deleted the iceberg-read branch April 13, 2024 00:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants