Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
Expand All @@ -31,11 +30,8 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,9 +59,9 @@ static Schema convert(List<String> names, List<TypeInfo> typeInfos, List<String>
return new Schema(converter.convertInternal(names, typeInfos, defaultValues, comments));
}

static Type convert(TypeInfo typeInfo, boolean autoConvert) {
public static Type convert(TypeInfo typeInfo, boolean autoConvert, String defaultValue) {
HiveSchemaConverter converter = new HiveSchemaConverter(autoConvert);
return converter.convertType(typeInfo, null);
return converter.convertType(typeInfo, defaultValue);
}

List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos,
Expand All @@ -86,7 +82,7 @@ List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeI

if (defaultValues.containsKey(columnName)) {
if (type.isPrimitiveType()) {
Object icebergDefaultValue = getDefaultValue(defaultValues.get(columnName), type);
Object icebergDefaultValue = HiveSchemaUtil.getDefaultValue(defaultValues.get(columnName), type);
fieldBuilder.withWriteDefault(Expressions.lit(icebergDefaultValue));
} else if (!type.isStructType()) {
throw new UnsupportedOperationException(
Expand All @@ -99,13 +95,6 @@ List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeI
return result;
}

private static Object getDefaultValue(String defaultValue, Type type) {
return switch (type.typeId()) {
case DATE, TIME, TIMESTAMP, TIMESTAMP_NANO -> Literal.of(stripQuotes(defaultValue)).to(type).value();
default -> Conversions.fromPartitionString(type, stripQuotes(defaultValue));
};
}

Type convertType(TypeInfo typeInfo, String defaultValue) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
Expand Down Expand Up @@ -162,7 +151,7 @@ Type convertType(TypeInfo typeInfo, String defaultValue) {
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List<Types.NestedField> fields =
convertInternal(structTypeInfo.getAllStructFieldNames(), structTypeInfo.getAllStructFieldTypeInfos(),
getDefaultValuesMap(defaultValue), Collections.emptyList());
HiveSchemaUtil.getDefaultValuesMap(defaultValue), Collections.emptyList());
return Types.StructType.of(fields);
case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
Expand All @@ -182,20 +171,4 @@ Type convertType(TypeInfo typeInfo, String defaultValue) {
throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
}
}

private static Map<String, String> getDefaultValuesMap(String defaultValue) {
if (StringUtils.isEmpty(defaultValue)) {
return Collections.emptyMap();
}
// For Struct, the default value is expected to be in key:value format
return Splitter.on(',').trimResults().withKeyValueSeparator(':').split(stripQuotes(defaultValue));
}

public static String stripQuotes(String val) {
if (val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'' ||
val.charAt(0) == '"' && val.charAt(val.length() - 1) == '"') {
return val.substring(1, val.length() - 1);
}
return val;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
Expand Down Expand Up @@ -140,23 +144,29 @@ public static TypeInfo convert(Type type) {

/**
* Converts a Hive typeInfo object to an Iceberg type.
* @param typeInfo The Hive type
*
* @param typeInfo The Hive type
* @param defaultValue the default value for the column, if any
* @return The Iceberg type
*/
public static Type convert(TypeInfo typeInfo) {
return HiveSchemaConverter.convert(typeInfo, false);
public static Type convert(TypeInfo typeInfo, String defaultValue) {
return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
}

/**
* Returns a SchemaDifference containing those fields which are present in only one of the collections, as well as
* those fields which are present in both (in terms of the name) but their type or comment has changed.
* @param minuendCollection Collection of fields to subtract from
*
* @param minuendCollection Collection of fields to subtract from
* @param subtrahendCollection Collection of fields to subtract
* @param bothDirections Whether or not to compute the missing fields from the minuendCollection as well
* @param schema the iceberg table schema, if available. Used to compare default values
* @param defaultValues the column default values
* @param bothDirections Whether or not to compute the missing fields from the minuendCollection as well
* @return the difference between the two schemas
*/
public static SchemaDifference getSchemaDiff(Collection<FieldSchema> minuendCollection,
Collection<FieldSchema> subtrahendCollection, boolean bothDirections) {
Collection<FieldSchema> subtrahendCollection, Schema schema, Map<String, String> defaultValues,
boolean bothDirections) {
SchemaDifference difference = new SchemaDifference();

for (FieldSchema first : minuendCollection) {
Expand All @@ -178,13 +188,61 @@ public static SchemaDifference getSchemaDiff(Collection<FieldSchema> minuendColl
}

if (bothDirections) {
SchemaDifference otherWay = getSchemaDiff(subtrahendCollection, minuendCollection, false);
SchemaDifference otherWay = getSchemaDiff(subtrahendCollection, minuendCollection, null, defaultValues, false);
otherWay.getMissingFromSecond().forEach(difference::addMissingFromFirst);
}

if (schema != null) {
for (Types.NestedField field : schema.columns()) {
if (!isRemovedField(field, difference.getMissingFromFirst())) {
getDefaultValDiff(field, defaultValues, difference);
}
}
}

return difference;
}

/**
 * Returns whether the given Iceberg field is listed (case-insensitively by name) among the
 * Hive fields that were removed, i.e. are missing from the first schema.
 */
private static boolean isRemovedField(Types.NestedField field, List<FieldSchema> missingFields) {
  return missingFields.stream()
      .anyMatch(missing -> missing.getName().equalsIgnoreCase(field.name()));
}

/**
 * Computes whether the default value has changed for the given field.
 * @param field the field to check for default value change
 * @param defaultValues the default values for the table schema, if available. Used to compare default values
 * @param difference the SchemaDifference object to update with the default value change if any
 */
private static void getDefaultValDiff(Types.NestedField field, Map<String, String> defaultValues,
    SchemaDifference difference) {

  String declaredDefault = defaultValues.get(field.name());

  // Neither side declares a default: nothing to compare.
  if (declaredDefault == null && field.writeDefault() == null) {
    return;
  }

  if (field.type().isPrimitiveType()) {
    // Convert the Hive-declared default into Iceberg's representation before comparing.
    Object converted = HiveSchemaUtil.getDefaultValue(declaredDefault, field.type());
    if (!Objects.equals(converted, field.writeDefault())) {
      difference.addDefaultChanged(field, converted);
    }
  } else if (field.type().isStructType()) {
    // Struct defaults are declared as a flat "key:value" list; recurse into each nested field.
    Map<String, String> nestedDefaults = getDefaultValuesMap(declaredDefault);
    field.type().asStructType().fields()
        .forEach(nested -> getDefaultValDiff(nested, nestedDefaults, difference));
  }
}


/**
* Compares two lists of columns to each other to find the (singular) column that was moved. This works ideally for
* identifying the column that was moved by an ALTER TABLE ... CHANGE COLUMN command.
Expand Down Expand Up @@ -248,6 +306,7 @@ public static class SchemaDifference {
private final List<FieldSchema> missingFromSecond = Lists.newArrayList();
private final List<FieldSchema> typeChanged = Lists.newArrayList();
private final List<FieldSchema> commentChanged = Lists.newArrayList();
private final Map<Types.NestedField, Object> defaultChanged = Maps.newHashMap();

public List<FieldSchema> getMissingFromFirst() {
return missingFromFirst;
Expand All @@ -265,9 +324,13 @@ public List<FieldSchema> getCommentChanged() {
return commentChanged;
}

public Map<Types.NestedField, Object> getDefaultChanged() {
return defaultChanged;
}

public boolean isEmpty() {
return missingFromFirst.isEmpty() && missingFromSecond.isEmpty() && typeChanged.isEmpty() &&
commentChanged.isEmpty();
commentChanged.isEmpty() && defaultChanged.isEmpty();
}

void addMissingFromFirst(FieldSchema field) {
Expand All @@ -285,6 +348,10 @@ void addTypeChanged(FieldSchema field) {
void addCommentChanged(FieldSchema field) {
commentChanged.add(field);
}

void addDefaultChanged(Types.NestedField field, Object defaultValue) {
defaultChanged.put(field, defaultValue);
}
}


Expand Down Expand Up @@ -408,4 +475,35 @@ public static Object convertToWriteType(Object value, Type type) {

return value; // fallback
}

/**
 * Parses a struct default-value declaration into a per-field map.
 * @param defaultValue the raw default, expected in "k1:v1, k2:v2" format (optionally quoted)
 * @return an empty map when no default is declared, otherwise the parsed key/value pairs
 */
public static Map<String, String> getDefaultValuesMap(String defaultValue) {
  if (StringUtils.isEmpty(defaultValue)) {
    return Collections.emptyMap();
  }
  String unquoted = stripQuotes(defaultValue);
  // For Struct, the default value is expected to be in key:value format.
  // NOTE(review): guava's MapSplitter rejects malformed entries and duplicate keys — presumably
  // that strictness is intended here; confirm before replacing with a hand-rolled parser.
  return Splitter.on(',').trimResults().withKeyValueSeparator(':').split(unquoted);
}

/**
 * Removes one matching pair of surrounding quotes (single or double) from the given value.
 * Mismatched or absent quotes leave the value untouched.
 * @param val the possibly-quoted value; may be null or empty
 * @return the value without its surrounding quote pair, or the input unchanged
 */
public static String stripQuotes(String val) {
  // Guard: null, empty, or single-character values cannot carry a matching quote pair.
  // The unguarded charAt(0)/substring(1, len - 1) would throw on "" and "'".
  if (val == null || val.length() < 2) {
    return val;
  }
  char first = val.charAt(0);
  char last = val.charAt(val.length() - 1);
  if ((first == '\'' && last == '\'') || (first == '"' && last == '"')) {
    return val.substring(1, val.length() - 1);
  }
  return val;
}

/**
 * Converts a Hive default-value string into the Iceberg value for the given type.
 * @param defaultValue the raw (possibly quoted) default string; may be null
 * @param type the Iceberg type to convert the default into
 * @return the converted default value, or null when no default is declared
 */
public static Object getDefaultValue(String defaultValue, Type type) {
  if (defaultValue == null) {
    return null;
  }
  String unquoted = stripQuotes(defaultValue);
  switch (type.typeId()) {
    case DATE:
    case TIME:
    case TIMESTAMP:
    case TIMESTAMP_NANO:
      // Temporal defaults are parsed via Literal (ISO-style strings) rather than
      // the generic partition-string conversion.
      return Literal.of(unquoted).to(type).value();
    default:
      return Conversions.fromPartitionString(type, unquoted);
  }
}

/**
 * Converts a Hive struct typeInfo to an Iceberg type, carrying the column's default value
 * declaration through to the converter (auto-conversion disabled).
 * @param typeInfo the Hive type to convert
 * @param defaultValue the default value for the column, if any; may be null
 * @return the converted Iceberg type
 */
public static Type getStructType(TypeInfo typeInfo, String defaultValue) {
  return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void checkConvert(TypeInfo typeInfo, Type type) {
// Convert to TypeInfo
assertThat(HiveSchemaUtil.convert(type)).isEqualTo(typeInfo);
// Convert to Type
assertEquals(type, HiveSchemaUtil.convert(typeInfo));
assertEquals(type, HiveSchemaUtil.convert(typeInfo, null));
}

/**
Expand Down
Loading