Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Extract setup for materialize

Pull out the setup method for doing a materialize on a PCollection,
so that it can be re-used for in-memory map-side joins.
  • Loading branch information...
commit 6d701c13fde22a342508c11f0de351a689ea5752 1 parent 9367826
Gabriel Reid gabrielreid authored
163 src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
View
@@ -59,9 +59,9 @@
public class MRPipeline implements Pipeline {
private static final Log LOG = LogFactory.getLog(MRPipeline.class);
-
+
private static final Random RANDOM = new Random();
-
+
private final Class<?> jarClass;
private final String name;
private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
@@ -75,15 +75,15 @@
public MRPipeline(Class<?> jarClass) throws IOException {
this(jarClass, new Configuration());
}
-
- public MRPipeline(Class<?> jarClass, String name){
+
+ public MRPipeline(Class<?> jarClass, String name) {
this(jarClass, name, new Configuration());
}
-
+
public MRPipeline(Class<?> jarClass, Configuration conf) {
- this(jarClass, jarClass.getName(), conf);
+ this(jarClass, jarClass.getName(), conf);
}
-
+
public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
this.jarClass = jarClass;
this.name = name;
@@ -102,9 +102,9 @@ public Configuration getConfiguration() {
@Override
public void setConfiguration(Configuration conf) {
- this.conf = conf;
+ this.conf = conf;
}
-
+
@Override
public PipelineResult run() {
MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
@@ -125,8 +125,8 @@ public PipelineResult run() {
boolean materialized = false;
for (Target t : outputTargets.get(c)) {
if (!materialized && t instanceof Source) {
- c.materializeAt((SourceTarget) t);
- materialized = true;
+ c.materializeAt((SourceTarget) t);
+ materialized = true;
}
}
}
@@ -144,7 +144,7 @@ public PipelineResult done() {
cleanup();
return res;
}
-
+
public <S> PCollection<S> read(Source<S> source) {
return new InputCollection<S>(source, this);
}
@@ -160,85 +160,120 @@ public PipelineResult done() {
@SuppressWarnings("unchecked")
public void write(PCollection<?> pcollection, Target target) {
if (pcollection instanceof PGroupedTableImpl) {
- pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
+ pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
} else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
- pcollection = pcollection.parallelDo("UnionCollectionWrapper",
- (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
+ pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+ (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
}
addOutput((PCollectionImpl<?>) pcollection, target);
}
private void addOutput(PCollectionImpl<?> impl, Target target) {
if (!outputTargets.containsKey(impl)) {
- outputTargets.put(impl, Sets.<Target>newHashSet());
+ outputTargets.put(impl, Sets.<Target> newHashSet());
}
outputTargets.get(impl).add(target);
}
-
+
@Override
public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-
- if (pcollection instanceof UnionCollection) {
- pcollection = pcollection.parallelDo("UnionCollectionWrapper",
- (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
- }
- PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
+ // Convert to the impl form (unions get wrapped) and resolve the readable target backing it.
+ PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
+ ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
+ // Only the first iterable created for a collection is registered; later calls leave the map as is.
+ MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
+ if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
+ outputTargetsToMaterialize.put(pcollectionImpl, c);
+ }
+ return c;
+ }
+
+ /**
+ * Retrieve a ReadableSourceTarget that provides access to the contents of a
+ * {@link PCollection}; primarily a helper for {@link #materialize(PCollection)}.
+ * Lookup order: the target the collection is materialized at, then a readable output
+ * target already registered for it, then a newly created intermediate output. The
+ * underlying data may not actually be present until the pipeline is run.
+ * @param pcollection
+ * The collection for which the ReadableSourceTarget is to be
+ * retrieved
+ * @return The ReadableSourceTarget
+ * @throws IllegalArgumentException
+ * If no ReadableSourceTarget can be retrieved for the given
+ * PCollection
+ */
+ public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
+ PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
SourceTarget<T> matTarget = impl.getMaterializedAt();
if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
- return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
+ return (ReadableSourceTarget<T>) matTarget;
+ }
+ // Use impl for both the check and the lookup so the two can never disagree on the key.
+ ReadableSourceTarget<T> srcTarget = null;
+ if (outputTargets.containsKey(impl)) {
+ for (Target target : outputTargets.get(impl)) {
+ if (target instanceof ReadableSourceTarget) {
+ srcTarget = (ReadableSourceTarget<T>) target;
+ break;
+ }
+ }
}
-
- ReadableSourceTarget<T> srcTarget = null;
- if (outputTargets.containsKey(pcollection)) {
- for (Target target : outputTargets.get(impl)) {
- if (target instanceof ReadableSourceTarget) {
- srcTarget = (ReadableSourceTarget) target;
- break;
- }
- }
- }
-
- if (srcTarget == null) {
- SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
- if (!(st instanceof ReadableSourceTarget)) {
- throw new IllegalArgumentException("The PType for the given PCollection is not readable"
- + " and cannot be materialized");
- } else {
- srcTarget = (ReadableSourceTarget) st;
- addOutput(impl, srcTarget);
- }
- }
-
- MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
- outputTargetsToMaterialize.put(impl, c);
- return c;
+ // Nothing readable is registered yet: create an intermediate output and register it.
+ if (srcTarget == null) {
+ SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+ if (!(st instanceof ReadableSourceTarget)) {
+ throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+ + " and cannot be materialized");
+ } else {
+ srcTarget = (ReadableSourceTarget<T>) st;
+ addOutput(impl, srcTarget);
+ }
+ }
+
+ return srcTarget;
+ }
+
+ /**
+ * Converts a PCollection to a PCollectionImpl. A UnionCollection is first passed through a
+ * parallelDo with IdentityFn ("UnionCollectionWrapper"); anything else is cast directly.
+ *
+ * @param pcollection The PCollection to be cast/transformed
+ * @return The PCollectionImpl representation
+ */
+ private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
+ if (!(pcollection instanceof UnionCollection)) {
+ return (PCollectionImpl<T>) pcollection;
+ }
+ // Unions go through the IdentityFn parallelDo and the result of that call is what gets cast.
+ return (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
+ (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
}
public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
- return ptype.getDefaultFileSource(createTempPath());
+ return ptype.getDefaultFileSource(createTempPath());
}
public Path createTempPath() {
tempFileIndex++;
return new Path(tempDirectory, "p" + tempFileIndex);
}
-
+
private static Path createTempDirectory(Configuration conf) {
Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
- try {
- FileSystem.get(conf).mkdirs(dir);
- } catch (IOException e) {
- LOG.error("Exception creating job output directory", e);
- throw new RuntimeException(e);
- }
+ try {
+ FileSystem.get(conf).mkdirs(dir);
+ } catch (IOException e) {
+ LOG.error("Exception creating job output directory", e);
+ throw new RuntimeException(e);
+ }
return dir;
}
-
+
@Override
public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
// Ensure that this is a writable pcollection instance.
- pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
- WritableTypeFamily.getInstance().as(pcollection.getPType()));
+ pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
+ .getInstance().as(pcollection.getPType()));
write(pcollection, At.textFile(pathName));
}
@@ -256,7 +291,7 @@ private void cleanup() {
LOG.info("Exception during cleanup", e);
}
}
-
+
public int getNextAnonymousStageId() {
return nextAnonymousStageId++;
}
@@ -265,7 +300,7 @@ public int getNextAnonymousStageId() {
public void enableDebug() {
// Turn on Crunch runtime error catching.
getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
-
+
// Write Hadoop's WARN logs to the console.
Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
Appender console = crunchInfoLogger.getAppender("A");
@@ -277,9 +312,9 @@ public void enableDebug() {
LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
}
}
-
+
@Override
public String getName() {
- return name;
+ return name;
}
}
60 src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
View
@@ -0,0 +1,60 @@
+package com.cloudera.crunch.impl.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.crunch.SourceTarget;
+import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
+import com.cloudera.crunch.io.ReadableSourceTarget;
+import com.cloudera.crunch.types.avro.Avros;
+
+public class MRPipelineTest {
+ // Tests for MRPipeline#getMaterializeSourceTarget, run against a Mockito spy of the pipeline.
+ private MRPipeline pipeline;
+
+ @Before
+ public void setUp() throws IOException {
+ pipeline = spy(new MRPipeline(MRPipelineTest.class));
+ }
+
+ @Test
+ public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
+ PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
+ ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+ when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
+ // A readable materialized-at target must be returned as is.
+ assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
+ }
+
+ @Test
+ public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
+ // Not materialized and never written in this test: the stubbed intermediate output is expected.
+ PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+ ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+ when(pcollection.getPType()).thenReturn(Avros.strings());
+ doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+ when(pcollection.getMaterializedAt()).thenReturn(null);
+
+ assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
+ PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+ SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
+ when(pcollection.getPType()).thenReturn(Avros.strings());
+ doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+ when(pcollection.getMaterializedAt()).thenReturn(null);
+ // The stubbed intermediate output is not a ReadableSourceTarget, so the call must throw.
+ pipeline.getMaterializeSourceTarget(pcollection);
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.