Skip to content
Browse files

Use the distributed cache for map side joins

  • Loading branch information...
1 parent b13bd4f commit f70a6df8b2fe48efa14b432fddf731c3e8d94d86 @gabrielreid gabrielreid committed
View
3 src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
@@ -45,7 +45,8 @@ public String toString() {
@Override
public Iterable<T> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
(AvroType<T>) ptype, conf));
}
}
View
12 src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
@@ -26,18 +26,16 @@
import com.cloudera.crunch.io.impl.FileSourceImpl;
import com.cloudera.crunch.types.PType;
-public class SeqFileSource<T> extends FileSourceImpl<T> implements
- ReadableSource<T> {
+public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
public SeqFileSource(Path path, PType<T> ptype) {
- super(path, ptype, SequenceFileInputFormat.class);
+ super(path, ptype, SequenceFileInputFormat.class);
}
-
+
@Override
public Iterable<T> read(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- return CompositePathIterable.create(fs, path,
- new SeqFileReaderFactory<T>(ptype, conf));
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
}
@Override
View
2 src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
@@ -42,7 +42,7 @@ public SeqFileTableSource(Path path, PTableType<K, V> ptype) {
@Override
public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
return CompositePathIterable.create(fs, path,
new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
}
View
4 src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
@@ -67,7 +67,7 @@ public String toString() {
@Override
public Iterable<T> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(FileSystem.get(conf), path,
- new TextFileReaderFactory<T>(ptype, conf));
+ return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
+ new TextFileReaderFactory<T>(ptype, conf));
}
}
View
45 src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
@@ -2,6 +2,8 @@
import java.io.IOException;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.crunch.DoFn;
@@ -53,21 +55,25 @@
MRPipeline pipeline = (MRPipeline) right.getPipeline();
pipeline.materialize(right);
- // TODO Make this method internal to MRPipeline so that we don't run once
- // for every separate MapsideJoin at the same level
+ // TODO Move necessary logic to MRPipeline so that we can theoretically
+ // optimize this by running the setup of multiple map-side joins concurrently
pipeline.run();
- // TODO Verify that this cast is safe -- are there any situations where this
- // wouldn't work?
- SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) pipeline
+ ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
.getMaterializeSourceTarget(right);
+ if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
+ throw new CrunchRuntimeException("Right-side contents can't be read from a path");
+ }
- // TODO Put the data in the distributed cache
+ // Suppress warnings because we've just checked this cast via instanceof
+ @SuppressWarnings("unchecked")
+ SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
Path path = sourcePathTarget.getPath();
- PType<Pair<K, V>> pType = right.getPType();
+ DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
- MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), pType);
+ MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
+ right.getPType());
PTypeFamily typeFamily = left.getTypeFamily();
return left.parallelDo(
"mapjoin",
@@ -79,21 +85,36 @@
static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
- private String path;
+ private String inputPath;
private PType<Pair<K, V>> ptype;
private Multimap<K, V> joinMap;
- public MapsideJoinDoFn(String path, PType<Pair<K, V>> ptype) {
- this.path = path;
+ public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
+ this.inputPath = inputPath;
this.ptype = ptype;
}
+ private Path getCacheFilePath() {
+ try {
+ for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
+ if (localPath.toString().endsWith(inputPath)) {
+ return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
+
+ }
+ }
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+
+ throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+ }
+
@Override
public void initialize() {
super.initialize();
ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
- .getDefaultFileSource(new Path(path));
+ .getDefaultFileSource(getCacheFilePath());
Iterable<Pair<K, V>> iterable = null;
try {
iterable = sourceTarget.read(getConfiguration());

0 comments on commit f70a6df

Please sign in to comment.
Something went wrong with that request. Please try again.