Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Fix materialize() on empty PCollection #30

Merged
merged 1 commit into from

2 participants

@gabrielreid

Fix a bug whereby empty non-input PCollections throw an IOException. New
behavior is to log a warning that an empty collection is being
materialized and then materialize without errors.

@gabrielreid gabrielreid Fix materialize() on empty PCollection
Fix a bug whereby empty non-input PCollections throw an IOException. New
behavior is to log a warning that an empty collection is being
materialized and then materialize without errors.
6bf055f
@jwills jwills merged commit 6fce6c0 into cloudera:master
@tzolov tzolov referenced this pull request from a commit in tzolov/crunch
@tzolov tzolov merge with the pull #30 bface96
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 24, 2012
  1. @gabrielreid

    Fix materialize() on empty PCollection

    gabrielreid authored
    Fix a bug whereby empty non-input PCollections throw an IOException. New
    behavior is to log a warning that an empty collection is being
    materialized and then materialize without errors.
This page is out of date. Refresh to see the latest.
View
367 src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java
@@ -14,8 +14,12 @@
*/
package com.cloudera.crunch.impl.mr.collect;
+import java.util.Collections;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.FilterFn;
import com.cloudera.crunch.MapFn;
@@ -38,184 +42,187 @@
public abstract class PCollectionImpl<S> implements PCollection<S> {
- private final String name;
- protected MRPipeline pipeline;
- private SourceTarget<S> materializedAt;
-
- public PCollectionImpl(String name) {
- this.name = name;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String toString() {
- return getName();
- }
-
- @Override
- public PCollection<S> union(PCollection<S>... collections) {
- List<PCollectionImpl<S>> internal = Lists.newArrayList();
- internal.add(this);
- for (PCollection<S> collection : collections) {
- internal.add((PCollectionImpl<S>) collection);
- }
- return new UnionCollection<S>(internal);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
- MRPipeline pipeline = (MRPipeline) getPipeline();
- return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn,
- PType<T> type) {
- return new DoCollectionImpl<T>(name, this, fn, type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn,
- PTableType<K, V> type) {
- MRPipeline pipeline = (MRPipeline) getPipeline();
- return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn,
- PTableType<K, V> type) {
- return new DoTableImpl<K, V>(name, this, fn, type);
- }
-
- @Override
- public PCollection<S> write(Target target) {
- getPipeline().write(this, target);
- return this;
- }
-
- @Override
- public Iterable<S> materialize() {
- return getPipeline().materialize(this);
- }
-
- public SourceTarget<S> getMaterializedAt() {
- return materializedAt;
- }
-
- public void materializeAt(SourceTarget<S> sourceTarget) {
- this.materializedAt = sourceTarget;
- }
-
- @Override
- public PCollection<S> filter(FilterFn<S> filterFn) {
- return parallelDo(filterFn, getPType());
- }
-
- @Override
- public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
- return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
- }
-
- @Override
- public PCollection<S> sort(boolean ascending) {
- return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
- }
-
- @Override
- public PTable<S, Long> count() {
- return Aggregate.count(this);
- }
-
- @Override
- public PCollection<S> max() {
- return Aggregate.max(this);
- }
-
- @Override
- public PCollection<S> min() {
- return Aggregate.min(this);
- }
-
- @Override
- public PCollection<S> sample(double acceptanceProbability) {
- return Sample.sample(this, acceptanceProbability);
- }
-
- @Override
- public PCollection<S> sample(double acceptanceProbability, long seed) {
- return Sample.sample(this, seed, acceptanceProbability);
- }
-
- @Override
- public PTypeFamily getTypeFamily() {
- return getPType().getFamily();
- }
-
- public abstract DoNode createDoNode();
-
- public abstract List<PCollectionImpl<?>> getParents();
-
- public PCollectionImpl<?> getOnlyParent() {
- List<PCollectionImpl<?>> parents = getParents();
- if (parents.size() != 1) {
- throw new IllegalArgumentException("Expected exactly one parent PCollection");
- }
- return parents.get(0);
- }
-
- @Override
- public Pipeline getPipeline() {
- if (pipeline == null) {
- pipeline = (MRPipeline) getParents().get(0).getPipeline();
- }
- return pipeline;
- }
-
- public int getDepth() {
- int parentMax = 0;
- for (PCollectionImpl parent : getParents()) {
- parentMax = Math.max(parent.getDepth(), parentMax);
- }
- return 1 + parentMax;
- }
-
- public interface Visitor {
- void visitInputCollection(InputCollection<?> collection);
-
- void visitUnionCollection(UnionCollection<?> collection);
-
- void visitDoFnCollection(DoCollectionImpl<?> collection);
-
- void visitDoTable(DoTableImpl<?, ?> collection);
-
- void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
- }
-
- public void accept(Visitor visitor) {
- if (materializedAt != null) {
- visitor.visitInputCollection(new InputCollection<S>(materializedAt,
- (MRPipeline) getPipeline()));
- } else {
- acceptInternal(visitor);
- }
- }
-
- protected abstract void acceptInternal(Visitor visitor);
-
- @Override
- public long getSize() {
- if (materializedAt != null) {
- long sz = materializedAt.getSize(getPipeline().getConfiguration());
- if (sz > 0) {
- return sz;
- }
- }
- return getSizeInternal();
- }
-
- protected abstract long getSizeInternal();
+ private static final Log LOG = LogFactory.getLog(PCollectionImpl.class);
+
+ private final String name;
+ protected MRPipeline pipeline;
+ private SourceTarget<S> materializedAt;
+
+ public PCollectionImpl(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public PCollection<S> union(PCollection<S>... collections) {
+ List<PCollectionImpl<S>> internal = Lists.newArrayList();
+ internal.add(this);
+ for (PCollection<S> collection : collections) {
+ internal.add((PCollectionImpl<S>) collection);
+ }
+ return new UnionCollection<S>(internal);
+ }
+
+ @Override
+ public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
+ MRPipeline pipeline = (MRPipeline) getPipeline();
+ return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
+ }
+
+ @Override
+ public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+ return new DoCollectionImpl<T>(name, this, fn, type);
+ }
+
+ @Override
+ public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+ MRPipeline pipeline = (MRPipeline) getPipeline();
+ return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
+ }
+
+ @Override
+ public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+ return new DoTableImpl<K, V>(name, this, fn, type);
+ }
+
+ @Override
+ public PCollection<S> write(Target target) {
+ getPipeline().write(this, target);
+ return this;
+ }
+
+ @Override
+ public Iterable<S> materialize() {
+ if (getSize() == 0) {
+ LOG.warn("Materializing an empty PCollection: " + this.getName());
+ return Collections.emptyList();
+ }
+ return getPipeline().materialize(this);
+ }
+
+ public SourceTarget<S> getMaterializedAt() {
+ return materializedAt;
+ }
+
+ public void materializeAt(SourceTarget<S> sourceTarget) {
+ this.materializedAt = sourceTarget;
+ }
+
+ @Override
+ public PCollection<S> filter(FilterFn<S> filterFn) {
+ return parallelDo(filterFn, getPType());
+ }
+
+ @Override
+ public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
+ return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+ }
+
+ @Override
+ public PCollection<S> sort(boolean ascending) {
+ return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
+ }
+
+ @Override
+ public PTable<S, Long> count() {
+ return Aggregate.count(this);
+ }
+
+ @Override
+ public PCollection<S> max() {
+ return Aggregate.max(this);
+ }
+
+ @Override
+ public PCollection<S> min() {
+ return Aggregate.min(this);
+ }
+
+ @Override
+ public PCollection<S> sample(double acceptanceProbability) {
+ return Sample.sample(this, acceptanceProbability);
+ }
+
+ @Override
+ public PCollection<S> sample(double acceptanceProbability, long seed) {
+ return Sample.sample(this, seed, acceptanceProbability);
+ }
+
+ @Override
+ public PTypeFamily getTypeFamily() {
+ return getPType().getFamily();
+ }
+
+ public abstract DoNode createDoNode();
+
+ public abstract List<PCollectionImpl<?>> getParents();
+
+ public PCollectionImpl<?> getOnlyParent() {
+ List<PCollectionImpl<?>> parents = getParents();
+ if (parents.size() != 1) {
+ throw new IllegalArgumentException("Expected exactly one parent PCollection");
+ }
+ return parents.get(0);
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ if (pipeline == null) {
+ pipeline = (MRPipeline) getParents().get(0).getPipeline();
+ }
+ return pipeline;
+ }
+
+ public int getDepth() {
+ int parentMax = 0;
+ for (PCollectionImpl parent : getParents()) {
+ parentMax = Math.max(parent.getDepth(), parentMax);
+ }
+ return 1 + parentMax;
+ }
+
+ public interface Visitor {
+ void visitInputCollection(InputCollection<?> collection);
+
+ void visitUnionCollection(UnionCollection<?> collection);
+
+ void visitDoFnCollection(DoCollectionImpl<?> collection);
+
+ void visitDoTable(DoTableImpl<?, ?> collection);
+
+ void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
+ }
+
+ public void accept(Visitor visitor) {
+ if (materializedAt != null) {
+ visitor.visitInputCollection(new InputCollection<S>(materializedAt,
+ (MRPipeline) getPipeline()));
+ } else {
+ acceptInternal(visitor);
+ }
+ }
+
+ protected abstract void acceptInternal(Visitor visitor);
+
+ @Override
+ public long getSize() {
+ if (materializedAt != null) {
+ long sz = materializedAt.getSize(getPipeline().getConfiguration());
+ if (sz > 0) {
+ return sz;
+ }
+ }
+ return getSizeInternal();
+ }
+
+ protected abstract long getSizeInternal();
}
View
90 src/test/java/com/cloudera/crunch/MaterializeTest.java
@@ -0,0 +1,90 @@
+package com.cloudera.crunch;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.impl.mem.MemPipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.cloudera.crunch.types.avro.AvroTypeFamily;
+import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+public class MaterializeTest {
+
+ /** Filter that rejects everything. */
+ @SuppressWarnings("serial")
+ private static class FalseFilterFn extends FilterFn<String> {
+
+ @Override
+ public boolean accept(final String input) {
+ return false;
+ }
+ }
+
+ @Test
+ public void testMaterializeInput_Writables() throws IOException {
+ runMaterializeInput(new MRPipeline(MaterializeTest.class), WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeInput_Avro() throws IOException {
+ runMaterializeInput(new MRPipeline(MaterializeTest.class), AvroTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeInput_InMemoryWritables() throws IOException {
+ runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeInput_InMemoryAvro() throws IOException {
+ runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+ runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
+ WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeEmptyIntermediate_Avro() throws IOException {
+ runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
+ AvroTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
+ runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
+ runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+ }
+
+ public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+ List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
+ String inputPath = FileHelper.createTempCopyOf("set1.txt");
+
+ PCollection<String> lines = pipeline.readTextFile(inputPath);
+ assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
+ pipeline.done();
+ }
+
+ public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+ throws IOException {
+ String inputPath = FileHelper.createTempCopyOf("set1.txt");
+ PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn());
+
+ assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
+ pipeline.done();
+ }
+}
Something went wrong with that request. Please try again.