Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Integration to read from Snapshot ref #5150

Merged
merged 24 commits into from Nov 9, 2022

Conversation

namrathamyske
Copy link
Contributor

@namrathamyske namrathamyske commented Jun 28, 2022

Issue adressed: #3899

This PR provides a way to spark query using snapshot ref

@namrathamyske namrathamyske changed the title Spark integration ref Spark 3.2 Integration to read from Snapshot ref Jun 28, 2022
@namrathamyske
Copy link
Contributor Author

@amogh-jahagirdar Took a pull from your PR. let me know if my commit 8132d20 looks good.

@namrathamyske namrathamyske marked this pull request as ready for review August 5, 2022 22:55
@namrathamyske namrathamyske changed the title Spark 3.2 Integration to read from Snapshot ref Spark Integration to read from Snapshot ref Aug 5, 2022

@Test
public void testSnapshotSelectionByRef() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to polish these testcases!


@Test
public void testSnapshotSelectionByRef() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to polish these testcases!

@amogh-jahagirdar
Copy link
Contributor

amogh-jahagirdar commented Aug 7, 2022

@namrathamyske Thanks for this!
High level comment, I think we should separate the API and the Spark integration changes. Also at the API level, I think it makes sense to separate useBranch and useTag, rather than having one useSnapshotRef , because branching can be combined with time travel, but tagging cannot be; although that could just be an implementation detail we handle. Semantically from an API perspective though it seems cleaner to separate the 2.

Let me know what you think. Checkout this thread #4428 (comment)

I have this PR #5364 for branching + time travel, I think we could do a separate one for tagging.

* @return a new scan based on this with the given snapshot Ref
* @throws IllegalArgumentException if the snapshot cannot be found
*/
TableScan useSnapshotRef(String snapshotRef);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this this should be useRef(String branchOrTagName). The term SnapshotRef is internal and I don't think it should be exposed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to separate the useBranch and useTag APIs. As you said, refs are internal. From a Spark user perspective we also want to only expose the branch/tag terms; imo I think the same case could be applied to the API level. Also considering branches can be combined with time travel we could do a separate API for that ; although there's an argument to be made to just combine useBranch + as Of Time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I considered that as well. The problem is that the caller doesn't know whether the ref is a tag or a branch before calling the method. That's determined when we look at table metadata and we don't want to force the caller to do that.

There may be a better name than "ref" for useRef. That seems like the problem to me. Maybe we could simplify it to use? I'm not sure that's obvious enough.

@aokolnychyi, do you have any thoughts on the name here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue @amogh-jahagirdar I agree that we can use a common API for tag or branch like useRef.

We have two signatures:

useRef(String refName)

useRef(String refName, Long timeStampMillis) -> will throw exception for tag type, since we cant do time travel for tag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this sounds reasonable. The only thing is I think if we do useRef (or if we come up with a better name) then we would not want to have the useRef(String refName, Long timeStampMillis). A user would chain it with the existing useTimestamp and then the validation that it's a branch would happen in the scan context.useRef().asOfTime() I don't think we would want the extra method because time travel would only apply for branches so having the ref in that case doesn't make sense to me since it's really only supported for 1 ref type, the branch.

If we have consensus on this, then I can update https://github.com/apache/iceberg/pull/5364/files with the updated approach. Then this, PR could be focused on the Spark side of the integration. Will wait to hear what @aokolnychyi suggests as well!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will throw exception for tag type, since we cant do time travel for tag.

In that case I would suggest:

  • useRef(String refName)
  • useBranchAsOfTime(String branchName, Long timeStampMillis)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see the alternative is just .useRef(refName).asOfTime(timestampMillis). That also works, in that case +1 for useRef(String refName)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like there is consensus for useRef.

@namrathamyske
Copy link
Contributor Author

@rdblue @amogh-jahagirdar Thanks for your review! working on changes for this!

@@ -226,4 +226,88 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
}

@Test
public void testSnapshotSelectionByTag() throws IOException {
Copy link
Contributor

@rdblue rdblue Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need tests to show that branch and tag options can't be used at the same time, and tests to validate what happens when snapshot or timestamp are set along with branch or tag. It should be easy to make a few tests for those error cases.

Copy link
Contributor

@rdblue rdblue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @namrathamyske! This looks close.

// branch ref of the table snapshot to read from
public static final String BRANCH = "branch";

// tag ref of the table snapshot to read from
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit: I think for the comments we can leave off the "table snapshot" and "ref" part and the comment could look something like

"Tag to read from"
"Branch to read from"

@github-actions github-actions bot removed the API label Oct 20, 2022
@namrathamyske
Copy link
Contributor Author

namrathamyske commented Oct 23, 2022

I came across duplicate implementations of using timestamp option in

and
Matcher at = AT_TIMESTAMP.matcher(ident.name());
. I am unsure of how to proceed for branches and tags usecase. Just making changes to read from branch/tag in SparkScanBuilder worked before for previous versions of spark - 3.1, 3.2. But for 3.3 , properties get overridden in SparkCatalog.java before reaching SparkScanBuilder.java. @rdblue @amogh-jahagirdar any thoughts appreciated.

Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrathamyske I checked out your branch and debugged, see my comment for what the bug is; after the fix the tests should start passing (verified locally)

Comment on lines 225 to 229
if (branch != null) {
scan.useRef(branch);
} else if (tag != null) {
scan.useRef(tag);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue is that scan.useRef(tag) by itself doesn't update the snapshot in the context on the existing scan; it's a builder like pattern so it needs to be scan = scan.useRef(ref). That's why the tests are failing; the context snapshot isn't set, so by default the scan reads main.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad missed this! Thanks for pointing out!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem!

@@ -270,6 +282,8 @@ && readSchema().equals(that.readSchema())
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
&& Objects.equals(asOfTimestamp, that.asOfTimestamp);
// && Objects.equals(branch, that.branch)
// && Objects.equals(tag, that.tag);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have to uncomment this , but getting a checkstyle cyclomatic complexity error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering it's required for a correct equals implementation of SparkBatchQueryScan, I think it makes the most sense just to suppress the warnings on the method @SuppressWarnings("checkstyle:CyclomaticComplexity")

Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks great to me just some nits. Thanks for contributing this @namrathamyske ! cc @rdblue

@rdblue
Copy link
Contributor

rdblue commented Nov 7, 2022

I am unsure of how to proceed for branches and tags usecase. Just making changes to read from branch/tag in SparkScanBuilder worked before for previous versions of spark - 3.1, 3.2. But for 3.3 , properties get overridden in SparkCatalog.java before reaching SparkScanBuilder.java

The current implementation looks fine to me. If those code paths are taken, it indicates that Spark was passed syntax like TIMESTAMP AS OF '...', which should be incompatible with branch or tag options. This PR already implements that because the snapshot passed to SparkTable is added to options.

@rdblue
Copy link
Contributor

rdblue commented Nov 7, 2022

This looks good to me. I'm rerunning CI since the failures don't look related to this.

@rdblue rdblue merged commit 305e320 into apache:master Nov 9, 2022
Fokko pushed a commit to Fokko/iceberg that referenced this pull request Nov 11, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants