
CascadingValueWriter should also handle Hadoop value types and not just JDK value types #110

Closed
jeoffreylim opened this issue Nov 26, 2013 · 2 comments


@jeoffreylim

In some cases a Cascading tuple can contain Hadoop value types instead of plain JDK types. I encountered this in my Cascading project, which threw the following exception:

Caused by: org.elasticsearch.hadoop.serialization.SerializationException: Cannot handle type [class cascading.scheme.ConcreteCall], instance [cascading.scheme.ConcreteCall@5694fe42] using writer [org.elasticsearch.hadoop.cascading.CascadingValueWriter@4fc0cb76]

After some investigation, the problem is that JdkValueWriter fails to write the object to the sink because the actual object from the tuple is of type org.apache.hadoop.io.Text, which should be handled by WritableValueWriter instead.
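
For context, here is a minimal, hypothetical sketch of the situation (the field values and class name are made up); the tuple entry is a Hadoop Writable, so the JDK writer declines it:

import org.apache.hadoop.io.Text;

import cascading.tuple.Tuple;

public class TextTupleExample {
    public static void main(String[] args) {
        // A tuple populated by a Hadoop-backed tap can carry Writable
        // values rather than plain JDK types.
        Tuple tuple = new Tuple(new Text("user-42"), new Text("click"));

        // Prints "class org.apache.hadoop.io.Text" (not java.lang.String),
        // which is why JdkValueWriter.write(...) returns false for it.
        System.out.println(tuple.getObject(0).getClass());
    }
}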

With some minor fixes to CascadingValueWriter, this solved my problem:

import java.util.List;

import org.apache.hadoop.io.Writable;

// Note: exact package locations may differ across elasticsearch-hadoop versions.
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.ValueWriter;
import org.elasticsearch.hadoop.serialization.WritableValueWriter;

import cascading.scheme.SinkCall;
import cascading.tuple.Tuple;

public class CascadingValueWriter implements ValueWriter<SinkCall<Object[], ?>> {

    private final ValueWriter<Object> jdkWriter;
    private final ValueWriter<Writable> hadoopWriter;

    public CascadingValueWriter() {
        this(false);
    }

    public CascadingValueWriter(boolean writeUnknownTypes) {
        jdkWriter = new JdkValueWriter(writeUnknownTypes);
        hadoopWriter = new WritableValueWriter(writeUnknownTypes);
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean write(SinkCall<Object[], ?> sinkCall, Generator generator) {
        Tuple tuple = sinkCall.getOutgoingEntry().getTuple();
        List<String> names = (List<String>) sinkCall.getContext()[0];

        generator.writeBeginObject();
        for (int i = 0; i < tuple.size(); i++) {
            String name = (i < names.size() ? names.get(i) : "tuple" + i);
            generator.writeFieldName(name);
            if (!jdkWriter.write(tuple.getObject(i), generator)) {
                // The JDK writer cannot handle Hadoop Writables (e.g. Text),
                // so fall back to the Writable writer before giving up.
                Object obj = tuple.getObject(i);
                if (obj instanceof Writable && hadoopWriter.write((Writable) obj, generator)) {
                    continue;
                }
                return false;
            }
        }
        generator.writeEndObject();
        return true;
    }
}
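
Note the ordering in write(...): the JDK writer is tried first, and the Writable writer is consulted only when the JDK writer declines and the value is actually a Hadoop Writable, so tuples that mix plain JDK values and Writables are handled in a single pass.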

costin closed this as completed in 113ab42 on Nov 26, 2013

@costin (Member) commented Nov 26, 2013

Thanks for catching and reporting this. I've pushed a fix in master and also a nightly build (#93).
Please try it out and let me know how it works for you.

Cheers,

@jeoffreylim (Author)

Works like a charm, thanks a lot :)
