
API, Spark: Fix aggregation pushdown on struct fields #9176

Merged
merged 3 commits into apache:main on Jan 31, 2024

Conversation

@amogh-jahagirdar amogh-jahagirdar commented Nov 29, 2023

Currently, aggregation pushdown on struct fields fails with casting errors because in certain cases (e.g. an optional field in a struct) the accessor visitor constructs a WrappedPositionAccessor when visiting the field. A WrappedPositionAccessor expects the element it accesses to be a StructLike, but for aggregation pushdown that assumption does not hold (the top-level element is a plain Java value).

Since there is always a single value, I think we should be able to read the evaluated value directly without performing any term evaluation.
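
For context, here is a minimal sketch of the kind of query that exercises this path. The table, column names, and the spark session are hypothetical, not taken from the PR; the point is an aggregate over an optional nested struct field that Spark can push down to Iceberg:

    // hypothetical Iceberg table with a struct column whose nested fields are optional
    spark.sql("CREATE TABLE db.items (id BIGINT, info STRUCT<weight: INT, height: INT>) USING iceberg");
    spark.sql("INSERT INTO db.items VALUES (1, named_struct('weight', 10, 'height', 180))");

    // MAX on the nested field is eligible for aggregation pushdown; before this fix, evaluating
    // the pushed-down aggregate could fail with a casting error because the bound accessor
    // expected a StructLike rather than the plain Java value read from file metadata
    spark.sql("SELECT MAX(info.height) FROM db.items").show();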

@amogh-jahagirdar amogh-jahagirdar changed the title from "API, Spark: Fix aggregation pushodwn on struct fields" to "API, Spark: Fix aggregation pushdown on struct fields" Nov 29, 2023
Comment on lines 38 to 42
-    valueStruct.setValue(evaluateRef(file));
-    return term().eval(valueStruct);
+    return (T) evaluateRef(file);
Contributor Author

Ok, so the main question here is: do we actually need to pass the single value down through term evaluation? What's a case where this would fail? All the tests currently pass without it, but I'm doubtful that this generalizes.

Contributor

Seems OK to me to just return evaluateRef(file).

I'd like to get @rdblue's opinion on this as well.

Contributor Author

I should also note that if we determine this approach is good, we can remove the SingleValueStruct class itself.

Contributor

The case that I think is missing is when the term is not a simple reference and is instead a transformed value. That's why the SingleValueStruct class is here. The idea is to evaluate the original term (which could be a transform of a field) on the value that was retrieved from the DataFile metadata.

Also, it's good to keep in mind what each eval method is for. The eval(StructLike) method is for evaluating the aggregate on individual rows. That's because this is generic and we can use the aggregate framework to calculate aggregates of normal rows. That means the change to that method isn't correct. That eval method should call the accessor that was bound to the schema of incoming rows.

This eval method, eval(DataFile), is called to produce a value for a set of rows. As I said, that value may then be transformed by the expression, so we need to run the expression term on it. To be able to run the term, we need a row that can supply the value that came from the DataFile, and that's where SingleValueStruct comes in: it returns the DataFile value for any position that is queried, so it should work with any accessor. The problem is that I didn't consider nested cases when writing it. It always returns the value, but it should return itself when the caller expects the value to be nested.

Here's an alternative fix for SingleValueStruct that keeps the original eval implementations but handles nested values:

    public <T> T get(int pos, Class<T> javaClass) {
      if (javaClass.isAssignableFrom(StructLike.class)) {
        // a nested accessor expects a struct at this position; return this wrapper so the
        // accessor can keep descending until it reaches the leaf value
        return (T) this;
      } else {
        // leaf position: return the single value read from the data file metadata
        return (T) value;
      }
    }

The tests from this PR pass with that change.

@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review November 30, 2023 18:10
@rdblue rdblue added this to the Iceberg 1.5.0 milestone Jan 2, 2024
@@ -249,6 +250,78 @@ public void testAggregateNotPushDownIfOneCantPushDown() {
assertEquals("expected and actual should equal", expected, actual);
}

@Test
public void testAggregationPushdownStructInteger() {
Contributor

An alternative to having multiple methods that effectively call the same test code is to have a single test method that is parameterized.

Contributor Author

I did some more refactoring. While we could use @Parameterized in this case, it ends up not being as readable as just being explicit, imo. I've updated the PR so that there are helpers for asserting the aggregates and the expected contents of the explain plan.
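
For illustration, a rough sketch of what such a helper could look like; the method name, parameters, and the spark field are assumptions for this sketch (not the actual code in the PR), and it assumes java.util.List, org.apache.spark.sql.Row, and org.assertj.core.api.Assertions are imported:

    // hypothetical helper: assert the pushed-down aggregates show up in the explain plan
    // and that the aggregate query returns the expected values
    private void assertAggregatePushdown(String query, List<Object> expectedValues, String... expectedFragments) {
      String explain = spark.sql("EXPLAIN " + query).collectAsList().get(0).getString(0);
      for (String fragment : expectedFragments) {
        Assertions.assertThat(explain)
            .as("explain plan should show the pushed-down aggregate")
            .contains(fragment);
      }

      Row result = spark.sql(query).collectAsList().get(0);
      for (int i = 0; i < expectedValues.size(); i++) {
        Assertions.assertThat(result.get(i))
            .as("aggregate value at position %s", i)
            .isEqualTo(expectedValues.get(i));
      }
    }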

long timestamp = System.currentTimeMillis();
long futureTimestamp = timestamp + 5000;
Timestamp expectedMax = new Timestamp(futureTimestamp / 1000 * 1000);
Timestamp expectedMin = new Timestamp(1000 * (timestamp / 1000));
@rdblue rdblue Jan 28, 2024

Why are the expressions different other than the timestamp variable?

Also, do these need to be timestamps? Or can you convert them back to millis or micros in Spark? We want to avoid using Timestamp in tests because it has wacky behavior across time zones.

Contributor Author

I was aiming to test multiple data types. The issue with converting in Spark is that the goal of the test is to verify that aggregation pushdown actually happened; wrapping the column in any conversion function eliminates the pushdown, and the test loses that value.

In the latest version, I insert the records in Spark explicitly in UTC (which is then stored in timestamp-without-timezone format in Iceberg). When the records are read back, the java.sql.Timestamp is a point in time, milliseconds since epoch. This should be deterministic across time zones unless there's some implicit session conversion that I'm missing.
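
As a rough sketch of that setup (the table name is hypothetical, and pinning the session time zone is only one way to do this, not necessarily what the PR does):

    // pin the session time zone so timestamp literals and java.sql.Timestamp values are
    // interpreted as UTC, independent of the JVM's default zone
    spark.conf().set("spark.sql.session.timeZone", "UTC");
    spark.sql("INSERT INTO db.tbl VALUES (timestamp '2024-01-01 10:00:00')");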

@rdblue rdblue commented Jan 28, 2024

@amogh-jahagirdar, I fixed the implementation in the PR above. It would be great to get this into 1.5.0 also!

@rdblue rdblue merged commit 9de693f into apache:main Jan 31, 2024
42 checks passed
@rdblue rdblue commented Jan 31, 2024

Nice work, @amogh-jahagirdar! Thanks for getting this done for the 1.5 release!

    Arrays.stream(expectedFragments)
        .forEach(
            fragment ->
                Assertions.assertThat(explainString.contains(fragment))
Contributor

This should be Assertions.assertThat(explainString).as(...).contains(fragment). We typically try to avoid isTrue() / isFalse() on assertions like these because they don't provide any contextual insight when an assertion fails.
Using assertThat(explainString).as(...).contains(fragment) instead will show the content of explainString and fragment whenever the assertion fails.
Also note that .as() needs to be specified before the final assertion; otherwise it is ignored.
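
A short sketch of the difference, reusing the names from the snippet above:

    // discouraged: a failure only reports "expected true but was false"
    Assertions.assertThat(explainString.contains(fragment)).as("explain plan").isTrue();

    // preferred: a failure prints the actual explainString and the missing fragment;
    // note that .as(...) must come before the terminal assertion or it is ignored
    Assertions.assertThat(explainString)
        .as("explain plan should contain the pushed-down aggregate")
        .contains(fragment);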
