Avro: Add name to ID mapping for files with name-based schemas (#207)
Fixes #40.
rdsr authored and rdblue committed Oct 22, 2019
1 parent 706c777 commit 9f1598e
Showing 9 changed files with 539 additions and 42 deletions.
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
 
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT;
@@ -170,6 +171,7 @@ public static class ReadBuilder {
     private final ClassLoader defaultLoader = Thread.currentThread().getContextClassLoader();
     private final InputFile file;
     private final Map<String, String> renames = Maps.newLinkedHashMap();
+    private NameMapping nameMapping;
     private boolean reuseContainers = false;
     private org.apache.iceberg.Schema schema = null;
     private Function<Schema, DatumReader<?>> createReaderFunc = readSchema -> {
@@ -223,10 +225,15 @@ public ReadBuilder rename(String fullName, String newName) {
       return this;
     }
 
+    public ReadBuilder nameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
     public <D> AvroIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
       return new AvroIterable<>(file,
-          new ProjectionDatumReader<>(createReaderFunc, schema, renames),
+          new ProjectionDatumReader<>(createReaderFunc, schema, renames, nameMapping),
           start, length, reuseContainers);
     }
   }
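To make the new builder option concrete, a hedged usage sketch follows. The table, input file, and the MappingUtil.create factory are assumptions for illustration; the diff itself only adds the nameMapping(...) setter and threads the mapping into ProjectionDatumReader.

// Illustrative only: read an Avro file written without Iceberg field IDs,
// supplying a name-based mapping as the fallback for resolving IDs.
// `MappingUtil.create` and the surrounding variables are assumed, not part
// of this commit's diff.
NameMapping mapping = MappingUtil.create(table.schema());
AvroIterable<GenericData.Record> records = Avro.read(inputFile)
    .project(table.schema())
    .nameMapping(mapping)  // new in this commit
    .build();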
82 changes: 74 additions & 8 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -22,13 +22,16 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -83,8 +86,8 @@ public static Map<Type, Schema> convertTypes(Types.StructType type, String name)
     return ImmutableMap.copyOf(converter.getConversionMap());
   }
 
-  public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds) {
-    return new PruneColumns(selectedIds).rootSchema(schema);
+  public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds, NameMapping nameMapping) {
+    return new PruneColumns(selectedIds, nameMapping).rootSchema(schema);
   }
 
   public static Schema buildAvroProjection(Schema schema, org.apache.iceberg.Schema expected,
@@ -196,15 +199,35 @@ static Schema createProjectionMap(String recordName,
     return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord));
   }
 
-  private static int getId(Schema schema, String propertyName) {
+  private static Integer getId(Schema schema, String propertyName) {
+    Integer id = getId(schema, propertyName, null, null);
+    Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName);
+    return id;
+  }
+
+  private static Integer getId(Schema schema, String propertyName, NameMapping nameMapping, List<String> names) {
     if (schema.getType() == UNION) {
-      return getId(fromOption(schema), propertyName);
+      return getId(fromOption(schema), propertyName, nameMapping, names);
     }
 
     Object id = schema.getObjectProp(propertyName);
-    Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName);
+    if (id != null) {
+      return toInt(id);
+    } else if (nameMapping != null) {
+      MappedField mappedField = nameMapping.find(names);
+      if (mappedField != null) {
+        return mappedField.id();
+      }
+    }
 
-    return toInt(id);
+    return null;
+  }
+
+  static boolean hasProperty(Schema schema, String propertyName) {
+    if (schema.getType() == UNION) {
+      return hasProperty(fromOption(schema), propertyName);
+    }
+    return schema.getObjectProp(propertyName) != null;
   }
 
   public static int getKeyId(Schema schema) {
@@ -213,23 +236,66 @@ public static int getKeyId(Schema schema) {
     return getId(schema, KEY_ID_PROP);
   }
 
+  static Integer getKeyId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
+    Preconditions.checkArgument(schema.getType() == MAP,
+        "Cannot get map key id for non-map schema: %s", schema);
+    List<String> names = Lists.newArrayList(parentFieldNames);
+    names.add("key");
+    return getId(schema, KEY_ID_PROP, nameMapping, names);
+  }
+
   public static int getValueId(Schema schema) {
     Preconditions.checkArgument(schema.getType() == MAP,
         "Cannot get map value id for non-map schema: %s", schema);
     return getId(schema, VALUE_ID_PROP);
   }
 
+  static Integer getValueId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
+    Preconditions.checkArgument(schema.getType() == MAP,
+        "Cannot get map value id for non-map schema: %s", schema);
+    List<String> names = Lists.newArrayList(parentFieldNames);
+    names.add("value");
+    return getId(schema, VALUE_ID_PROP, nameMapping, names);
+  }
+
   public static int getElementId(Schema schema) {
     Preconditions.checkArgument(schema.getType() == ARRAY,
         "Cannot get array element id for non-array schema: %s", schema);
     return getId(schema, ELEMENT_ID_PROP);
   }
 
+  static Integer getElementId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
+    Preconditions.checkArgument(schema.getType() == ARRAY,
+        "Cannot get array element id for non-array schema: %s", schema);
+    List<String> names = Lists.newArrayList(parentFieldNames);
+    names.add("element");
+    return getId(schema, ELEMENT_ID_PROP, nameMapping, names);
+  }
+
   public static int getFieldId(Schema.Field field) {
-    Object id = field.getObjectProp(FIELD_ID_PROP);
+    Integer id = getFieldId(field, null, null);
     Preconditions.checkNotNull(id, "Missing expected '%s' property", FIELD_ID_PROP);
+    return id;
+  }
+
+  static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable<String> parentFieldNames) {
+    Object id = field.getObjectProp(FIELD_ID_PROP);
+    if (id != null) {
+      return toInt(id);
+    } else if (nameMapping != null) {
+      List<String> names = Lists.newArrayList(parentFieldNames);
+      names.add(field.name());
+      MappedField mappedField = nameMapping.find(names);
+      if (mappedField != null) {
+        return mappedField.id();
+      }
+    }
+
+    return null;
+  }
+
-    return toInt(id);
+  public static boolean hasFieldId(Schema.Field field) {
+    return field.getObjectProp(FIELD_ID_PROP) != null;
   }
 
   private static int toInt(Object value) {
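The helpers above all funnel into the same fallback: use the ID property when the Avro schema carries one, otherwise look up the field's full name path in the NameMapping. A minimal, self-contained sketch of that lookup, assuming the NameMappingParser and mapping JSON format introduced alongside these classes (the mapping entries below are hypothetical):

import java.util.Arrays;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

public class NameMappingLookupSketch {
  public static void main(String[] args) {
    // Hypothetical mapping: field "id" -> 1, field "data" -> 2.
    NameMapping mapping = NameMappingParser.fromJson(
        "[ {\"field-id\": 1, \"names\": [\"id\"]}, " +
        "  {\"field-id\": 2, \"names\": [\"data\"]} ]");

    // The same call the new getId/getFieldId fallback makes when the schema
    // has no ID property at this position; null means "no mapping found".
    MappedField field = mapping.find(Arrays.asList("data"));
    Integer id = (field == null) ? null : field.id();
    System.out.println(id);  // -> 2
  }
}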
25 changes: 22 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
@@ -41,7 +41,8 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
         List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
         for (Schema.Field field : schema.getFields()) {
           names.add(field.name());
-          results.add(visit(field.schema(), visitor));
+          T result = visitWithName(field.name(), field.schema(), visitor);
+          results.add(result);
         }
 
         visitor.recordLevels.pop();
@@ -57,17 +58,35 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
         return visitor.union(schema, options);
 
       case ARRAY:
-        return visitor.array(schema, visit(schema.getElementType(), visitor));
+        if (schema.getLogicalType() instanceof LogicalMap || AvroSchemaUtil.isKeyValueSchema(schema.getElementType())) {
+          return visitor.array(schema, visit(schema.getElementType(), visitor));
+        } else {
+          return visitor.array(schema, visitWithName("element", schema.getElementType(), visitor));
+        }
 
       case MAP:
-        return visitor.map(schema, visit(schema.getValueType(), visitor));
+        return visitor.map(schema, visitWithName("value", schema.getValueType(), visitor));
 
       default:
         return visitor.primitive(schema);
     }
   }
 
   private Deque<String> recordLevels = Lists.newLinkedList();
+  private Deque<String> fieldNames = Lists.newLinkedList();
+
+  protected Deque<String> fieldNames() {
+    return fieldNames;
+  }
+
+  private static <T> T visitWithName(String name, Schema schema, AvroSchemaVisitor<T> visitor) {
+    try {
+      visitor.fieldNames.addLast(name);
+      return visit(schema, visitor);
+    } finally {
+      visitor.fieldNames.removeLast();
+    }
+  }
 
   public T record(Schema record, List<String> names, List<T> fields) {
     return null;
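A hedged sketch of what this new bookkeeping enables: a visitor subclass can ask for the path of field names leading to the schema it is visiting. The subclass below is illustrative, not part of this commit, and assumes AvroSchemaVisitor is subclassable from user code:

import org.apache.avro.Schema;

// Illustrative subclass: fieldNames() holds the names pushed by
// visitWithName(...), ordered outermost first. For a nested primitive like
// location.latitude it would contain ["location", "latitude"].
class PathPrintingVisitor extends AvroSchemaVisitor<Void> {
  @Override
  public Void primitive(Schema primitive) {
    System.out.println(String.join(".", fieldNames()));
    return null;
  }
}

// Usage: AvroSchemaVisitor.visit(avroSchema, new PathPrintingVisitor());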
@@ -94,9 +94,10 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
 
     } else {
       Preconditions.checkArgument(field.isOptional(), "Missing required field: %s", field.name());
-      // create a field that will be defaulted to null
+      // Create a field that will be defaulted to null. We assign a unique suffix to the field
+      // to make sure that even if records in the file have the field it is not projected.
       Schema.Field newField = new Schema.Field(
-          field.name(),
+          field.name() + "_r" + field.fieldId(),
           AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
       newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
       updatedFields.add(newField);
@@ -115,7 +116,7 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
   public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
     Types.StructType struct = current.asNestedType().asStructType();
     int fieldId = AvroSchemaUtil.getFieldId(field);
-    Types.NestedField expectedField = struct.field(fieldId); // TODO: what if there are no ids?
+    Types.NestedField expectedField = struct.field(fieldId);
 
     // if the field isn't present, it was not selected
     if (expectedField == null) {
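As a worked example of the renaming in the projection builder above (the field name and ID are hypothetical): if the expected schema has an optional field latitude with ID 7 that the file schema lacks, the projection now emits a null-defaulted placeholder named latitude_r7 rather than latitude, so a same-named field appearing in the file's records can never bind to it:

// Sketch of the placeholder built for a missing optional field; mirrors the
// diff, with a concrete name ("latitude") and ID (7) assumed for illustration.
Schema.Field placeholder = new Schema.Field(
    "latitude" + "_r" + 7,  // unique suffix keeps file fields from matching
    AvroSchemaUtil.toOption(AvroSchemaUtil.convert(Types.DoubleType.get())),
    null, JsonProperties.NULL_VALUE);
placeholder.addProp(AvroSchemaUtil.FIELD_ID_PROP, 7);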
@@ -26,29 +26,33 @@
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.types.TypeUtil;
 
 public class ProjectionDatumReader<D> implements DatumReader<D> {
   private final Function<Schema, DatumReader<?>> getReader;
   private final org.apache.iceberg.Schema expectedSchema;
   private final Map<String, String> renames;
+  private final NameMapping nameMapping;
   private Schema readSchema = null;
   private Schema fileSchema = null;
   private DatumReader<D> wrapped = null;
 
   public ProjectionDatumReader(Function<Schema, DatumReader<?>> getReader,
                                org.apache.iceberg.Schema expectedSchema,
-                               Map<String, String> renames) {
+                               Map<String, String> renames,
+                               NameMapping nameMapping) {
     this.getReader = getReader;
     this.expectedSchema = expectedSchema;
     this.renames = renames;
+    this.nameMapping = nameMapping;
   }
 
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
     Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema);
-    Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds);
+    Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping);
     this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames);
     this.wrapped = newDatumReader();
   }
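Finally, a hedged sketch of wiring the extended ProjectionDatumReader constructor up directly; expectedSchema, mapping, and fileSchema are assumed variables, and passing a null mapping preserves the previous ID-only behavior:

import java.util.Collections;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;

// Illustrative wiring: a plain Avro GenericDatumReader wrapped in the
// projection reader, with the new NameMapping argument threaded through.
ProjectionDatumReader<GenericData.Record> reader = new ProjectionDatumReader<>(
    readSchema -> new GenericDatumReader<>(readSchema),
    expectedSchema,
    Collections.emptyMap(),  // no renames
    mapping);                // consulted only when a schema element has no ID

// setSchema receives the file's writer schema; pruneColumns now falls back
// to the mapping for any schema element that lacks an ID property.
reader.setSchema(fileSchema);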
