Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add Ptype#getDetachedValue

Add getDetachedValue to PType to allow creating deep copies of
values in reducer-based DoFns. A side-effect of this is that PType
now extends Serializable.

Also fixes the bug in Aggregate#collectValues that caused the same
value to be collected multiple times in the case of custom
Writables or AvroTypes.
  • Loading branch information...
commit 7397d98a59f64df3bf7fbda57e1ddcfdf37b9487 1 parent a69bda8
@gabrielreid gabrielreid authored
Showing with 744 additions and 26 deletions.
  1. +4 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
  2. +7 −2 src/main/java/com/cloudera/crunch/lib/Aggregate.java
  3. +51 −4 src/main/java/com/cloudera/crunch/lib/PTables.java
  4. +27 −9 src/main/java/com/cloudera/crunch/types/PType.java
  5. +134 −0 src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java
  6. +6 −0 src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
  7. +6 −0 src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
  8. +27 −1 src/main/java/com/cloudera/crunch/types/avro/AvroType.java
  9. +4 −0 src/main/java/com/cloudera/crunch/types/avro/Avros.java
  10. +6 −0 src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
  11. +6 −0 src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
  12. +16 −1 src/main/java/com/cloudera/crunch/types/writable/WritableType.java
  13. +36 −4 src/main/java/com/cloudera/crunch/types/writable/Writables.java
  14. +104 −0 src/test/java/com/cloudera/crunch/lib/AggregateTest.java
  15. +58 −0 src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java
  16. +41 −0 src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java
  17. +35 −0 src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java
  18. +52 −0 src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
  19. +17 −3 src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
  20. +37 −0 src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java
  21. +30 −0 src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java
  22. +29 −0 src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java
  23. +11 −2 src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
