schema evolution support #745

Merged (1 commit, Feb 13, 2020)
@@ -39,7 +39,36 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, true));
return writeCompatibilityErrors(readSchema, writeSchema, true);
}

/**
* Returns a list of compatibility errors for writing with the given write schema.
* This includes nullability: writing optional (nullable) values to a required field is an error.
* Optionally, this method allows the input schema to have a different field ordering than the table schema.
*
* @param readSchema a read schema
* @param writeSchema a write schema
* @param checkOrdering if false, allow the input schema to have a different field ordering than the table schema
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, true));
}

/**
* Returns a list of compatibility errors for writing with the given write schema.
* This checks type compatibility and not nullability: writing optional (nullable) values
* to a required field is not an error. To check nullability as well as types,
* use {@link #writeCompatibilityErrors(Schema, Schema)}.
* Optionally, this method allows the input schema to have a different field ordering than the table schema.
*
* @param readSchema a read schema
* @param writeSchema a write schema
* @param checkOrdering if false, allow the input schema to have a different field ordering than the table schema
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> typeCompatibilityErrors(Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, false));
}
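
For illustration, a minimal sketch of how the two overloads above might be called together, assuming the usual org.apache.iceberg.Schema and org.apache.iceberg.types packages; the example class and schemas are hypothetical and not part of this change:

import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.CheckCompatibility;
import org.apache.iceberg.types.Types;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class CompatibilityCheckExample {
  public static void main(String[] args) {
    // Table (read) schema: id and data, both required.
    Schema tableSchema = new Schema(
        required(1, "id", Types.IntegerType.get()),
        required(2, "data", Types.StringType.get()));

    // Incoming (write) schema: same fields in a different order, with "data" written as optional.
    Schema dsSchema = new Schema(
        optional(2, "data", Types.StringType.get()),
        required(1, "id", Types.IntegerType.get()));

    // Nullability is checked; checkOrdering=false tolerates the reordered fields.
    List<String> writeErrors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, false);

    // Type-only check with the same relaxed ordering.
    List<String> typeErrors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema, false);

    System.out.println("write errors: " + writeErrors);
    System.out.println("type-only errors: " + typeErrors);
  }
}

With these schemas, writeCompatibilityErrors should still flag writing optional "data" into a required column, while typeCompatibilityErrors should return an empty list, since only types and ordering are relaxed there.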

/**
@@ -331,6 +331,22 @@ public void testIncompatibleListAndPrimitive() {
errors.get(0).contains("list cannot be read as a string"));
}

@Test
public void testDifferentFieldOrdering() {
// field order differences should be allowed when the ordering check is disabled
Schema read = new Schema(required(0, "nested", Types.StructType.of(
required(1, "field_a", Types.IntegerType.get()),
required(2, "field_b", Types.IntegerType.get())
)));
Schema write = new Schema(required(0, "nested", Types.StructType.of(
required(2, "field_b", Types.IntegerType.get()),
required(1, "field_a", Types.IntegerType.get())
)));

List<String> errors = CheckCompatibility.writeCompatibilityErrors(read, write, false);
Assert.assertEquals("Should produce 0 error message", 0, errors.size());
}

@Test
public void testStructWriteReordering() {
rdblue marked this conversation as resolved.
// writes should not reorder fields
@@ -101,7 +101,7 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
validatePartitionTransforms(table.spec());
String appId = lazySparkSession().sparkContext().applicationId();
String wapId = lazySparkSession().conf().get("spark.wap.id", null);
@@ -122,7 +122,7 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
validatePartitionTransforms(table.spec());
// Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
// so we fetch it directly from sparkContext to make writes idempotent
@@ -189,12 +189,13 @@ private static void mergeIcebergHadoopConfs(
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}

private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
private void validateWriteSchema(
Schema tableSchema, Schema dsSchema, Boolean checkNullability, Boolean checkOrdering) {
A reviewer (Contributor) commented:

It looks like this is getting too complicated. We now have two boolean options: one selects between the compatibility-checking methods (which have similar names), and the other is passed as a bare argument (which isn't readable). I think it would be a good idea to convert compatibility checking to a builder-like pattern:

CheckCompatibility
    .writeSchema(dsSchema)
    .readSchema(tableSchema)
    .checkOrdering(true)
    .checkNullability(false)
    .throwOnValidationError();

List<String> errors;
if (checkNullability) {
errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
} else {
errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
}
if (!errors.isEmpty()) {
StringBuilder sb = new StringBuilder();
@@ -228,6 +229,13 @@ private boolean checkNullability(DataSourceOptions options) {
return sparkCheckNullability && dataFrameCheckNullability;
}

private boolean checkOrdering(DataSourceOptions options) {
boolean sparkCheckOrdering = Boolean.parseBoolean(lazySpark.conf()
.get("spark.sql.iceberg.check-ordering", "true"));
boolean dataFrameCheckOrdering = options.getBoolean("check-ordering", true);
return sparkCheckOrdering && dataFrameCheckOrdering;
}
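
As a usage note, here is a hedged sketch of how a caller could relax the ordering check, based on the Spark conf key and write option read above; the class and method are hypothetical placeholders, not part of this change:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CheckOrderingOptionExample {
  // Append df to an Iceberg table at tableLocation with the ordering check relaxed.
  static void appendWithRelaxedOrdering(SparkSession spark, Dataset<Row> df, String tableLocation) {
    // Session-wide switch, via the Spark conf key checked in checkOrdering(options).
    spark.conf().set("spark.sql.iceberg.check-ordering", "false");

    // Or per write, via the DataFrameWriter option read by checkOrdering(options).
    df.write()
        .format("iceberg")
        .mode("append")
        .option("check-ordering", "false")
        .save(tableLocation);
  }
}

Both switches default to true, so existing writes keep the strict ordering check unless one of them is explicitly set to false.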

private FileIO fileIO(Table table) {
if (table.io() instanceof HadoopFileIO) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
@@ -48,7 +48,7 @@ class PartitionKey implements StructLike {
private final Accessor<InternalRow>[] accessors;

@SuppressWarnings("unchecked")
PartitionKey(PartitionSpec spec) {
PartitionKey(PartitionSpec spec, Schema inputSchema) {
this.spec = spec;

rdblue marked this conversation as resolved.
List<PartitionField> fields = spec.fields();
@@ -58,7 +58,7 @@ class PartitionKey implements StructLike {
this.accessors = (Accessor<InternalRow>[]) Array.newInstance(Accessor.class, size);

Schema schema = spec.schema();
Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(schema);
Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(inputSchema);
for (int i = 0; i < size; i += 1) {
PartitionField field = fields.get(i);
Accessor<InternalRow> accessor = newAccessors.get(field.sourceId());
@@ -281,7 +281,7 @@ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, lo
if (spec.fields().isEmpty()) {
return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
} else {
return new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
return new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, dsSchema);
}
}

@@ -491,10 +491,10 @@ private static class PartitionedWriter extends BaseWriter {
AppenderFactory<InternalRow> appenderFactory,
WriterFactory.OutputFileFactory fileFactory,
FileIO fileIo,
long targetFileSize) {
long targetFileSize,
Schema writeSchema) {
super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);

this.key = new PartitionKey(spec);
this.key = new PartitionKey(spec, writeSchema);
}

@Override
@@ -131,7 +131,6 @@ public void testNullPartitionValue() throws Exception {
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
// TODO: incoming columns must be ordered according to the table's schema
df.select("id", "data").write()
.format("iceberg")
.mode("append")
@@ -154,6 +153,95 @@
}
}

@Test
public void testReorderedColumns() throws Exception {
String desc = "reorder_columns";
File parent = temp.newFolder(desc);
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());

HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

List<SimpleRecord> expected = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
df.select("data", "id").write()
.format("iceberg")
.mode("append")
.option("check-ordering", "false")
.save(location.toString());

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());

List<SimpleRecord> actual = result
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();

Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

} finally {
TestTables.clearTables();
}
}

@Test
public void testReorderedColumnsNoNullability() throws Exception {
String desc = "reorder_columns_no_nullability";
File parent = temp.newFolder(desc);
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());

HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

List<SimpleRecord> expected = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
df.select("data", "id").write()
.format("iceberg")
.mode("append")
.option("check-ordering", "false")
.option("check-nullability", "false")
.save(location.toString());

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());

List<SimpleRecord> actual = result
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();

Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

} finally {
TestTables.clearTables();
}
}

@Test
public void testPartitionValueTypes() throws Exception {
String[] columnNames = new String[] {