Convert date and timestamp values in generics #983
Conversation
```diff
@@ -449,6 +454,45 @@ private DataFile writeFile(String location, String filename, List<Record> record
   }
 }
 
+  @Test
+  public void testFilterWithComplexType() throws IOException {
+    if (format == FileFormat.PARQUET) {
```
This is due to https://issues.apache.org/jira/browse/PARQUET-1851.
Tests should be disabled using Assume, so that they are reported as skipped instead of looking like they pass.
I think it makes sense to only disable the parts of the test that don't pass in Parquet -- I think just the timestamp without zone type. Can you separate that into a different case and add the assume to that one?
This works after #891 merged.
Thanks, @chenjunjiedada, but I don't think this is the right solution to the problem. This adds quite a bit of logic to get and changes the class argument so that it is a type request -- so the record will convert a value to the requested type. But get is used in a tight loop, and we don't want it to do that additional work: not only returning a value, but detecting what it is supposed to convert to and doing that conversion. Also, this changes the contract of a public method, and we don't want to make additional guarantees here. Instead, the accessors that are built when binding the schema to an expression should be used. This happens in Spark, where string values are converted from UTF8String to a CharSequence using a StringAccessor. This should be similar: a DateAccessor or TimestampAccessor is used to return the right internal representation from a record that stores LocalDate or OffsetDateTime.
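The accessor idea described above can be sketched in isolation. The following is a hypothetical, self-contained illustration, not Iceberg code: the `Row` interface stands in for Iceberg's `StructLike`, and the conversion mirrors how a `DateAccessor` could return the internal representation (days from the Unix epoch) from a record that stores `LocalDate`.

```java
import java.time.LocalDate;

// Hypothetical, simplified stand-in for the DateAccessor idea: the accessor
// reads a LocalDate from the record and converts it to the internal
// representation (days from 1970-01-01) at access time, so the record's
// get() stays a plain value lookup.
public class DateAccessorSketch {
    // Stand-in for org.apache.iceberg.StructLike
    interface Row {
        <T> T get(int pos, Class<T> javaClass);
    }

    // Conversion used by the accessor: LocalDate -> days from epoch
    static int daysFromDate(LocalDate date) {
        return (int) date.toEpochDay();
    }

    static class DateAccessor {
        private final int position;

        DateAccessor(int position) {
            this.position = position;
        }

        Integer get(Row row) {
            return daysFromDate(row.get(position, LocalDate.class));
        }
    }

    public static void main(String[] args) {
        // A one-field row that stores a LocalDate value
        Row row = new Row() {
            @Override
            public <T> T get(int pos, Class<T> javaClass) {
                return javaClass.cast(LocalDate.of(2019, 12, 1));
            }
        };
        System.out.println(new DateAccessor(0).get(row)); // prints 18231
    }
}
```

The point of the design is that the conversion lives in the accessor, not in the record's `get`, so the hot-path lookup stays cheap.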
Makes sense to me, thanks @rdblue! Now I added accessors to ... The unit test ...
```diff
 Integer tsDay = (Integer) day.apply(date20191201);
 
-Predicate pred = in("ts", date20191201, date20191202);
+Predicate pred = in("ts", "2019-12-01T00:00:00.00000", "2019-12-01T00:00:00.00000");
```
Why were these changes necessary?
The unit tests failed with the following exception, for the same reason that transform.day doesn't produce the date value. These changes are not necessary; they were just used to pass the unit tests. I will revert the changes.
Cannot cast java.lang.Integer to java.time.LocalDate
java.lang.ClassCastException: Cannot cast java.lang.Integer to java.time.LocalDate
at java.lang.Class.cast(Class.java:3369)
at org.apache.iceberg.TestHelpers$Row.get(TestHelpers.java:121)
at org.apache.iceberg.Accessors$DateAccessor.get(Accessors.java:216)
at org.apache.iceberg.Accessors$DateAccessor.get(Accessors.java:209)
at org.apache.iceberg.expressions.BoundReference.eval(BoundReference.java:39)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.in(ResidualEvaluator.java:201)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.in(ResidualEvaluator.java:120)
at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:152)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.predicate(ResidualEvaluator.java:258)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.predicate(ResidualEvaluator.java:281)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.predicate(ResidualEvaluator.java:120)
at org.apache.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:283)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.eval(ResidualEvaluator.java:125)
at org.apache.iceberg.expressions.ResidualEvaluator$ResidualVisitor.access$100(ResidualEvaluator.java:120)
at org.apache.iceberg.expressions.ResidualEvaluator.residualFor(ResidualEvaluator.java:117)
at org.apache.iceberg.transforms.TestResiduals.testInTimestamp(TestResiduals.java:211)
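The root cause behind this trace can be reproduced in isolation: the stored value is an `Integer` (days from epoch), so a later `Class.cast` to `LocalDate` fails. A minimal, self-contained illustration (hypothetical demo, not Iceberg code):

```java
import java.time.LocalDate;

// Minimal reproduction of the ClassCastException above: the value stored for
// a date is an Integer (epoch days), so casting it back to LocalDate fails.
public class CastMismatchDemo {
    public static void main(String[] args) {
        // What the transform produces: an Integer, 18231 for 2019-12-01
        Object stored = (int) LocalDate.of(2019, 12, 1).toEpochDay();

        try {
            // The same kind of cast the failing Row.get performs
            LocalDate date = LocalDate.class.cast(stored);
            System.out.println(date);
        } catch (ClassCastException e) {
            // Message matches the trace: Cannot cast java.lang.Integer to java.time.LocalDate
            System.out.println(e.getMessage());
        }
    }
}
```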
```diff
@@ -187,6 +206,47 @@ public String toString() {
   }
 }
 
+  private static class DateAccessor extends PositionAccessor {
```
These accessors are for types used by Iceberg generics. I don't think that we want to make them part of the public API, and we need to be able to add them when using generics but not use them when binding to other rows. These should not be used with PartitionData instances, for example.
Looks like this is the cause of the test failures:
org.apache.iceberg.spark.source.TestFilteredScan > testDayPartitionedTimestampFilters[2] FAILED
java.lang.IllegalArgumentException: Wrong class, java.time.LocalDate, for object: 17521
at org.apache.iceberg.PartitionData.get(PartitionData.java:120)
at org.apache.iceberg.Accessors$DateAccessor.get(Accessors.java:216)
at org.apache.iceberg.Accessors$DateAccessor.get(Accessors.java:209)
at org.apache.iceberg.expressions.BoundReference.eval(BoundReference.java:39)
We need to substitute this accessor for the default accessor only when using Iceberg generics.
I agree that these accessors should be specific to Iceberg generics, and basically there are three options to do that:

1. Identify the Iceberg generic class in Accessors in iceberg-api, but that would mix iceberg-data into the iceberg-api package.
2. Copy all related Accessors into the iceberg-data package, but Evaluator constructs accessors via Schema#accessForField, which comes from the iceberg-api package, so we cannot pass Iceberg generic accessors to them. Or we need to update Evaluator, but it is still in the iceberg-api package.
3. Fix the issue (Transform.day does not produce DateType value #984), so that DateAccessor works for PartitionData as well.

IMO, option 3 is feasible. What do you think? Or you may have other options.
@rdblue, I just used another solution for this: take the accessors from a passed parameter instead of parsing them from the schema when binding an expression with a struct type. With this decoupling we could customize the accessors for different StructLike implementations. Does that make sense to you?
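The decoupling described above can be sketched with simplified, hypothetical types (not the actual patch): the evaluation path receives the accessors from the caller rather than deriving them from the schema, so each row representation can supply its own conversions.

```java
import java.time.LocalDate;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the "pass accessors in" idea: the caller supplies
// the field accessors, so how a value is fetched and converted is decided by
// whoever built the accessor map, not by the binder.
public class BindWithAccessors {
    // Stand-in for a StructLike-style row
    interface Row {
        Object get(int pos);
    }

    // The evaluator only sees Function<Row, Object>; conversion details are
    // encapsulated in the supplied accessors.
    static Object eval(Map<Integer, Function<Row, Object>> accessors, int fieldId, Row row) {
        return accessors.get(fieldId).apply(row);
    }

    public static void main(String[] args) {
        // An accessor for field id 1 that converts LocalDate to epoch days
        Map<Integer, Function<Row, Object>> accessors =
            Map.of(1, row -> ((LocalDate) row.get(0)).toEpochDay());

        Row row = pos -> LocalDate.of(2019, 12, 1);
        System.out.println(eval(accessors, 1, row)); // prints 18231
    }
}
```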
@chenjunjiedada, please take a look at #1038. The test cases here made me think about the existing tests. Ideally, we would extend those tests with assertions for sequence numbers rather than only testing a subset. That PR demonstrates how we would do that, by adding validation methods that test sequence numbers if they are provided. It also uses assertions that only apply for a specific table version. That would be a more thorough way to accomplish what you're trying to do here. Would you like to use that approach?
@chenjunjiedada, you're right. It looks like I made that comment on the wrong PR. Sorry for the confusion.
@chenjunjiedada, I think what you have here works, but right now we have two sets of accessors (Spark and internal) and we would need more for other data models. I think what we need to do instead is to separate the concerns: accessors should understand structure, and rows should return the correct value types. That way we can eventually move to using the same accessors, and we will just need rows that correctly translate for StructLike. How about using a wrapper class like this one when passing records? Would that work?

```java
class InternalRecordWrapper implements StructLike {
  private final Function<Object, Object>[] transforms;
  private StructLike wrapped = null;

  @SuppressWarnings("unchecked")
  InternalRecordWrapper(Types.StructType struct) {
    this.transforms = struct.fields().stream()
        .map(field -> converter(field.type()))
        .toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
  }

  private static Function<Object, Object> converter(Type type) {
    switch (type.typeId()) {
      case DATE:
        return date -> DateTimeUtil.daysFromDate((LocalDate) date);
      case TIME:
        return time -> DateTimeUtil.microsFromTime((LocalTime) time);
      case TIMESTAMP:
        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
          return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
        } else {
          return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
        }
      case FIXED:
        return bytes -> ByteBuffer.wrap((byte[]) bytes);
      case STRUCT:
        InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType());
        return struct -> wrapper.wrap((StructLike) struct);
      default:
    }
    return null;
  }

  public InternalRecordWrapper wrap(StructLike record) {
    this.wrapped = record;
    return this;
  }

  @Override
  public int size() {
    return wrapped.size();
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    if (transforms[pos] != null) {
      return javaClass.cast(transforms[pos].apply(wrapped.get(pos, Object.class)));
    }
    return wrapped.get(pos, javaClass);
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("Cannot update InternalRecordWrapper");
  }
}
```
@rdblue, the wrapper class tries to do something like the first commit I submitted. IIUC, it should work, but that would cause the concerns you raised before (impacting performance in the tight loop and breaking the contract). Plus, the class doesn't handle the ...

Either one of the concerns getting solved should work, right? Given that we will have more data models (Pig, Hive, Flink, etc.), eventually we will need more accessors or wrapper classes. IMHO, the accessor way looks a bit cleaner; actually, we could make the functions and subclasses in ...
You're right that it's more similar to your original commit than to my suggestion to use accessors. The other issue with accessors made me think that conversion should not be tightly coupled there. This is also a bit different from your original version: the ...
Also, to address your comment that the accessors approach is cleaner: we already have two copies of the accessors (in Spark and in API), and running evaluators on another object model would require another copy. Moving the conversion responsibility to a wrapper means we can use the same accessors in other places. And a wrapper also fits well with the need to adapt other record models to ...

This doesn't support lists and maps because those can't be used in expressions. If we start supporting expressions that operate on collections, we would need to build a visitor like you suggested.
@rdblue, I have updated to use the wrapper class, would you please take another look?
```diff
@@ -161,7 +161,10 @@ public boolean hasNext() {
 
 if (task.residual() != null && task.residual() != Expressions.alwaysTrue()) {
   Evaluator filter = new Evaluator(projection.asStruct(), task.residual(), caseSensitive);
-  this.currentIterator = Iterables.filter(reader, filter::eval).iterator();
+  this.currentIterator = Iterables.filter(reader, record -> {
+    InternalRecordWrapper wrapperRecord = new InternalRecordWrapper(record.struct()).wrap(record);
```
The wrapper can be reused so it should be moved outside the predicate. The predicate just needs to be this:
```java
record -> filter.eval(wrapper.wrap(record))
```
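The suggested fix — allocate the wrapper once, outside the predicate, and reuse it for every record — can be sketched with simplified, hypothetical types (not the actual Iceberg classes):

```java
import java.util.List;
import java.util.function.Predicate;

// Demonstrates the reuse pattern: one wrapper instance is created outside the
// predicate and re-wrapped around each record, instead of allocating a new
// wrapper per evaluation.
public class ReusableWrapperDemo {
    // Minimal stand-in for InternalRecordWrapper: holds one wrapped value at a time.
    static class Wrapper {
        private Integer wrapped;

        Wrapper wrap(Integer value) {
            this.wrapped = value;
            return this;
        }

        int get() {
            return wrapped;
        }
    }

    static long countMatching(List<Integer> records, Predicate<Wrapper> filter) {
        Wrapper wrapper = new Wrapper(); // created once, outside the predicate
        return records.stream()
            .filter(record -> filter.test(wrapper.wrap(record))) // reused per record
            .count();
    }

    public static void main(String[] args) {
        long n = countMatching(List.of(1, 5, 10, 20), w -> w.get() > 4);
        System.out.println(n); // prints 3
    }
}
```

The wrap-and-return-this style is what makes the single-instance reuse ergonomic inside a lambda.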
@chenjunjiedada, it looks great! Just one thing to fix: we don't want to create a new wrapper each time we evaluate the predicate. Thanks for fixing this, and for trying different approaches until we found the right one.
@rdblue, you are welcome! Also, thank you for your great patience in reviewing!
This is strange, the local build passed. Let me rebase and test again.
This commit: e8f7379 breaks the unit tests. @shardulm94, FYI.
Thanks, @chenjunjiedada. I merged this. @shardulm94, we should fix the new test that fails for ORC pushdown in a separate PR. I didn't want to block this PR on an unrelated issue.
@rdblue Ack, on it. |
This fixes #972.