View
4 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
@@ -12,6 +12,10 @@ public CrunchRuntimeException(Exception e) {
super(e);
}
+ public CrunchRuntimeException(String msg, Exception e) {
+ super(msg, e);
+ }
+
public boolean wasLogged() {
return logged;
}
View
9 src/main/java/com/cloudera/crunch/lib/Aggregate.java
@@ -224,9 +224,14 @@ public void process(Pair<Boolean, Iterable<S>> input,
public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
PTypeFamily tf = collect.getTypeFamily();
+ final PType<V> valueType = collect.getValueType();
return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() {
- public Collection<V> map(Iterable<V> v) {
- return Lists.newArrayList(v);
+ public Collection<V> map(Iterable<V> values) {
+ List<V> collected = Lists.newArrayList();
+ for (V value : values) {
+ collected.add(valueType.getDetachedValue(value));
+ }
+ return collected;
}
}, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
}
View
55 src/main/java/com/cloudera/crunch/lib/PTables.java
@@ -14,15 +14,23 @@
*/
package com.cloudera.crunch.lib;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
+
import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PGroupedTable;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.cloudera.crunch.types.PTableType;
+import com.cloudera.crunch.types.PType;
/**
* Methods for performing common operations on PTables.
- *
+ *
*/
public class PTables {
@@ -31,16 +39,55 @@
@Override
public void process(Pair<K, V> input, Emitter<K> emitter) {
emitter.emit(input.first());
- }
+ }
}, ptable.getKeyType());
}
-
+
public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
@Override
public void process(Pair<K, V> input, Emitter<V> emitter) {
emitter.emit(input.second());
- }
+ }
}, ptable.getValueType());
}
+
+ /**
+ * Create a detached value for a table {@link Pair}.
+ *
+ * @param tableType
+ * The table type
+ * @param value
+ * The value from which a detached value is to be created
+ * @return The detached value
+ * @see PType#getDetachedValue(Object)
+ */
+ public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
+ return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType()
+ .getDetachedValue(value.second()));
+ }
+
+ /**
+ * Created a detached value for a {@link PGroupedTable} value.
+ *
+ *
+ * @param groupedTableType
+ * The grouped table type
+ * @param value
+ * The value from which a detached value is to be created
+ * @return The detached value
+ * @see PType#getDetachedValue(Object)
+ */
+ public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+ PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
+
+ PTableType<K, V> tableType = groupedTableType.getTableType();
+ List<V> detachedIterable = Lists.newArrayList();
+ PType<V> valueType = tableType.getValueType();
+ for (V v : value.second()) {
+ detachedIterable.add(valueType.getDetachedValue(v));
+ }
+ return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
+ (Iterable<V>) detachedIterable);
+ }
}
View
36 src/main/java/com/cloudera/crunch/types/PType.java
@@ -15,10 +15,12 @@
package com.cloudera.crunch.types;
+import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.SourceTarget;
@@ -29,34 +31,50 @@
* read/write data from/to HDFS. Every {@link PCollection} has an associated
* {@code PType} that tells Crunch how to read/write data from that
* {@code PCollection}.
- *
+ *
*/
-public interface PType<T> {
+public interface PType<T> extends Serializable {
/**
* Returns the Java type represented by this {@code PType}.
*/
Class<T> getTypeClass();
-
+
/**
* Returns the {@code PTypeFamily} that this {@code PType} belongs to.
*/
PTypeFamily getFamily();
MapFn<Object, T> getInputMapFn();
-
+
MapFn<T, Object> getOutputMapFn();
-
+
Converter getConverter();
-
+
+ /**
+ * Returns a copy of a value (or the value itself) that can safely be
+ * retained.
+ * <p>
+ * This is useful when iterable values being processed in a DoFn (via a
+ * reducer) need to be held on to for more than the scope of a single
+ * iteration, as a reducer (and therefore also a DoFn that has an Iterable as
+ * input) re-use deserialized values. More information on object reuse is
+ * available in the {@link DoFn} class documentation.
+ *
+ * @param value
+ * The value to be deep-copied
+ * @return A deep copy of the input value
+ */
+ T getDetachedValue(T value);
+
/**
* Returns a {@code SourceTarget} that is able to read/write data using the
* serialization format specified by this {@code PType}.
*/
SourceTarget<T> getDefaultFileSource(Path path);
-
+
/**
- * Returns the sub-types that make up this PType if it is a composite instance,
- * such as a tuple.
+ * Returns the sub-types that make up this PType if it is a composite
+ * instance, such as a tuple.
*/
List<PType> getSubTypes();
}
View
134 src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java
@@ -0,0 +1,134 @@
+package com.cloudera.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+
+/**
+ * Performs deep copies of Avro-serializable objects.
+ * <p>
+ * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be
+ * a problem when running in a map-reduce context where each mapper/reducer is
+ * running in its own JVM, but it may well be a problem in any other kind of
+ * multi-threaded context.
+ */
+public abstract class AvroDeepCopier<T> implements Serializable {
+
+ private BinaryEncoder binaryEncoder;
+ private BinaryDecoder binaryDecoder;
+ protected DatumWriter<T> datumWriter;
+ protected DatumReader<T> datumReader;
+
+ protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
+ this.datumWriter = datumWriter;
+ this.datumReader = datumReader;
+ }
+
+ protected abstract T createCopyTarget();
+
+ /**
+ * Deep copier for Avro specific data objects.
+ */
+ public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
+
+ private Class<T> valueClass;
+
+ public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+ super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema));
+ this.valueClass = valueClass;
+ }
+
+ @Override
+ protected T createCopyTarget() {
+ return createNewInstance(valueClass);
+ }
+
+ }
+
+ /**
+ * Deep copier for Avro generic data objects.
+ */
+ public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
+
+ private Schema schema;
+
+ public AvroGenericDeepCopier(Schema schema) {
+ super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
+ this.schema = schema;
+ }
+
+ @Override
+ protected Record createCopyTarget() {
+ return new GenericData.Record(schema);
+ }
+ }
+
+ /**
+ * Deep copier for Avro reflect data objects.
+ */
+ public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+ private Class<T> valueClass;
+
+ public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
+ super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
+ this.valueClass = valueClass;
+ }
+
+ @Override
+ protected T createCopyTarget() {
+ return createNewInstance(valueClass);
+ }
+ }
+
+ /**
+ * Create a deep copy of an Avro value.
+ *
+ * @param source
+ * The value to be copied
+ * @return The deep copy of the value
+ */
+ public T deepCopy(T source) {
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+ T target = createCopyTarget();
+ try {
+ datumWriter.write(source, binaryEncoder);
+ binaryEncoder.flush();
+ binaryDecoder = DecoderFactory.get()
+ .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+ datumReader.read(target, binaryDecoder);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+ }
+
+ return target;
+ }
+
+ protected T createNewInstance(Class<T> targetClass) {
+ try {
+ return targetClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new CrunchRuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+
+}
View
6 src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
@@ -27,6 +27,7 @@
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.fn.PairMapFn;
+import com.cloudera.crunch.lib.PTables;
import com.cloudera.crunch.types.Converter;
import com.cloudera.crunch.types.PGroupedTableType;
@@ -71,6 +72,11 @@ public MapFn getOutputMapFn() {
}
@Override
+ public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+ return PTables.getGroupedDetachedValue(this, value);
+ }
+
+ @Override
public void configureShuffle(Job job, GroupingOptions options) {
AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
String schemaJson = att.getSchema().toString();
View
6 src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
@@ -20,6 +20,7 @@
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.lib.PTables;
import com.cloudera.crunch.types.PGroupedTableType;
import com.cloudera.crunch.types.PTableType;
import com.cloudera.crunch.types.PType;
@@ -155,4 +156,9 @@ public boolean isGeneric() {
public PGroupedTableType<K, V> getGroupedTableType() {
return new AvroGroupedTableType<K, V>(this);
}
+
+ @Override
+ public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+ return PTables.getDetachedValue(this, value);
+ }
}
View
28 src/main/java/com/cloudera/crunch/types/avro/AvroType.java
@@ -41,10 +41,12 @@
private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
private final Class<T> typeClass;
- private final Schema schema;
+ private final String schemaString;
+ private transient Schema schema;
private final MapFn baseInputMapFn;
private final MapFn baseOutputMapFn;
private final List<PType> subTypes;
+ private AvroDeepCopier<T> deepCopier;
public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
@@ -55,6 +57,7 @@ public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
MapFn outputMapFn, PType... ptypes) {
this.typeClass = typeClass;
this.schema = Preconditions.checkNotNull(schema);
+ this.schemaString = schema.toString();
this.baseInputMapFn = inputMapFn;
this.baseOutputMapFn = outputMapFn;
this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
@@ -76,6 +79,9 @@ public PTypeFamily getFamily() {
}
public Schema getSchema() {
+ if (schema == null) {
+ schema = new Schema.Parser().parse(schemaString);
+ }
return schema;
}
@@ -123,6 +129,26 @@ public Converter getConverter() {
return new AvroFileSourceTarget<T>(path, this);
}
+ private AvroDeepCopier<T> getDeepCopier() {
+ if (deepCopier == null) {
+ if (isSpecific()) {
+ deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
+ } else if (isGeneric()) {
+ deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
+ } else {
+ deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
+ }
+ }
+ return deepCopier;
+ }
+
+ public T getDetachedValue(T value) {
+ if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
+ return getDeepCopier().deepCopy(value);
+ }
+ return value;
+ }
+
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof AvroType)) {
View
4 src/main/java/com/cloudera/crunch/types/avro/Avros.java
@@ -142,6 +142,10 @@ public ByteBuffer map(Object input) {
return (PType<T>) PRIMITIVES.get(clazz);
}
+ static <T> boolean isPrimitive(AvroType<T> avroType) {
+ return PRIMITIVES.containsKey(avroType.getTypeClass());
+ }
+
private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
return new AvroType<T>(clazz, Schema.create(schemaType));
}
View
6 src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
@@ -19,6 +19,7 @@
import com.cloudera.crunch.GroupingOptions;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.lib.PTables;
import com.cloudera.crunch.types.Converter;
import com.cloudera.crunch.types.PGroupedTableType;
@@ -60,6 +61,11 @@ public MapFn getOutputMapFn() {
}
@Override
+ public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+ return PTables.getGroupedDetachedValue(this, value);
+ }
+
+ @Override
public void configureShuffle(Job job, GroupingOptions options) {
if (options != null) {
options.configure(job);
View
6 src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
@@ -25,6 +25,7 @@
import com.cloudera.crunch.SourceTarget;
import com.cloudera.crunch.fn.PairMapFn;
import com.cloudera.crunch.io.seq.SeqFileTableSourceTarget;
+import com.cloudera.crunch.lib.PTables;
import com.cloudera.crunch.types.Converter;
import com.cloudera.crunch.types.PGroupedTableType;
import com.cloudera.crunch.types.PTableType;
@@ -101,6 +102,11 @@ public PTypeFamily getFamily() {
}
@Override
+ public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+ return PTables.getDetachedValue(this, value);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof WritableTableType)) {
return false;
View
17 src/main/java/com/cloudera/crunch/types/writable/WritableType.java
@@ -18,16 +18,18 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.SourceTarget;
+import com.cloudera.crunch.fn.IdentityFn;
import com.cloudera.crunch.io.seq.SeqFileSourceTarget;
import com.cloudera.crunch.types.Converter;
import com.cloudera.crunch.types.PType;
import com.cloudera.crunch.types.PTypeFamily;
import com.google.common.collect.ImmutableList;
-public class WritableType<T, W> implements PType<T> {
+public class WritableType<T, W extends Writable> implements PType<T> {
private final Class<T> typeClass;
private final Class<W> writableClass;
@@ -95,6 +97,19 @@ public boolean equals(Object obj) {
subTypes.equals(wt.subTypes));
}
+ // Unchecked warnings are suppressed because we know that W and T are the same
+ // type (due to the IdentityFn being used)
+ @SuppressWarnings("unchecked")
+ @Override
+ public T getDetachedValue(T value) {
+ if (this.inputFn.getClass().equals(IdentityFn.class)) {
+ W writableValue = (W) value;
+ return (T) Writables.deepCopy(writableValue, this.writableClass);
+ } else {
+ return value;
+ }
+ }
+
@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
View
40 src/main/java/com/cloudera/crunch/types/writable/Writables.java
@@ -14,6 +14,11 @@
*/
package com.cloudera.crunch.types.writable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
@@ -39,6 +44,7 @@
import com.cloudera.crunch.TupleN;
import com.cloudera.crunch.fn.CompositeMapFn;
import com.cloudera.crunch.fn.IdentityFn;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
import com.cloudera.crunch.types.PType;
import com.cloudera.crunch.types.TupleFactory;
import com.cloudera.crunch.util.PTypes;
@@ -207,7 +213,7 @@ public BytesWritable map(ByteBuffer input) {
return (PType<T>) PRIMITIVES.get(clazz);
}
- public static <T> void register(Class<T> clazz, WritableType<T, ?> ptype) {
+ public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
EXTENSIONS.put(clazz, ptype);
}
@@ -243,11 +249,11 @@ public BytesWritable map(ByteBuffer input) {
return bytes;
}
- public static final <T> WritableType<T, T> records(Class<T> clazz) {
+ public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
if (EXTENSIONS.containsKey(clazz)) {
- return (WritableType<T, T>) EXTENSIONS.get(clazz);
+ return (WritableType<T, W>) EXTENSIONS.get(clazz);
}
- return (WritableType<T, T>) writables(clazz.asSubclass(Writable.class));
+ return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class));
}
public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
@@ -593,6 +599,32 @@ public void initialize() {
return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
}
+ /**
+ * Perform a deep copy of a writable value.
+ *
+ * @param value
+ * The value to be copied
+ * @param writableClass
+ * The Writable class of the value to be copied
+ * @return A fully detached deep copy of the input value
+ */
+ public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+ T copiedValue = null;
+ try {
+ value.write(dataOut);
+ dataOut.flush();
+ ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+ DataInput dataInput = new DataInputStream(byteInStream);
+ copiedValue = writableClass.newInstance();
+ copiedValue.readFields(dataInput);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Error while deep copying " + value, e);
+ }
+ return copiedValue;
+ }
+
// Not instantiable
private Writables() {
}
View
104 src/test/java/com/cloudera/crunch/lib/AggregateTest.java
@@ -19,8 +19,11 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
+import org.apache.hadoop.io.Text;
import org.junit.Test;
import com.cloudera.crunch.MapFn;
@@ -30,14 +33,17 @@
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mem.MemPipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.test.Employee;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.PTableType;
import com.cloudera.crunch.types.PTypeFamily;
import com.cloudera.crunch.types.avro.AvroTypeFamily;
import com.cloudera.crunch.types.avro.Avros;
import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.cloudera.crunch.types.writable.Writables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
public class AggregateTest {
@@ -122,4 +128,102 @@ public Integer map(Integer input) {
PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
}
+
+ @Test
+ public void testCollectValues_Writables() throws IOException {
+ Pipeline pipeline = new MRPipeline(AggregateTest.class);
+ Map<Integer, Collection<Text>> collectionMap = pipeline
+ .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+ .parallelDo(new MapStringToTextPair(),
+ Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
+ ).collectValues().materializeToMap();
+
+ assertEquals(1, collectionMap.size());
+
+ assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")),
+ collectionMap.get(1));
+ }
+
+ @Test
+ public void testCollectValues_Avro() throws IOException {
+
+ MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
+ Pipeline pipeline = new MRPipeline(AggregateTest.class);
+ Map<Integer, Collection<Employee>> collectionMap = pipeline
+ .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+ .parallelDo(mapFn,
+ Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
+ .materializeToMap();
+
+ assertEquals(1, collectionMap.size());
+
+ Employee empC = mapFn.map("c").second();
+ Employee empD = mapFn.map("d").second();
+ Employee empA = mapFn.map("a").second();
+
+ assertEquals(Lists.newArrayList(empC, empD, empA),
+ collectionMap.get(1));
+ }
+
+ private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
+ @Override
+ public Pair<Integer, Text> map(String input) {
+ return Pair.of(1, new Text(input));
+ }
+ }
+
+ private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
+ @Override
+ public Pair<Integer, Employee> map(String input) {
+ Employee emp = new Employee();
+ emp.setName(input);
+ emp.setSalary(0);
+ emp.setDepartment("");
+ return Pair.of(1, emp);
+ }
+ }
+
+ public static class PojoText {
+ private String value;
+
+ public PojoText() {
+ this("");
+ }
+
+ public PojoText(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PojoText<%s>", this.value);
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PojoText other = (PojoText) obj;
+ if (value == null) {
+ if (other.value != null)
+ return false;
+ } else if (!value.equals(other.value))
+ return false;
+ return true;
+ }
+
+ }
}
View
58 src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java
@@ -0,0 +1,58 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.junit.Test;
+
+import com.cloudera.crunch.test.Person;
+import com.cloudera.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
+import com.google.common.collect.Lists;
+
+public class AvroDeepCopierTest {
+
+ @Test
+ public void testDeepCopySpecific() {
+ Person person = new Person();
+ person.setName("John Doe");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+ .deepCopy(person);
+
+ assertEquals(person, deepCopyPerson);
+ assertNotSame(person, deepCopyPerson);
+ }
+
+ @Test
+ public void testDeepCopyGeneric() {
+ Record record = new Record(Person.SCHEMA$);
+ record.put("name", "John Doe");
+ record.put("age", 42);
+ record.put("siblingnames", Lists.newArrayList());
+
+ Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
+ .deepCopy(record);
+
+ assertEquals(record, deepCopyRecord);
+ assertNotSame(record, deepCopyRecord);
+ }
+
+ @Test
+ public void testDeepCopyReflect() {
+ Person person = new Person();
+ person.setName("John Doe");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class,
+ Person.SCHEMA$).deepCopy(person);
+
+ assertEquals(person, deepCopyPerson);
+ assertNotSame(person, deepCopyPerson);
+
+ }
+
+}
View
41 src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java
@@ -0,0 +1,41 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.test.Person;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.google.common.collect.Lists;
+
+public class AvroGroupedTableTypeTest {
+
+ @Test
+ public void testGetDetachedValue() {
+ Integer integerValue = 42;
+ Person person = new Person();
+ person.setName("John Doe");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Iterable<Person> inputPersonIterable = Lists.newArrayList(person);
+ Pair<Integer, Iterable<Person>> pair = Pair.of(integerValue, inputPersonIterable);
+
+ PGroupedTableType<Integer, Person> groupedTableType = Avros.tableOf(Avros.ints(),
+ Avros.reflects(Person.class)).getGroupedTableType();
+
+ Pair<Integer, Iterable<Person>> detachedPair = groupedTableType.getDetachedValue(pair);
+
+ assertSame(integerValue, detachedPair.first());
+ List<Person> personList = Lists.newArrayList(detachedPair.second());
+ assertEquals(inputPersonIterable, personList);
+ assertNotSame(person, personList.get(0));
+
+ }
+
+}
View
35 src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java
@@ -0,0 +1,35 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+public class AvroTableTypeTest {
+
+ @Test
+ public void testGetDetachedValue() {
+ Integer integerValue = 42;
+ Person person = new Person();
+ person.setName("John Doe");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Pair<Integer, Person> pair = Pair.of(integerValue, person);
+
+ AvroTableType<Integer, Person> tableType = Avros.tableOf(Avros.ints(),
+ Avros.reflects(Person.class));
+
+ Pair<Integer, Person> detachedPair = tableType.getDetachedValue(pair);
+
+ assertSame(integerValue, detachedPair.first());
+ assertEquals(person, detachedPair.second());
+ assertNotSame(person, detachedPair.second());
+ }
+
+}
View
52 src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
@@ -1,8 +1,14 @@
package com.cloudera.crunch.types.avro;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
import org.junit.Test;
import com.cloudera.crunch.test.Person;
@@ -63,4 +69,50 @@ public void testIsGeneric_GenericAvroTable() {
Avros.generics(Person.SCHEMA$)).isGeneric());
}
+ @Test
+ public void testGetDetachedValue_AlreadyMappedAvroType() {
+ Integer value = 42;
+ Integer detachedValue = Avros.ints().getDetachedValue(value);
+ assertSame(value, detachedValue);
+ }
+
+ @Test
+ public void testGetDetachedValue_GenericAvroType() {
+ AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+ GenericData.Record record = new GenericData.Record(Person.SCHEMA$);
+ record.put("name", "name value");
+ record.put("age", 42);
+ record.put("siblingnames", Lists.newArrayList());
+
+ Record detachedRecord = genericType.getDetachedValue(record);
+ assertEquals(record, detachedRecord);
+ assertNotSame(record, detachedRecord);
+ }
+
+ @Test
+ public void testGetDetachedValue_SpecificAvroType() {
+ AvroType<Person> specificType = Avros.records(Person.class);
+ Person person = new Person();
+ person.setName("name value");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Person detachedPerson = specificType.getDetachedValue(person);
+ assertEquals(person, detachedPerson);
+ assertNotSame(person, detachedPerson);
+ }
+
+ @Test
+ public void testGetDetachedValue_ReflectAvroType() {
+ AvroType<Person> reflectType = Avros.reflects(Person.class);
+ Person person = new Person();
+ person.setName("name value");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Person detachedPerson = reflectType.getDetachedValue(person);
+ assertEquals(person, detachedPerson);
+ assertNotSame(person, detachedPerson);
+ }
+
}
View
20 src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
@@ -15,7 +15,9 @@
package com.cloudera.crunch.types.avro;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -31,6 +33,7 @@
import com.cloudera.crunch.Tuple3;
import com.cloudera.crunch.Tuple4;
import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.test.Person;
import com.cloudera.crunch.types.PTableType;
import com.cloudera.crunch.types.PType;
import com.google.common.collect.ImmutableList;
@@ -104,9 +107,9 @@ public void testCollections() throws Exception {
@Test
public void testNestedTables() throws Exception {
- PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
- String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
- assertNotNull(schema);
+ PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
+ String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
+ assertNotNull(schema);
}
@Test
@@ -204,4 +207,15 @@ protected static void testInputOutputFn(PType ptype, Object java, Object avro) {
assertEquals(java, ptype.getInputMapFn().map(avro));
assertEquals(avro, ptype.getOutputMapFn().map(java));
}
+
+ @Test
+ public void testIsPrimitive_True() {
+ assertTrue(Avros.isPrimitive(Avros.ints()));
+ }
+
+ @Test
+ public void testIsPrimitive_False() {
+ assertFalse(Avros.isPrimitive(Avros.reflects(Person.class)));
+ }
+
}
View
37 src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java
@@ -0,0 +1,37 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.google.common.collect.Lists;
+
+public class WritableGroupedTableTypeTest {
+
+ @Test
+ public void testGetDetachedValue() {
+ Integer integerValue = 42;
+ Text textValue = new Text("forty-two");
+ Iterable<Text> inputTextIterable = Lists.newArrayList(textValue);
+ Pair<Integer, Iterable<Text>> pair = Pair.of(integerValue, inputTextIterable);
+
+ PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
+ .getGroupedTableType();
+
+ Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair);
+
+ assertSame(integerValue, detachedPair.first());
+ List<Text> textList = Lists.newArrayList(detachedPair.second());
+ assertEquals(inputTextIterable, textList);
+ assertNotSame(textValue, textList.get(0));
+
+ }
+
+}
View
30 src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java
@@ -0,0 +1,30 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+
+public class WritableTableTypeTest {
+
+ @Test
+ public void testGetDetachedValue() {
+ Integer integerValue = 42;
+ Text textValue = new Text("forty-two");
+ Pair<Integer, Text> pair = Pair.of(integerValue, textValue);
+
+ WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(),
+ Writables.writables(Text.class));
+
+ Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair);
+
+ assertSame(integerValue, detachedPair.first());
+ assertEquals(textValue, detachedPair.second());
+ assertNotSame(textValue, detachedPair.second());
+ }
+
+}
View
29 src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java
@@ -0,0 +1,29 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class WritableTypeTest {
+
+ @Test
+ public void testGetDetachedValue_AlreadyMappedWritable() {
+ WritableType<String, Text> stringType = Writables.strings();
+ String value = "test";
+ assertSame(value, stringType.getDetachedValue(value));
+ }
+
+ @Test
+ public void testGetDetachedValue_CustomWritable() {
+ WritableType<Text, Text> textWritableType = Writables.writables(Text.class);
+ Text value = new Text("test");
+
+ Text detachedValue = textWritableType.getDetachedValue(value);
+ assertEquals(value, detachedValue);
+ assertNotSame(value, detachedValue);
+ }
+
+}
View
13 src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
@@ -16,7 +16,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import java.io.DataInput;
import java.io.DataOutput;
@@ -254,7 +255,7 @@ public void testTableOf() throws Exception {
public void testRegister() throws Exception {
WritableType<TestWritable, TestWritable> wt = Writables.writables(TestWritable.class);
Writables.register(TestWritable.class, wt);
- assertTrue(Writables.records(TestWritable.class) == wt);
+ assertSame(Writables.records(TestWritable.class), wt);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -264,4 +265,12 @@ protected static void testInputOutputFn(PType ptype, Object java, Object writabl
assertEquals(java, ptype.getInputMapFn().map(writable));
assertEquals(writable, ptype.getOutputMapFn().map(java));
}
+
+ @Test
+ public void testDeepCopy() {
+ Text text = new Text("Test");
+ Text copiedText = Writables.deepCopy(text, Text.class);
+ assertEquals(text, copiedText);
+ assertNotSame(text, copiedText);
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.