Permalink
Browse files

Detach iterated join values

Values being joined are typically re-used objects from a reducer's
iterator, meaning storing them in a local collection does not have
the desired behavior. The iterated values are now detached (i.e.
deep copied) in joins to get around this.
  • Loading branch information...
1 parent 0473d8e commit 3a3111546e788f4139fa291f018fe71bd3420d91 @gabrielreid gabrielreid committed Jul 6, 2012
@@ -59,7 +59,7 @@
* @return The joined result.
*/
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
- return join(left, right, new InnerJoinFn<K, U, V>());
+ return join(left, right, new InnerJoinFn<K, U, V>(left.getValueType()));
}
/**
@@ -75,7 +75,7 @@
* @return The joined result.
*/
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
- return join(left, right, new LeftOuterJoinFn<K, U, V>());
+ return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getValueType()));
}
/**
@@ -91,7 +91,7 @@
* @return The joined result.
*/
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
- return join(left, right, new RightOuterJoinFn<K, U, V>());
+ return join(left, right, new RightOuterJoinFn<K, U, V>(left.getValueType()));
}
/**
@@ -106,7 +106,7 @@
* @return The joined result.
*/
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
- return join(left, right, new FullOuterJoinFn<K, U, V>());
+ return join(left, right, new FullOuterJoinFn<K, U, V>(left.getValueType()));
}
public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right,
@@ -18,6 +18,7 @@
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
import com.google.common.collect.Lists;
/**
@@ -33,6 +34,10 @@
private transient K lastKey;
private transient List<U> leftValues;
+ public FullOuterJoinFn(PType<U> leftValueType) {
+ super(leftValueType);
+ }
+
/** {@inheritDoc} */
@Override
public void initialize() {
@@ -58,7 +63,7 @@ public void join(K key, int id, Iterable<Pair<U, V>> pairs,
if (id == 0) {
for (Pair<U, V> pair : pairs) {
if (pair.first() != null)
- leftValues.add(pair.first());
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
}
} else {
for (Pair<U, V> pair : pairs) {
@@ -18,6 +18,7 @@
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
import com.google.common.collect.Lists;
/**
@@ -28,15 +29,19 @@
* @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
*/
public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
+
private transient K lastKey;
- private transient List<U> LeftValues;
+ private transient List<U> leftValues;
+
+ public InnerJoinFn(PType<U> leftValueType) {
+ super(leftValueType);
+ }
/** {@inheritDoc} */
@Override
public void initialize() {
lastKey = null;
- this.LeftValues = Lists.newArrayList();
+ this.leftValues = Lists.newArrayList();
}
/** {@inheritDoc} */
@@ -45,16 +50,16 @@ public void join(K key, int id, Iterable<Pair<U, V>> pairs,
Emitter<Pair<K, Pair<U, V>>> emitter) {
if (!key.equals(lastKey)) {
lastKey = key;
- LeftValues.clear();
+ leftValues.clear();
}
if (id == 0) { // from left
for (Pair<U, V> pair : pairs) {
if (pair.first() != null)
- LeftValues.add(pair.first());
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
}
} else { // from right
for (Pair<U, V> pair : pairs) {
- for (U u : LeftValues) {
+ for (U u : leftValues) {
emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
}
}
@@ -17,6 +17,7 @@
import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
/**
* Represents a {@link com.cloudera.crunch.DoFn} for performing joins.
@@ -28,6 +29,19 @@
public abstract class JoinFn<K, U, V>
extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+ protected PType<U> leftValueType;
+
+ /**
+ * Instantiate with the PType of the value of the left side of the join (used
+ * for creating deep copies of values).
+ *
+ * @param leftValueType
+ * The PType of the value type of the left side of the join
+ */
+ public JoinFn(PType<U> leftValueType) {
+ this.leftValueType = leftValueType;
+ }
+
/** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
public abstract String getJoinType();
@@ -18,6 +18,7 @@
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
import com.google.common.collect.Lists;
/**
@@ -33,6 +34,10 @@
private transient K lastKey;
private transient List<U> leftValues;
+ public LeftOuterJoinFn(PType<U> leftValueType) {
+ super(leftValueType);
+ }
+
/** {@inheritDoc} */
@Override
public void initialize() {
@@ -58,7 +63,7 @@ public void join(K key, int id, Iterable<Pair<U, V>> pairs,
if (id == 0) {
for (Pair<U, V> pair : pairs) {
if (pair.first() != null)
- leftValues.add(pair.first());
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
}
} else {
for (Pair<U, V> pair : pairs) {
@@ -18,6 +18,7 @@
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
import com.google.common.collect.Lists;
/**
@@ -28,10 +29,14 @@
* @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
*/
public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
+
private transient K lastKey;
private transient List<U> leftValues;
+ public RightOuterJoinFn(PType<U> leftValueType) {
+ super(leftValueType);
+ }
+
/** {@inheritDoc} */
@Override
public void initialize() {
@@ -50,7 +55,7 @@ public void join(K key, int id, Iterable<Pair<U, V>> pairs,
if (id == 0) {
for (Pair<U, V> pair : pairs) {
if (pair.first() != null)
- leftValues.add(pair.first());
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
}
} else {
for (Pair<U, V> pair : pairs) {
@@ -17,6 +17,7 @@
import static org.junit.Assert.assertTrue;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
public class FullOuterJoinTest extends JoinTester {
@Override
@@ -41,7 +42,7 @@ public void assertPassed(Iterable<Pair<String, Long>> lines) {
}
@Override
- protected JoinFn<String, Long, Long> getJoinFn() {
- return new FullOuterJoinFn<String, Long, Long>();
+ protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+ return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
}
}
@@ -17,6 +17,7 @@
import static org.junit.Assert.assertTrue;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
public class InnerJoinTest extends JoinTester {
@Override
@@ -41,7 +42,7 @@ public void assertPassed(Iterable<Pair<String, Long>> lines) {
}
@Override
- protected JoinFn<String, Long, Long> getJoinFn() {
- return new InnerJoinFn<String, Long, Long>();
+ protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+ return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
}
}
@@ -50,7 +50,7 @@ public void process(String input, Emitter<String> emitter) {
PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
- PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn());
+ PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
PTable<String, Long> sums = join.parallelDo("cnt",
new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
@@ -100,5 +100,5 @@ public void testAvroJoin() throws Exception {
/**
* @return The JoinFn to use.
*/
- protected abstract JoinFn<String, Long, Long> getJoinFn();
+ protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
}
@@ -17,6 +17,7 @@
import static org.junit.Assert.assertTrue;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
public class LeftOuterJoinTest extends JoinTester {
@Override
@@ -41,7 +42,7 @@ public void assertPassed(Iterable<Pair<String, Long>> lines) {
}
@Override
- protected JoinFn<String, Long, Long> getJoinFn() {
- return new LeftOuterJoinFn<String, Long, Long>();
+ protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+ return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
}
}
@@ -17,6 +17,7 @@
import static org.junit.Assert.assertTrue;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
public class RightOuterJoinTest extends JoinTester {
@Override
@@ -41,7 +42,7 @@ public void assertPassed(Iterable<Pair<String, Long>> lines) {
}
@Override
- protected JoinFn<String, Long, Long> getJoinFn() {
- return new RightOuterJoinFn<String, Long, Long>();
+ protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+ return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs());
}
}

0 comments on commit 3a31115

Please sign in to comment.