Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FieldPartitioner Map and Nested Value Support #67

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@

public class DataUtils {

/**
* Regex pattern that respects fields with periods within them: it matches only the periods
* that lie outside of double-quoted segments, i.e. a period that is followed by an even
* number of double-quote characters up to the end of the input.
* Used to handle a field extraction of <code>"foo.bar"</code> differently than
* <code>foo.bar</code>, with the latter being the <code>bar</code> field within the
* <code>foo</code> {@link Struct}
*/
private static final String QUOTED_DOT_PATTERN = "\\.(?=(?:[^\\\"]*\\\"[^\\\"]*\\\")*[^\\\"]*$)";

public static Object getField(Object structOrMap, String fieldName) {
if (structOrMap instanceof Struct) {
return ((Struct) structOrMap).get(fieldName);
Expand All @@ -45,8 +53,16 @@ public static Object getField(Object structOrMap, String fieldName) {
public static Object getNestedFieldValue(Object structOrMap, String fieldName) {
try {
Object innermost = structOrMap;
String[] tokens;
// Structs cannot contain fields with periods
if (structOrMap instanceof Struct) {
tokens = fieldName.split("\\.");
} else {
tokens = fieldName.split(QUOTED_DOT_PATTERN);
}
// Iterate down to final struct
for (String name : fieldName.split("\\.")) {
for (String name : tokens) {
name = name.replaceAll("\"", "");
innermost = getField(innermost, name);
}
return innermost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.confluent.connect.storage.partitioner;

import io.confluent.connect.storage.util.DataUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
Expand All @@ -24,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -45,43 +47,65 @@ public void configure(Map<String, Object> config) {
@Override
public String encodePartition(SinkRecord sinkRecord) {
Object value = sinkRecord.value();
if (value instanceof Struct) {
final Schema valueSchema = sinkRecord.valueSchema();
final Struct struct = (Struct) value;
Map<String, String> partitionValueMap = new LinkedHashMap<>();
String partitionValue;

StringBuilder builder = new StringBuilder();
for (String fieldName : fieldNames) {
if (builder.length() > 0) {
builder.append(this.delim);
}
for (String fieldName : fieldNames) {
log.debug("Extracting partition field '{}'.", fieldName);
if (value instanceof Struct) {
final Schema valueSchema = sinkRecord.valueSchema();
log.trace("Extracting partition field '{}' from struct '{}'.", fieldName, valueSchema);
partitionValue = getPartitionValue((Struct) value, fieldName, valueSchema);
} else if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
log.trace("Extracting partition field '{}' from map '{}'.", fieldName, map);
partitionValue = getPartitionValue(map, fieldName, null);
} else {
log.error("Value is not of Struct or Map type.");
throw new PartitionException("Error encoding partition.");
}
partitionValueMap.put(fieldName, partitionValue);
}
return Utils.mkString(partitionValueMap, "", "", "=", delim);
}

Object partitionKey = struct.get(fieldName);
Type type = valueSchema.field(fieldName).schema().type();
switch (type) {
case INT8:
case INT16:
case INT32:
case INT64:
Number record = (Number) partitionKey;
builder.append(fieldName + "=" + record.toString());
break;
case STRING:
builder.append(fieldName + "=" + (String) partitionKey);
break;
case BOOLEAN:
boolean booleanRecord = (boolean) partitionKey;
builder.append(fieldName + "=" + Boolean.toString(booleanRecord));
break;
default:
log.error("Type {} is not supported as a partition key.", type.getName());
throw new PartitionException("Error encoding partition.");
}
private String getPartitionValue(Object structOrMap, String fieldName, Schema valueSchema) {
Object partitionValue = DataUtils.getNestedFieldValue(structOrMap, fieldName);

Type type = null;
if (valueSchema != null) {
Schema fieldSchema = DataUtils.getNestedField(valueSchema, fieldName).schema();
type = fieldSchema.type();
}

String partitionValueString = partitionValueToString(partitionValue, type);
if (partitionValueString == null) {
String typeName = null;
if (partitionValue != null) {
typeName = (type != null ? type.getName() : partitionValue.getClass().getCanonicalName());
}
return builder.toString();
} else {
log.error("Value is not Struct type.");
log.error("Type {} is not supported as a partition key.", typeName);
throw new PartitionException("Error encoding partition.");
}
return partitionValueString;
}

/**
 * Renders an extracted partition field value as a string.
 *
 * <p>A value is handled as a number, string or boolean when either its runtime class or the
 * supplied schema type says so. The schema type may be {@code null}, in which case only the
 * runtime class of the value is considered.
 *
 * @param partitionValue the extracted field value; may be {@code null}
 * @param type the schema type of the field, or {@code null} when no schema is available
 * @return the string form of the value, or {@code null} when the value is {@code null} or is
 *     neither a number, a string nor a boolean
 */
@SuppressWarnings("ConstantConditions")
private String partitionValueToString(Object partitionValue, Type type) {
  // A missing value has no string form. Without this guard, a null value whose schema type is
  // numeric or boolean would fail with a NullPointerException (toString() on null / unboxing
  // a null Boolean) instead of yielding the null result used for unsupported values.
  if (partitionValue == null) {
    return null;
  }

  boolean isNumericType = type == Type.INT8
      || type == Type.INT16
      || type == Type.INT32
      || type == Type.INT64;
  if (partitionValue instanceof Number || isNumericType) {
    return ((Number) partitionValue).toString();
  } else if (partitionValue instanceof String || type == Type.STRING) {
    return (String) partitionValue;
  } else if (partitionValue instanceof Boolean || type == Type.BOOLEAN) {
    return ((Boolean) partitionValue).toString();
  }
  // Neither the runtime class nor the schema type identifies a supported kind of value.
  return null;
}

@Override
Expand Down