From 5ad1135002bf9e39cfba9305becddb71ac206dc5 Mon Sep 17 00:00:00 2001
From: Ravi Chinoy
Date: Tue, 28 Jan 2020 22:47:54 -0800
Subject: [PATCH] schema evolution support

1. allow skipping the field ordering check when validating the write schema
2. use the input schema to build partition key accessors
---
 .../iceberg/types/CheckCompatibility.java      | 15 +++++-
 .../iceberg/types/TestReadabilityChecks.java   | 16 +++++++
 .../iceberg/spark/source/IcebergSource.java    | 16 +++++--
 .../iceberg/spark/source/PartitionKey.java     |  4 +-
 .../apache/iceberg/spark/source/Writer.java    |  8 ++--
 .../spark/source/TestPartitionValues.java      | 46 ++++++++++++++++++-
 6 files changed, 93 insertions(+), 12 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
index 89ea1b052e54..24e8f0053885 100644
--- a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
+++ b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
@@ -39,7 +39,20 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<String>> {
    * @return a list of error details, or an empty list if there are no compatibility problems
    */
   public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
-    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, true));
+    return writeCompatibilityErrors(readSchema, writeSchema, true);
+  }
+
+  /**
+   * Returns a list of compatibility errors for writing with the given write schema.
+   * This includes nullability: writing optional (nullable) values to a required field is an error.
+   * Optionally, the input schema may be allowed to have a different field ordering than the table schema.
+   * @param readSchema a read schema
+   * @param writeSchema a write schema
+   * @param checkOrdering if false, allow the input schema to have a different field ordering than the table schema
+   * @return a list of error details, or an empty list if there are no compatibility problems
+   */
+  public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema, Boolean checkOrdering) {
+    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, true));
   }
 
   /**
diff --git a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
index cb5325d26d05..054b9b2f3905 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
@@ -331,6 +331,22 @@ public void testIncompatibleListAndPrimitive() {
         errors.get(0).contains("list cannot be read as a string"));
   }
 
+  @Test
+  public void testDifferentFieldOrdering() {
+    // field ordering differences are allowed when the ordering check is disabled
+    Schema read = new Schema(required(0, "nested", Types.StructType.of(
+        required(1, "field_a", Types.IntegerType.get()),
+        required(2, "field_b", Types.IntegerType.get())
+    )));
+    Schema write = new Schema(required(0, "nested", Types.StructType.of(
+        required(2, "field_b", Types.IntegerType.get()),
+        required(1, "field_a", Types.IntegerType.get())
+    )));
+
+    List<String> errors = CheckCompatibility.writeCompatibilityErrors(read, write, false);
+    Assert.assertEquals("Should produce no error messages", 0, errors.size());
+  }
+
   @Test
   public void testStructWriteReordering() {
     // writes should not reorder fields
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 4c95acaf5800..1aa611e8d4c8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -101,7 +101,7 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
-    validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+    validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
     validatePartitionTransforms(table.spec());
     String appId = lazySparkSession().sparkContext().applicationId();
     String wapId = lazySparkSession().conf().get("spark.wap.id", null);
@@ -122,7 +122,7 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
-    validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+    validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
     validatePartitionTransforms(table.spec());
     // Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
     // so we fetch it directly from sparkContext to make writes idempotent
@@ -189,10 +189,11 @@ private static void mergeIcebergHadoopConfs(
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 
-  private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
+  private void validateWriteSchema(
+      Schema tableSchema, Schema dsSchema, Boolean checkNullability, Boolean checkOrdering) {
     List<String> errors;
     if (checkNullability) {
-      errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+      errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
     } else {
       errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
     }
@@ -228,6 +229,13 @@ private boolean checkNullability(DataSourceOptions options) {
     return sparkCheckNullability && dataFrameCheckNullability;
   }
 
+  private boolean checkOrdering(DataSourceOptions options) {
+    boolean sparkCheckOrdering = Boolean.parseBoolean(lazySpark.conf()
+        .get("spark.sql.iceberg.check-ordering", "true"));
+    boolean dataFrameCheckOrdering = options.getBoolean("check-ordering", true);
+    return sparkCheckOrdering && dataFrameCheckOrdering;
+  }
+
   private FileIO fileIO(Table table) {
     if (table.io() instanceof HadoopFileIO) {
       // we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
index 87ddcd23777c..098423716d85 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
@@ -48,7 +48,7 @@ class PartitionKey implements StructLike {
   private final Accessor<InternalRow>[] accessors;
 
   @SuppressWarnings("unchecked")
-  PartitionKey(PartitionSpec spec) {
+  PartitionKey(PartitionSpec spec, Schema inputSchema) {
     this.spec = spec;
 
     List<PartitionField> fields = spec.fields();
@@ -58,7 +58,7 @@ class PartitionKey implements StructLike {
     this.accessors = (Accessor<InternalRow>[]) Array.newInstance(Accessor.class, size);
 
     Schema schema = spec.schema();
-    Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(schema);
+    Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(inputSchema);
     for (int i = 0; i < size; i += 1) {
       PartitionField field = fields.get(i);
       Accessor<InternalRow> accessor = newAccessors.get(field.sourceId());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d067e507f0f0..34922b72b444 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -281,7 +281,7 @@ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, lo
       if (spec.fields().isEmpty()) {
         return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
-        return new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+        return new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, dsSchema);
       }
     }
 
@@ -491,10 +491,10 @@ private static class PartitionedWriter extends BaseWriter {
                       AppenderFactory<InternalRow> appenderFactory,
                       WriterFactory.OutputFileFactory fileFactory,
                       FileIO fileIo,
-                      long targetFileSize) {
+                      long targetFileSize,
+                      Schema writeSchema) {
       super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-
-      this.key = new PartitionKey(spec);
+      this.key = new PartitionKey(spec, writeSchema);
     }
 
     @Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index a37aed9e986e..fdca696a4e98 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -131,10 +131,10 @@ public void testNullPartitionValue() throws Exception {
     Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
 
     try {
-      // TODO: incoming columns must be ordered according to the table's schema
       df.select("id", "data").write()
          .format("iceberg")
          .mode("append")
+         .option("check-ordering", "false")
          .save(location.toString());
 
       Dataset<Row> result = spark.read()
@@ -154,6 +154,50 @@
     }
   }
 
+  @Test
+  public void testReorderedColumns() throws Exception {
+    String desc = "reorder_columns";
+    File parent = temp.newFolder(desc);
+    File location = new File(parent, "test");
+    File dataFolder = new File(location, "data");
+    Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+    HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
+    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    try {
+      df.select("data", "id").write()
+          .format("iceberg")
+          .mode("append")
+          .option("check-ordering", "false")
+          .save(location.toString());
+
+      Dataset<Row> result = spark.read()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = result
+          .orderBy("id")
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+
+      Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+      Assert.assertEquals("Result rows should match", expected, actual);
+
+    } finally {
+      TestTables.clearTables();
+    }
+  }
+
   @Test
   public void testPartitionValueTypes() throws Exception {
     String[] columnNames = new String[] {
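
For reference, the sketch below shows how a Spark job could exercise the new "check-ordering" write option and the "spark.sql.iceberg.check-ordering" session property introduced by this patch. Only those two names come from the change itself; the table location, the local master setting, and the SimpleRecord bean are illustrative assumptions, and the target Iceberg table is assumed to already exist with an (id, data) schema.

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReorderedWriteExample {

  // Minimal bean mirroring the shape used by the tests above (id, data); assumed for illustration.
  public static class SimpleRecord implements Serializable {
    private int id;
    private String data;

    public SimpleRecord() {
    }

    public SimpleRecord(int id, String data) {
      this.id = id;
      this.data = data;
    }

    public int getId() {
      return id;
    }

    public void setId(int id) {
      this.id = id;
    }

    public String getData() {
      return data;
    }

    public void setData(String data) {
      this.data = data;
    }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-check-ordering-example")
        .master("local[2]")
        // Session-level switch added by the patch; "true" is the default.
        .config("spark.sql.iceberg.check-ordering", "true")
        .getOrCreate();

    List<SimpleRecord> records = Arrays.asList(
        new SimpleRecord(1, "a"),
        new SimpleRecord(2, "b"));
    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

    // The columns are selected in a different order than the table schema (id, data).
    // The ordering check runs only when both the session property and the per-write
    // option are true, so setting the option to "false" here is enough to relax it.
    df.select("data", "id").write()
        .format("iceberg")
        .mode("append")
        .option("check-ordering", "false")
        .save("/tmp/iceberg/example_table");  // assumed path to an existing Iceberg table

    spark.stop();
  }
}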