Skip to content

Commit

Permalink
Refactor Struct & Map Field partitions into one method
Browse files Browse the repository at this point in the history
  • Loading branch information
Jordan Moore committed May 20, 2018
1 parent f4651bc commit a717711
Showing 1 changed file with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public String encodePartition(SinkRecord sinkRecord) {
if (value instanceof Struct) {
final Schema valueSchema = sinkRecord.valueSchema();
log.trace("Extracting partition field '{}' from struct '{}'.", fieldName, valueSchema);
partitionValue = getStructField(valueSchema, (Struct) value, fieldName);
partitionValue = getPartitionValue((Struct) value, fieldName, valueSchema);
} else if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
log.trace("Extracting partition field '{}' from map '{}'.", fieldName, map);
partitionValue = getMapField(map, fieldName);
partitionValue = getPartitionValue(map, fieldName, null);
} else {
log.error("Value is not of Struct or Map type.");
throw new PartitionException("Error encoding partition.");
Expand All @@ -68,42 +68,43 @@ public String encodePartition(SinkRecord sinkRecord) {
return Utils.mkString(partitionValueMap, "", "", "=", delim);
}

private String getStructField(Schema valueSchema, Struct struct, String fieldName) {
Object partitionKey = DataUtils.getNestedFieldValue(struct, fieldName);
Schema fieldSchema = DataUtils.getNestedField(valueSchema, fieldName).schema();
Type type = fieldSchema.type();
switch (type) {
case INT8:
case INT16:
case INT32:
case INT64:
Number record = (Number) partitionKey;
return String.valueOf(record);
case STRING:
return (String) partitionKey;
case BOOLEAN:
boolean booleanRecord = (boolean) partitionKey;
return Boolean.toString(booleanRecord);
default:
log.error("Type {} is not supported as a partition key.", type.getName());
throw new PartitionException("Error encoding partition.");
private String getPartitionValue(Object structOrMap, String fieldName, Schema valueSchema) {
Object partitionValue = DataUtils.getNestedFieldValue(structOrMap, fieldName);

Type type = null;
if (valueSchema != null) {
Schema fieldSchema = DataUtils.getNestedField(valueSchema, fieldName).schema();
type = fieldSchema.type();
}

String partitionValueString = partitionValueToString(partitionValue, type);
if (partitionValueString == null) {
String typeName = null;
if (partitionValue != null) {
typeName = (type != null ? type.getName() : partitionValue.getClass().getCanonicalName());
}
log.error("Type {} is not supported as a partition key.", typeName);
throw new PartitionException("Error encoding partition.");
}
return partitionValueString;
}

private String getMapField(Map<?, ?> map, String fieldName) {
Object partitionKey = DataUtils.getNestedFieldValue(map, fieldName);
if (partitionKey instanceof Number) {
Number record = (Number) partitionKey;
return String.valueOf(record);
} else if (partitionKey == null || partitionKey instanceof String) {
return (String) partitionKey;
} else if (partitionKey instanceof Boolean) {
boolean booleanRecord = (boolean) partitionKey;
@SuppressWarnings("ConstantConditions")
private String partitionValueToString(Object partitionValue, Type type) {
boolean isNumericType = type == Type.INT8
|| type == Type.INT16
|| type == Type.INT32
|| type == Type.INT64;
if (partitionValue instanceof Number || isNumericType) {
Number record = (Number) partitionValue;
return record.toString();
} else if (partitionValue instanceof String || type == Type.STRING) {
return (String) partitionValue;
} else if (partitionValue instanceof Boolean || type == Type.BOOLEAN) {
Boolean booleanRecord = (Boolean) partitionValue;
return Boolean.toString(booleanRecord);
} else {
log.error("Type {} is not supported as a partition key.", partitionKey.getClass());
throw new PartitionException("Error encoding partition.");
}
return null;
}

@Override
Expand Down

0 comments on commit a717711

Please sign in to comment.