schema evolution support #745

Merged: 1 commit from the schema_evolution_support branch into apache:master on Feb 13, 2020

Conversation

@ravichinoy (Contributor) commented Jan 21, 2020

  1. Skip the column ordering check
  2. Use the input schema to build accessors

Please refer to issue #741 for details.

@rdblue (Contributor) commented Jan 22, 2020

From #741, it looks like the problem this is trying to address is that you can't write to a table with a different column order than the order of the table schema.

The reason for this restriction is that Spark should be responsible for reconciling table columns with the data from a query. Spark has two different modes for doing this: by position for SQL, and by name for DataFrame writes. I think that delegating this to Spark is the right long-term solution.
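
As an illustrative sketch only (table and view names here are hypothetical, and, as the next paragraph notes, Spark 2.4's v2 write path does not yet apply the by-name reconciliation), the two modes look roughly like this from the user's side:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ResolutionModesSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("resolution-modes").getOrCreate();

    // SQL inserts are resolved by position: the SELECT list must line up with the
    // table's column order, whatever the expressions are named.
    spark.sql("INSERT INTO db.events SELECT id, ts, payload FROM staging");

    // DataFrame writes are meant to be resolved by name: the DataFrame's column order
    // does not need to match the table schema as long as the column names match.
    Dataset<Row> df = spark.table("staging").select("payload", "id", "ts");
    df.write().format("iceberg").mode("append").save("db.events");

    spark.stop();
  }
}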

In the short term, Spark 2.4 has no resolution step for v2 writes. That's why Iceberg has the current checks that it does, so that you don't corrupt a table by writing the wrong thing. I'd rather not extend the current checks if we don't need to, given that this will be handled by Spark in the next version. Is that reasonable?

@davrmac commented Jan 23, 2020

Hi @rdblue, I'm working with @ravichinoy on this issue and had a couple of follow-up questions:

  1. For the compatibility check change, is your concern just about not updating a code path that's about to be deprecated, or do you also have correctness concerns about dropping the ordering check? From my understanding of the Spark-to-Iceberg write path, Spark has already bound field names to all of the columns being written by the time it calls the IcebergSource to create a writer, and Iceberg uses those field names (not the field order) to map back to the corresponding field IDs in the Iceberg schema, which it then uses to perform the rest of its compatibility checks. Are there other parts of the Iceberg write path that still depend on field order and would cause corruption if the ordering check weren't enforced?

  2. Do you have any concerns with the second half of @ravichinoy's change (i.e. switching from using the Iceberg table schema to the Spark schema when building the PartitionKey accessor)? It's less relevant if we're still enforcing that the Spark schema match the field order of the Iceberg table schema, but I believe there are still scenarios where, if the Spark schema is missing optional fields that are defined in the Iceberg table schema, the write can pass the compatibility check but build a PartitionKey accessor that fails to pull the correct fields out of each InternalRow (a sketch of that failure mode follows; we can also write a test case to prove whether or not this is actually an issue).
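
A minimal, self-contained sketch of the failure mode in question 2; none of the types below are Iceberg classes, they only stand in for accessors that read a partition source field by its ordinal position in a row:

import java.util.Arrays;
import java.util.List;

public class AccessorMismatchSketch {

  // Stand-in for a schema-derived accessor that reads the value at a fixed position.
  static final class PositionAccessor {
    final int position;

    PositionAccessor(int position) {
      this.position = position;
    }

    Object get(Object[] row) {
      return row[position];
    }
  }

  public static void main(String[] args) {
    // Table schema order: id, data (optional), category; "category" is the partition source field.
    List<String> tableSchema = Arrays.asList("id", "data", "category");

    // The incoming Spark schema omits the optional "data" column but keeps the relative
    // order of the remaining columns, so an ordering check would still pass.
    List<String> writeSchema = Arrays.asList("id", "category");
    Object[] incomingRow = {42L, "books"};  // laid out in writeSchema order

    // An accessor built from the table schema targets position 2, which the incoming row lacks.
    PositionAccessor fromTableSchema = new PositionAccessor(tableSchema.indexOf("category"));
    try {
      System.out.println("table-schema accessor reads: " + fromTableSchema.get(incomingRow));
    } catch (ArrayIndexOutOfBoundsException e) {
      System.out.println("table-schema accessor failed: position " + fromTableSchema.position
          + " is outside the incoming row");
    }

    // An accessor built from the incoming (write) schema targets position 1 and reads the intended value.
    PositionAccessor fromWriteSchema = new PositionAccessor(writeSchema.indexOf("category"));
    System.out.println("write-schema accessor reads: " + fromWriteSchema.get(incomingRow));
  }
}

Building the accessor from the incoming schema, as the second half of the change does, avoids this mismatch.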

@rdblue (Contributor) commented Jan 24, 2020

> Do you also have correctness concerns about dropping the ordering check?

I don't have correctness concerns. Handling field reordering is a requirement, and Spark recently added DDL statements to reorder fields.

Mainly, I wonder if it is a good idea to write data files with a column order that depends on the incoming query instead of the table's current column order. Reordering columns can change performance in Avro and it may be surprising when people look at a file in the table and see columns in an unexpected order. These aren't correctness blockers, but if this is going to be moot in the next Spark release then my thinking was that we don't need to choose what the behavior should be.

> Do you have any concerns with the second half of @ravichinoy's change (i.e. switching from using the Iceberg table schema to the Spark schema when building the PartitionKey accessor)?

No. And now that you've mentioned your concerns, I think using the incoming schema is probably a good idea.

@@ -78,7 +78,7 @@
   private CheckCompatibility(Schema schema, boolean checkOrdering, boolean checkNullability) {
     this.schema = schema;
-    this.checkOrdering = checkOrdering;
+    this.checkOrdering = false;

I think this should still pass checkOrdering correctly. To fix the problem this is trying to address, I think this should add a write option that gets passed into this.
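
A rough sketch of the shape being suggested, with an assumed option name and helper class (not the actual Iceberg code): read a boolean write option and thread it into the constructor instead of hard-coding false.

import java.util.Map;

final class OrderingCheckOption {
  // Illustrative option name; the exact spelling is up to the implementation.
  private static final String CHECK_ORDERING = "check-ordering";

  private OrderingCheckOption() {
  }

  // Defaults to true so the existing strict behavior is kept unless the user opts out.
  static boolean checkOrdering(Map<String, String> writeOptions) {
    return Boolean.parseBoolean(writeOptions.getOrDefault(CHECK_ORDERING, "true"));
  }
}

With a value like this available at write time, the constructor can keep this.checkOrdering = checkOrdering; and each caller decides per write whether ordering is enforced.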

@@ -491,10 +491,10 @@ public void write(InternalRow row) throws IOException {
                     AppenderFactory<InternalRow> appenderFactory,
                     WriterFactory.OutputFileFactory fileFactory,
                     FileIO fileIo,
-                    long targetFileSize) {
+                    long targetFileSize,
+                    Schema schema) {

Can we call this writeSchema or some name that indicates it is the schema of the incoming data, not necessarily the table schema?

@@ -132,7 +132,7 @@ public void testNullPartitionValue() throws Exception {

    try {
      // TODO: incoming columns must be ordered according to the table's schema

We should be able to remove this TODO.

@rdblue (Contributor) commented Jan 24, 2020

@ravichinoy & @davrmac, I think we can get this in if we add a write config option that controls whether to do the order check, since we already have a boolean for it. We just need to pass it in from writes and then it's up to the user whether to require the same column order as the table. Does that work for you?
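
Illustratively, and assuming the option ends up spelled "check-ordering" with a default of true (table and view names here are hypothetical), opting out of the order check for a single write might look like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DisableOrderingCheck {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("disable-ordering-check").getOrCreate();

    Dataset<Row> df = spark.table("source_view");  // hypothetical source with a different column order

    df.write()
        .format("iceberg")
        .option("check-ordering", "false")  // opt out of the column order check for this write
        .mode("append")
        .save("db.events");                 // hypothetical target table

    spark.stop();
  }
}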

@ravichinoy force-pushed the schema_evolution_support branch 4 times, most recently from 5ad1135 to 9a25c9b on January 29, 2020 06:52
    List<String> errors;
    if (checkNullability) {
-     errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+     errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
    } else {
      errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);

@ravichinoy checkOrdering needs to be passed here as well. You should be able to turn off both the nullability and ordering checks.
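
Sketched against the snippet above (the exact CheckCompatibility overloads are whatever the final code exposes), the request is for both branches to receive checkOrdering:

    List<String> errors;
    if (checkNullability) {
      errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
    } else {
      errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
    }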

   * @param checkOrdering If false, allow input schema to have different ordering than table schema
   * @return a list of error details, or an empty list if there are no compatibility problems
   */
  public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema, Boolean checkOrdering) {

Why is this a boxed boolean?

@@ -189,12 +189,13 @@ private static void mergeIcebergHadoopConfs(
        .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
  }

- private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
+ private void validateWriteSchema(
+     Schema tableSchema, Schema dsSchema, Boolean checkNullability, Boolean checkOrdering) {

It looks like this is getting too complicated. We now have two boolean options with similar names: one is used to select the compatibility checking method and the other is passed as an arg, which isn't readable. I think it is a good idea to convert compatibility checking to a builder-like pattern:

CheckCompatibility
    .writeSchema(dsSchema)
    .readSchema(tableSchema)
    .checkOrdering(true)
    .checkNullability(false)
    .throwOnValidationError();

@rdblue (Contributor) commented Feb 12, 2020

@ravichinoy, thanks for working on this. It looks good.

Can you either fix the methods that use a boxed Boolean or implement the builder-like API I suggested? I can add the builder in a follow-up if you want to get this in more quickly.

Commit message:
  make check ordering configurable
  use input schema to build accessors
@rdblue merged commit b35899f into apache:master on Feb 13, 2020
@rdblue (Contributor) commented Feb 13, 2020

Thanks for updating this. I merged it.

@ravichinoy (Contributor, Author) commented

Thanks @rdblue, we will try to implement the builder pattern as part of a follow-up PR.

jun-ma-0 pushed a commit to jun-ma-0/incubator-iceberg that referenced this pull request May 11, 2020