Data: Add TCK coverage for reader default values #16638
| .map(generator -> Arguments.of(format, generator))) | ||
| .toList(); | ||
|
|
||
| private static final List<Arguments> PRIMITIVE_TYPES_AND_DEFAULTS = |
There was a problem hiding this comment.
Could we use a DataGenerator for this?
There was a problem hiding this comment.
The matrix is (type, default-literal) pairs and the test asserts the reader injects exactly that literal, so the type has to stay paired with its expected value. DataGenerator is one schema() + random rows, so it can't carry that pairing, at least as I understand the current interface.
There was a problem hiding this comment.
Can we create a generator with a schema with default values, and create 2 tests, one with nulls where we check for the default values, and one with random data (not nulls), and check for the generated data?
There was a problem hiding this comment.
Added DataGenerators.PrimitiveDefaults and two tests. testPrimitiveDefaultValues writes only id, so the defaulted columns are absent from the file and the reader injects the defaults. testPrimitiveDefaultValuesNotApplied writes random values into all columns and checks the round trip.
These replace the old testReaderSchemaEvolutionNewColumnWithDefault and the type/default matrix, both of which I removed.
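For readers following along, the behaviour the two tests pin down can be sketched in plain Java without any Iceberg types. This is only an illustration under stated assumptions: DefaultProjectionSketch, Column, project, and the column name int_with_default are hypothetical and do not appear in the PR, and a Map stands in for a written row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DefaultProjectionSketch {
  private DefaultProjectionSketch() {}

  /** Hypothetical read-schema column: a name plus an optional default (null means none). */
  static final class Column {
    final String name;
    final Object defaultValue;

    Column(String name, Object defaultValue) {
      this.name = name;
      this.defaultValue = defaultValue;
    }
  }

  /**
   * Projects a written row onto the read schema. A column present in the written row keeps
   * its written value; a column absent from it receives the schema default (or null).
   */
  static Map<String, Object> project(List<Column> readSchema, Map<String, ?> written) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Column column : readSchema) {
      if (written.containsKey(column.name)) {
        row.put(column.name, written.get(column.name));
      } else {
        row.put(column.name, column.defaultValue);
      }
    }
    return row;
  }

  public static void main(String[] args) {
    List<Column> readSchema = List.of(new Column("id", null), new Column("int_with_default", 34));
    // Only id was written: the default is injected for the absent column.
    System.out.println(project(readSchema, Map.of("id", 1L)));
    // Both columns were written: the written value wins over the default.
    System.out.println(project(readSchema, Map.of("id", 2L, "int_with_default", 7)));
  }
}
```

The first call mirrors testPrimitiveDefaultValues (column absent, default expected) and the second mirrors testPrimitiveDefaultValuesNotApplied (column written, default ignored).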
|
|
||
| @ParameterizedTest | ||
| @FieldSource("FILE_FORMATS") | ||
| void testDefaultValues(FileFormat fileFormat) throws IOException { |
There was a problem hiding this comment.
Do we need this test? It seems the same as testSchemaEvolutionAddColumn.
There was a problem hiding this comment.
testSchemaEvolutionAddColumn adds plain optional columns that read back as null. testDefaultValues instead checks that columns with defaults get the default injected, and that a written value isn't overwritten by its default, and there's some overlap there with the new testPrimitiveDefaultValues. @pvary @Guosmilesmile do you think testDefaultValues is worth keeping, or should I remove it?
There was a problem hiding this comment.
Makes sense, we can keep it for now.
| expectedNested.setField("missing_inner_float", -0.0F); | ||
| expected.setField("nested", expectedNested); | ||
| } | ||
| return expected; |
| .copy("value_str", val.getField("value_str"), "value_int", 34))); | ||
| expected.setField("nested_map", rebuilt); | ||
| } | ||
| return expected; |
There was a problem hiding this comment.
nit: new line. Please check all the code. Thanks
| .collect(Collectors.toList()); | ||
| expected.setField("nested_list", rebuilt); | ||
| } | ||
| return expected; |
| List<Record> genericRecords = RandomGenericData.generate(writeSchema, 10, 1L); | ||
| writeGenericRecords(fileFormat, writeSchema, genericRecords); | ||
|
|
||
| Schema expectedSchema = |
There was a problem hiding this comment.
Can we reuse writeSchema to create expectedSchema?
There was a problem hiding this comment.
Extracted idField and dataField and shared them between the write and expected schemas.
| // TODO: include TIME once the engine readers support it. | ||
| // TODO: include FIXED once Spark supports it. |
There was a problem hiding this comment.
We have 3 implementations:
- Generic
- Flink
- Spark
Which one supports which of these types?
Shall we create specific test methods for these?
There was a problem hiding this comment.
TIME: works on Flink; fails on Spark, java.lang.UnsupportedOperationException: Unsupported logical type: TIME_MICROS.
FIXED: works on Flink; fails on Spark, java.lang.ClassCastException: class [B cannot be cast to class java.nio.ByteBuffer.
Since both work on Flink and only Spark fails, including them would mean running on Flink but skipping on Spark. I'm not sure of the best way to gate that per-engine here, open to suggestions.
Or we could keep both excluded and just update the TODO to note it's Spark specific, or I could look into the Spark side TIME and FIXED handling to see if it can be fixed, though I'm not familiar with those code paths yet and would need to dig in (which I don't mind).
There was a problem hiding this comment.
I assume both work when we use Generic readers and writers.
There was a problem hiding this comment.
In another PR we should add TestGenericFormatModel and exclude duplicated tests
There was a problem hiding this comment.
> TIME: works on Flink; fails on Spark, java.lang.UnsupportedOperationException: Unsupported logical type: TIME_MICROS.
Maybe #16665 could be interesting here. We plan to use supportsTime for this in the meantime, which could be overridden by TestSparkFormatModel.
> FIXED: works on Flink; fails on Spark, java.lang.ClassCastException: class [B cannot be cast to class java.nio.ByteBuffer.
Is it a bug? Maybe a separate PR would be good, where we can discuss what the expected behavior is. In the meantime we can add a TODO.
This is also highlighted in #15795.
There was a problem hiding this comment.
I wouldn't wait.
Add the same supportsTime method as defined in #15795 and the faster one wins, the second one rebases 😄
Keep a TODO for the bytebuffer stuff. This is for default values, not the primitive type tests
There was a problem hiding this comment.
I have added supportsTime() matching #15795 and gated TIME on it; Spark overrides it to return false.
FIXED left out with a TODO.
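A rough sketch of the hook pattern, in plain Java so it runs without Iceberg on the classpath. The class names FormatModelTestSketch, FlinkLikeSketch, and SparkLikeSketch, the Field model, and testedColumns are hypothetical stand-ins; only the supportsTime() name, its default of true, and the Spark override to false come from this thread.

```java
import java.util.List;
import java.util.stream.Collectors;

abstract class FormatModelTestSketch {
  /** Hypothetical type tags standing in for the real schema types. */
  enum TypeId { LONG, INT, TIME }

  /** Hypothetical column model: a name plus a type tag. */
  static final class Field {
    final String name;
    final TypeId type;

    Field(String name, TypeId type) {
      this.name = name;
      this.type = type;
    }
  }

  static final List<Field> READ_SCHEMA =
      List.of(
          new Field("id", TypeId.LONG),
          new Field("int_with_default", TypeId.INT),
          new Field("time_with_default", TypeId.TIME));

  /** Hook: engines whose readers cannot handle TIME override this to return false. */
  protected boolean supportsTime() {
    return true;
  }

  /** Returns the column names the test exercises, dropping TIME columns when unsupported. */
  List<String> testedColumns() {
    return READ_SCHEMA.stream()
        .filter(field -> supportsTime() || field.type != TypeId.TIME)
        .map(field -> field.name)
        .collect(Collectors.toList());
  }
}

/** Keeps the default, so TIME columns stay in the test. */
final class FlinkLikeSketch extends FormatModelTestSketch {}

/** Opts out of TIME, mirroring what the Spark test class does in this PR. */
final class SparkLikeSketch extends FormatModelTestSketch {
  @Override
  protected boolean supportsTime() {
    return false;
  }
}
```

Keeping the filter in the base class means an engine only has to flip one boolean, and the TIME column comes back automatically once the override is deleted.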
There was a problem hiding this comment.
I looked into the FIXED ClassCastException. The TCK's InternalRowConverter assumed a FIXED value is always a ByteBuffer and cast it directly, but a generic Record actually holds FIXED as a byte[] IIUC, so casting byte[] to ByteBuffer throws. It never showed up before because the TCK had no FIXED column, so that branch never ran on a real FIXED value; adding one as part of this PR is what surfaced it.
The fix checks the value type and handles byte[], ByteBuffer, and GenericData.Fixed, all yielding byte[], which is what Spark's InternalRow expects IIUC. This mirrors what TestHelpers does.
Should I keep this here and drop the TODO, or put it into a separate PR and keep the TODO in this one?
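The type check described above could look roughly like the following. This is a sketch, not the code in the PR: FixedValueSketch and toBytes are hypothetical names, and the GenericData.Fixed branch is left out so the example stays free of the Avro dependency.

```java
import java.nio.ByteBuffer;

final class FixedValueSketch {
  private FixedValueSketch() {}

  /**
   * Normalizes a FIXED value to byte[]. A byte[] is returned as-is; a ByteBuffer is copied
   * from its current position to its limit without moving the caller's position.
   */
  static byte[] toBytes(Object value) {
    if (value instanceof byte[]) {
      return (byte[]) value;
    }
    if (value instanceof ByteBuffer) {
      // duplicate() shares the content but has its own position, so the source is untouched.
      ByteBuffer copy = ((ByteBuffer) value).duplicate();
      byte[] bytes = new byte[copy.remaining()];
      copy.get(bytes);
      return bytes;
    }
    throw new IllegalArgumentException(
        "Unsupported FIXED value: " + (value == null ? "null" : value.getClass().getName()));
  }

  public static void main(String[] args) {
    System.out.println(toBytes(new byte[] {1, 2, 3}).length);
    System.out.println(toBytes(ByteBuffer.wrap(new byte[] {1, 2, 3})).length);
  }
}
```

Checking the runtime type instead of casting is what avoids the ClassCastException quoted earlier in this thread, since a direct (ByteBuffer) cast fails as soon as the record carries a byte[].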
| Schema readSchema = | ||
| supportsTime() | ||
| ? DataGenerators.PrimitiveDefaults.READ_SCHEMA | ||
| : TypeUtil.selectNot( | ||
| DataGenerators.PrimitiveDefaults.READ_SCHEMA, | ||
| Set.of( | ||
| DataGenerators.PrimitiveDefaults.READ_SCHEMA | ||
| .findField("time_with_default") | ||
| .fieldId())); | ||
|
|
||
| List<Record> sourceRecords = RandomGenericData.generate(readSchema, 10, 1L); | ||
| writeGenericRecords(fileFormat, readSchema, sourceRecords); | ||
|
|
||
| readAndAssertEngineRecords(fileFormat, readSchema, sourceRecords, record -> record); |
There was a problem hiding this comment.
The situation in testPrimitiveDefaultValues seems to be quite similar. Can we extract the same parts from there?
There was a problem hiding this comment.
Extracted into primitiveDefaultsReadSchema(), shared by both tests.
| .build()); | ||
|
|
||
| assertThatThrownBy( | ||
| () -> readAndAssertGenericRecords(fileFormat, expectedSchema, genericRecords)) |
There was a problem hiding this comment.
This test only checks the generic reader component. Do we need to add tests for the engine-related components as well?
There was a problem hiding this comment.
You are right. I switched it to readAndAssertEngineRecords, which throws the same IllegalArgumentException.
| return schema; | ||
| } | ||
|
|
||
| return TypeUtil.selectNot(schema, Set.of(schema.findField("time_with_default").fieldId())); |
There was a problem hiding this comment.
Could we do this without relying on the field name?
Collect the fieldIds where the type is not supported? See #15795 supportedSchema for inspiration.
There was a problem hiding this comment.
I like @rambleraptor's solution which removes the need for supportsTime even more.
There was a problem hiding this comment.
I am now collecting the fieldIds by type instead of the name:

    Set<Integer> unsupportedFieldIds =
        schema.columns().stream()
            .filter(field -> field.type().typeId() == Type.TypeID.TIME)
            .map(Types.NestedField::fieldId)
            .collect(Collectors.toSet());
    return TypeUtil.selectNot(schema, unsupportedFieldIds);

About dropping supportsTime() entirely via @rambleraptor's filterUnsupported/excludeColumnsContaining, since #15795 adds those same methods to BaseFormatModelTests, I didn't want to duplicate them here and collide. Should we move to that once #15795 lands? Please let me know what you think.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
6d1a5f8 to aede51b
add7874 to b8ba50d
Adds the reader default-value tests from DataTestBase into the Base Format model TCK:
- Adds DataGenerators.PrimitiveDefaults and the following tests:
- Adds a supportsTime() hook that defaults to true and gates the TIME column; TestSparkFormatModel overrides it to false. FIXED is excluded with a TODO.
- Removes testReaderSchemaEvolutionNewColumnWithDefault.