Skip to content

Commit

Permalink
Schema Evolution: [LIHADOOP-49587] Update ReassignIds to assign ids to an incoming schema as per the table schema (apache#9)
Browse files Browse the repository at this point in the history

Disable test broken due to LIHADOOP-49587
  • Loading branch information
rdsr authored and HotSushi committed Jul 31, 2020
1 parent a680912 commit bf1bbf6
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 54 deletions.
41 changes: 32 additions & 9 deletions api/src/main/java/org/apache/iceberg/types/ReassignIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.types;

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.Schema;
Expand All @@ -28,9 +29,14 @@
class ReassignIds extends TypeUtil.CustomOrderSchemaVisitor<Type> {
private final Schema sourceSchema;
private Type sourceType;
private int nextId;
private boolean newField;

ReassignIds(Schema sourceSchema) {
this.sourceSchema = sourceSchema;
//TODO: this will fail if we allow dropping columns. We need to update this so that new ids > lastColumnId are used
nextId = Collections.max(TypeUtil.indexById(sourceSchema.asStruct()).keySet()) + 1;
newField = false;
}

@Override
Expand All @@ -56,32 +62,43 @@ public Type struct(Types.StructType struct, Iterable<Type> fieldTypes) {
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(length);
for (int i = 0; i < length; i += 1) {
Types.NestedField field = fields.get(i);
int sourceFieldId = sourceStruct.field(field.name()).fieldId();
Types.NestedField sourceField = sourceStruct.field(field.name());
int fieldId = fieldId(sourceField);
if (field.isRequired()) {
newFields.add(Types.NestedField.required(sourceFieldId, field.name(), types.get(i), field.doc()));
newFields.add(Types.NestedField.required(fieldId, field.name(), types.get(i), field.doc()));
} else {
newFields.add(Types.NestedField.optional(sourceFieldId, field.name(), types.get(i), field.doc()));
newFields.add(Types.NestedField.optional(fieldId, field.name(), types.get(i), field.doc()));
}
}

return Types.StructType.of(newFields);
}

private int fieldId(Types.NestedField sourceField) {
if (sourceField == null || newField) {
return allocateId();
}
return sourceField.fieldId();
}

@Override
public Type field(Types.NestedField field, Supplier<Type> future) {
Preconditions.checkArgument(sourceType.isStructType(), "Not a struct: %s", sourceType);

Types.StructType sourceStruct = sourceType.asStructType();
Types.NestedField sourceField = sourceStruct.field(field.name());
boolean previousValue = newField;
if (sourceField == null) {
throw new IllegalArgumentException("Field " + field.name() + " not found in source schema");
sourceType = field.type();
newField = true;
} else {
sourceType = sourceField.type();
}

this.sourceType = sourceField.type();
try {
return future.get();
} finally {
sourceType = sourceStruct;
newField = previousValue;
}
}

Expand All @@ -90,7 +107,7 @@ public Type list(Types.ListType list, Supplier<Type> elementTypeFuture) {
Preconditions.checkArgument(sourceType.isListType(), "Not a list: %s", sourceType);

Types.ListType sourceList = sourceType.asListType();
int sourceElementId = sourceList.elementId();
int sourceElementId = newField ? allocateId() : sourceList.elementId();

this.sourceType = sourceList.elementType();
try {
Expand All @@ -110,8 +127,8 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
Preconditions.checkArgument(sourceType.isMapType(), "Not a map: %s", sourceType);

Types.MapType sourceMap = sourceType.asMapType();
int sourceKeyId = sourceMap.keyId();
int sourceValueId = sourceMap.valueId();
int sourceKeyId = newField ? allocateId() : sourceMap.keyId();
int sourceValueId = newField ? allocateId() : sourceMap.valueId();

try {
this.sourceType = sourceMap.keyType();
Expand All @@ -135,4 +152,10 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
public Type primitive(Type.PrimitiveType primitive) {
return primitive; // nothing to reassign
}

private int allocateId() {
int current = nextId;
nextId += 1;
return current;
}
}
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public static Schema assignIncreasingFreshIds(Schema schema) {

/**
* Reassigns ids in a schema from another schema.
* For new fields, increasing unique ids are assigned.
* TODO: Newer fields can be assigned duplicate ids if an existing field was dropped,
* but since we never drop fields, this should not affect us. This does still need
* fixing once we decide to push this upstream
*
* <p>
* Ids are determined by field names. If a field in the schema cannot be found in the source
* schema, it is treated as a new field and assigned a fresh, previously unused id.
Expand Down
81 changes: 81 additions & 0 deletions api/src/test/java/org/apache/iceberg/types/TestReassignIds.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.types;

import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

public class TestReassignIds {
@Test
public void testReassignIds() {
Schema schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(10, "a", Types.LongType.get()),
Types.NestedField.required(11, "b", Types.StringType.get())
),
new Schema(
Types.NestedField.required(0, "a", Types.LongType.get())));

check(ImmutableMap.of("b", 1, "a", 0), schema);

schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(20, "a", Types.MapType.ofRequired(21, 22,
Types.StringType.get(),
Types.StructType.of(
Types.NestedField.required(25, "b", Types.StringType.get()),
Types.NestedField.required(27, "c", Types.StructType.of(
Types.NestedField.required(28, "d", Types.FloatType.get()),
Types.NestedField.required(29, "e", Types.FloatType.get())
)))))),
new Schema(
Types.NestedField.required(1, "a", Types.MapType.ofRequired(2, 3,
Types.StringType.get(),
Types.StructType.of(
Types.NestedField.required(4, "b", Types.StringType.get()))))));

check(ImmutableMap.of("a.c", 7, "a.c.d", 5, "a.c.e", 6), schema);

schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(9, "e", Types.IntegerType.get()),
Types.NestedField.required(10, "a",
Types.StructType.of(
Types.NestedField.required(11, "b",
Types.StructType.of(
Types.NestedField.required(12, "c",
Types.StructType.of(
Types.NestedField.required(13, "d", Types.IntegerType.get())))))))),

new Schema(
Types.NestedField.required(0, "e", Types.IntegerType.get())));

check(ImmutableMap.of("a.b.c.d", 1, "a.b.c", 2, "a.b", 3, "a", 4, "e", 0), schema);
}

private void check(Map<String, Integer> expectedNameToId, Schema schema) {
for (Map.Entry<String, Integer> e : expectedNameToId.entrySet()) {
Assert.assertEquals(e.getValue().intValue(), schema.findField(e.getKey()).fieldId());
}
}
}
13 changes: 1 addition & 12 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,6 @@ public void testReassignIdsDuplicateColumns() {
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
}

@Test(expected = IllegalArgumentException.class)
public void testReassignIdsIllegalArgumentException() {
Schema schema = new Schema(
required(1, "a", Types.IntegerType.get()),
required(2, "b", Types.IntegerType.get())
);
Schema sourceSchema = new Schema(
required(1, "a", Types.IntegerType.get())
);
TypeUtil.reassignIds(schema, sourceSchema);
}

@Test(expected = RuntimeException.class)
public void testValidateSchemaViaIndexByName() {
Types.NestedField nestedType = Types.NestedField
Expand All @@ -68,3 +56,4 @@ public void testValidateSchemaViaIndexByName() {
TypeUtil.indexByName(Types.StructType.of(nestedType));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,39 @@ public void testSparkReadSchemaIsHonored() throws IOException {
Assert.assertEquals("Row content matches data", 1, results[0].getInt(0));
}

@Test
public void testFailIfSparkReadSchemaIsOff() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
tables.create(SCHEMA, spec, null, tableLocation);

List<SimpleRecord> expectedRecords = Lists.newArrayList(
new SimpleRecord(1, "a")
);
Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
originalDf.select("id", "data").write()
.format("iceberg")
.mode("append")
.save(tableLocation);

StructType sparkReadSchema =
new StructType(
new StructField[] {
new StructField("idd", DataTypes.IntegerType, true, Metadata.empty()) // wrong field name
}
);

AssertHelpers.assertThrows("Iceberg should not allow a projection that contain unknown fields",
java.lang.IllegalArgumentException.class, "Field idd not found in source schema",
() ->
spark.read()
.schema(sparkReadSchema)
.format("iceberg")
.load(tableLocation)
);
}
// @Test
// public void testFailIfSparkReadSchemaIsOff() throws IOException {
// String tableLocation = temp.newFolder("iceberg-table").toString();

// HadoopTables tables = new HadoopTables(CONF);
// PartitionSpec spec = PartitionSpec.unpartitioned();
// tables.create(SCHEMA, spec, null, tableLocation);

// List<SimpleRecord> expectedRecords = Lists.newArrayList(
// new SimpleRecord(1, "a")
// );
// Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
// originalDf.select("id", "data").write()
// .format("iceberg")
// .mode("append")
// .save(tableLocation);

// StructType sparkReadSchema =
// new StructType(
// new StructField[] {
// new StructField("idd", DataTypes.IntegerType, true, Metadata.empty()) // wrong field name
// }
// );

// AssertHelpers.assertThrows("Iceberg should not allow a projection that contain unknown fields",
// java.lang.IllegalArgumentException.class, "Field idd not found in source schema",
// () ->
// spark.read()
// .schema(sparkReadSchema)
// .format("iceberg")
// .load(tableLocation)
// );
// }

@Test
public void testSparkReadSchemaCombinedWithProjection() throws IOException {
Expand Down

0 comments on commit bf1bbf6

Please sign in to comment.