Skip to content

Commit

Permalink
Enhance ValueWriter to better error messages
Browse files Browse the repository at this point in the history
Improve ValueWriter interface to allow the object causing the
serialization to fail to be properly reported
relates #370
  • Loading branch information
costin committed Feb 2, 2015
1 parent 464ec6a commit 14ead2c
Show file tree
Hide file tree
Showing 15 changed files with 442 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public CascadingValueWriter(boolean writeUnknownTypes) {

@SuppressWarnings("unchecked")
@Override
public boolean write(SinkCall<Object[], ?> sinkCall, Generator generator) {
public Result write(SinkCall<Object[], ?> sinkCall, Generator generator) {
Tuple tuple = CascadingUtils.coerceToString(sinkCall);
// consider names (in case of aliases these are already applied)
List<String> names = (List<String>) sinkCall.getContext()[0];
Expand All @@ -58,18 +58,20 @@ public boolean write(SinkCall<Object[], ?> sinkCall, Generator generator) {
for (int i = 0; i < tuple.size(); i++) {
String name = (i < names.size() ? names.get(i) : "tuple" + i);
// filter out fields
if (shouldKeep(generator.getParentPath(), name)) {
if (shouldKeep(generator.getParentPath(), name)) {
generator.writeFieldName(name);
Object object = tuple.getObject(i);
if (!jdkWriter.write(object, generator)) {
if (!(object instanceof Writable) || !writableWriter.write((Writable) object, generator)) {
return false;
Result result = jdkWriter.write(object, generator);
if (!result.isSuccesful()) {
if (object instanceof Writable) {
return writableWriter.write((Writable) object, generator);
}
return Result.FAILED(object);
}
}
}
generator.writeEndObject();
return true;
return Result.SUCCESFUL();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class HiveValueWriter extends FilteringValueWriter<HiveType> {

private final boolean writeUnknownTypes;
private final HiveWritableValueWriter writableWriter;
private final HiveWritableValueWriter writableWriter;
private FieldAlias alias;

public HiveValueWriter() {
Expand All @@ -50,33 +50,29 @@ public HiveValueWriter() {
}

@Override
public boolean write(HiveType type, Generator generator) {
boolean result = write(type.getObject(), type.getObjectInspector(), generator);
return result;
public Result write(HiveType type, Generator generator) {
return write(type.getObject(), type.getObjectInspector(), generator);
}

private boolean write(Object data, ObjectInspector oi, Generator generator) {
private Result write(Object data, ObjectInspector oi, Generator generator) {
if (data == null) {
generator.writeNull();
return true;
return Result.SUCCESFUL();
}

switch (oi.getCategory()) {
case PRIMITIVE:
Writable writable = (Writable) ((PrimitiveObjectInspector) oi).getPrimitiveWritableObject(data);

if (!writableWriter.write(writable, generator)) {
return false;
}
break;
return writableWriter.write(writable, generator);

case LIST: // or ARRAY
ListObjectInspector loi = (ListObjectInspector) oi;
generator.writeBeginArray();

for (int i = 0; i < loi.getListLength(data); i++) {
if (!write(loi.getListElement(data, i), loi.getListElementObjectInspector(), generator)) {
return false;
Result result = write(loi.getListElement(data, i), loi.getListElementObjectInspector(), generator);
if (!result.isSuccesful()) {
return result;
}
}
generator.writeEndArray();
Expand All @@ -94,10 +90,11 @@ private boolean write(Object data, ObjectInspector oi, Generator generator) {
String fieldName = entry.getKey().toString();

// filter out fields
if (shouldKeep(generator.getParentPath(), fieldName)) {
if (shouldKeep(generator.getParentPath(), fieldName)) {
generator.writeFieldName(alias.toES(entry.getKey().toString()));
if (!write(entry.getValue(), moi.getMapValueObjectInspector(), generator)) {
return false;
Result result = write(entry.getValue(), moi.getMapValueObjectInspector(), generator);
if (!result.isSuccesful()) {
return result;
}
}
}
Expand All @@ -112,10 +109,12 @@ private boolean write(Object data, ObjectInspector oi, Generator generator) {

generator.writeBeginObject();
for (StructField structField : refs) {
if (shouldKeep(generator.getParentPath(), structField.getFieldName())) {
if (shouldKeep(generator.getParentPath(), structField.getFieldName())) {
generator.writeFieldName(alias.toES(structField.getFieldName()));
if (!write(soi.getStructFieldData(data, structField), structField.getFieldObjectInspector(), generator)) {
return false;
Result result = write(soi.getStructFieldData(data, structField),
structField.getFieldObjectInspector(), generator);
if (!result.isSuccesful()) {
return result;
}
}
}
Expand All @@ -131,21 +130,21 @@ private boolean write(Object data, ObjectInspector oi, Generator generator) {
if (writeUnknownTypes) {
return handleUnknown(data, oi, generator);
}
return false;
return Result.FAILED(data);
}

return true;
return Result.SUCCESFUL();
}


protected boolean handleUnknown(Object value, ObjectInspector oi, Generator generator) {
return false;
protected Result handleUnknown(Object value, ObjectInspector oi, Generator generator) {
return org.elasticsearch.hadoop.serialization.builder.ValueWriter.Result.FAILED(value);
}

@Override
public void setSettings(Settings settings) {
super.setSettings(settings);
super.setSettings(settings);
alias = HiveUtils.alias(settings);
writableWriter.setSettings(settings);
writableWriter.setSettings(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public HiveWritableValueWriter(boolean writeUnknownTypes) {
}

@Override
public boolean write(Writable writable, Generator generator) {
public Result write(Writable writable, Generator generator) {
if (writable instanceof ByteWritable) {
generator.writeNumber(((ByteWritable) writable).get());
}
Expand Down Expand Up @@ -83,7 +83,7 @@ else if (writable != null && HiveConstants.CHAR_WRITABLE.equals(writable.getClas
return super.write(writable, generator);
}

return true;
return Result.SUCCESFUL();
}

// use nested class to efficiently get a hold of the underlying Date object (w/o doing reparsing, etc...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public WritableValueWriter(boolean writeUnknownTypes) {
this.writeUnknownTypes = writeUnknownTypes;
}

@Override
@SuppressWarnings({ "unchecked", "deprecation" })
public boolean write(Writable writable, Generator generator) {
public Result write(Writable writable, Generator generator) {
if (writable == null || writable instanceof NullWritable) {
generator.writeNull();
}
Expand Down Expand Up @@ -104,8 +105,9 @@ else if (writable instanceof MD5Hash) {
else if (writable instanceof ArrayWritable) {
generator.writeBeginArray();
for (Writable wrt : ((ArrayWritable) writable).get()) {
if (!write(wrt, generator)) {
return false;
Result result = write(wrt, generator);
if (!result.isSuccesful()) {
return result;
}
}
generator.writeEndArray();
Expand All @@ -117,12 +119,14 @@ else if (writable instanceof AbstractMapWritable) {
generator.writeBeginObject();
// ignore handling sets (which are just maps with null values)
for (Entry<Writable, Writable> entry : map.entrySet()) {
String fieldName = entry.getKey().toString();
if (shouldKeep(generator.getParentPath(), fieldName)) {
generator.writeFieldName(fieldName);
if (!write(entry.getValue(), generator)) {
return false;
}
String fieldName = entry.getKey().toString();
if (shouldKeep(generator.getParentPath(), fieldName)) {
generator.writeFieldName(fieldName);
Result result = write(entry.getValue(), generator);

if (!result.isSuccesful()) {
return result;
}
}
}
generator.writeEndObject();
Expand All @@ -131,13 +135,13 @@ else if (writable instanceof AbstractMapWritable) {
if (writeUnknownTypes) {
return handleUnknown(writable, generator);
}
return false;
return Result.FAILED(writable);
}
return true;
return Result.SUCCESFUL();
}

protected boolean handleUnknown(Writable value, Generator generator) {
protected Result handleUnknown(Writable value, Generator generator) {
generator.writeBinary(WritableUtils.toByteArray(value));
return true;
return Result.SUCCESFUL();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter.Result;
import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
Expand Down Expand Up @@ -49,8 +50,17 @@ public static ContentBuilder generate(OutputStream bos, ValueWriter writer) {

@SuppressWarnings("unchecked")
public ContentBuilder value(Object value) {
if (!writer.write(value, generator)) {
throw new EsHadoopSerializationException(String.format("Cannot handle type [%s], instance [%s] using writer [%s]", value.getClass(), value, writer));
Result result = writer.write(value, generator);
if (!result.isSuccesful()) {
String message = null;
if (value == result.unknownValue) {
message = String.format("Cannot handle type [%s], instance [%s] using writer [%s]", value.getClass(), value, writer);
}
else {
message = String.format("Cannot handle type [%s] within type [%s], instance [%s] within instance [%s] using writer [%s]",
result.unknownValue.getClass(), value.getClass(), result.unknownValue, value, writer);
}
throw new EsHadoopSerializationException(message);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public JdkValueWriter(boolean writeUnknownTypes) {
}

@Override
public boolean write(Object value, Generator generator) {
return doWrite(value, generator, null);
}
public Result write(Object value, Generator generator) {
return doWrite(value, generator, null);
}

protected boolean doWrite(Object value, Generator generator, String parentField) {
protected Result doWrite(Object value, Generator generator, String parentField) {
if (value == null) {
generator.writeNull();
}
Expand Down Expand Up @@ -94,31 +94,34 @@ else if (value instanceof byte[]) {
else if (value.getClass().isArray()) {
generator.writeBeginArray();
for (Object o : (Object[]) value) {
if (!doWrite(o, generator, parentField)) {
return false;
Result result = doWrite(o, generator, parentField);
if (!result.isSuccesful()) {
return result;
}
}
generator.writeEndArray();
}
else if (value instanceof Map) {
generator.writeBeginObject();
for (Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
String fieldName = entry.getKey().toString();
// filter out fields
if (shouldKeep(parentField, fieldName)) {
generator.writeFieldName(fieldName);
if (!doWrite(entry.getValue(), generator, fieldName)) {
return false;
}
String fieldName = entry.getKey().toString();
// filter out fields
if (shouldKeep(parentField, fieldName)) {
generator.writeFieldName(fieldName);
Result result = doWrite(entry.getValue(), generator, fieldName);
if (!result.isSuccesful()) {
return result;
}
}
}
generator.writeEndObject();
}
else if (value instanceof Iterable) {
generator.writeBeginArray();
for (Object o : (Iterable<?>) value) {
if (!doWrite(o, generator, parentField)) {
return false;
Result result = doWrite(o, generator, parentField);
if (!result.isSuccesful()) {
return result;
}
}
generator.writeEndArray();
Expand All @@ -140,13 +143,13 @@ else if (value instanceof Timestamp) {
if (writeUnknownTypes) {
return handleUnknown(value, generator);
}
return false;
return Result.FAILED(value);
}
return true;
return Result.SUCCESFUL();
}

protected boolean handleUnknown(Object value, Generator generator) {
protected Result handleUnknown(Object value, Generator generator) {
generator.writeString(value.toString());
return true;
return Result.SUCCESFUL();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class NoOpValueWriter implements ValueWriter<Object> {

@Override
public boolean write(Object object, Generator generator) {
public Result write(Object object, Generator generator) {
throw new EsHadoopIllegalStateException("Incorrect configuration - NoOpValueWriter should not have been called");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,34 @@
*/
public interface ValueWriter<T> {

public final class Result {
private static final Result SUCCESFUL = new Result(null);

final Object unknownValue;

private Result(Object target) {
unknownValue = target;
}

public boolean isSuccesful() {
return SUCCESFUL == this;
}

public static Result SUCCESFUL() {
return SUCCESFUL;
}

public static Result FAILED(Object target) {
return new Result(target);
}
}

/**
* Returns true if the value was written, false otherwise.
*
* @param object
* @param generator
* @return true if the value was written, false otherwise
* @return {@link Result#SUCCESFUL()} if completed, {@link Result#FAILED(Object)} otherwise containing the failing target
*/
boolean write(T object, Generator generator);
Result write(T object, Generator generator);
}
Loading

0 comments on commit 14ead2c

Please sign in to comment.