Permalink
Browse files

Have MemPipeline write outputs to directories to be consistent with M…

…RPipeline
  • Loading branch information...
1 parent e74fc23 commit 4b45e134fc28d3d2a83bb1db7c00487f724d54dc @jwills jwills committed Jun 21, 2012
@@ -21,6 +21,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -145,7 +146,7 @@ public void write(PCollection<?> collection, Target target) {
Path path = ((PathTarget) target).getPath();
try {
FileSystem fs = FileSystem.get(conf);
- FSDataOutputStream os = fs.create(path);
+ FSDataOutputStream os = fs.create(new Path(path, "out"));
if (collection instanceof PTable) {
for (Object o : collection.materialize()) {
Pair p = (Pair) o;
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pipeline;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.junit.Test;
+
+public class MemPipelineFileWritingTest {
+ @Test
+ public void testMemPipelineFileWriter() throws Exception {
+ File tmpDir = Files.createTempDir();
+ tmpDir.delete();
+ Pipeline p = MemPipeline.getInstance();
+ PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
+ p.writeTextFile(lines, tmpDir.getAbsolutePath());
+ p.done();
+ assertTrue(tmpDir.exists());
+ File[] files = tmpDir.listFiles();
+ assertTrue(files != null && files.length > 0);
+ for (File f : files) {
+ if (!f.getName().startsWith(".")) {
+ List<String> txt = Files.readLines(f, Charsets.UTF_8);
+ assertEquals(ImmutableList.of("hello", "world"), txt);
+ }
+ }
+ }
+}

0 comments on commit 4b45e13

Please sign in to comment.