diff --git a/CHANGELOG b/CHANGELOG
index 96e9e17ae..17a007186 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-20. Move temporary test files from the project directory. (ssc)
+
   GIRAPH-37. Implement Netty-backed IPC. (aching)
 
   GIRAPH-184. Upgrade to junit4. (Devaraj K via jghoman)
diff --git a/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
index 855557a9a..4c3778187 100644
--- a/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ b/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
@@ -135,12 +135,9 @@ public void postApplication() {
 
     @Override
     public void preSuperstep() {
-      LongSumAggregator sumAggreg =
-          (LongSumAggregator) getAggregator("sum");
-      MinAggregator minAggreg =
-          (MinAggregator) getAggregator("min");
-      MaxAggregator maxAggreg =
-          (MaxAggregator) getAggregator("max");
+      LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
+      MinAggregator minAggreg = (MinAggregator) getAggregator("min");
+      MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
 
       if (getSuperstep() >= 3) {
         LOG.info("aggregatedNumVertices=" +
diff --git a/src/main/java/org/apache/giraph/graph/GraphMapper.java b/src/main/java/org/apache/giraph/graph/GraphMapper.java
index 5e878f91b..e6e9d4d71 100644
--- a/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -130,8 +130,7 @@ class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
     public void uncaughtException(Thread t, Throwable e) {
       LOG.fatal(
           "uncaughtException: OverrideExceptionHandler on thread " +
-          t.getName() + ", msg = " + e.getMessage() +
-          ", exiting...", e);
+          t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
       System.exit(1);
     }
   }
diff --git a/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java b/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
index 8012e5731..4727cb6df 100644
--- a/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
+++ b/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +43,8 @@ public class TextAggregatorWriter implements AggregatorWriter {
   public static final int NEVER = 0;
   /** Signal for "write only the final values" frequency */
   public static final int AT_THE_END = -1;
+  /** Signal for "write values in every superstep" frequency */
+  public static final int ALWAYS = 1;
   /** The frequency of writing:
    * - NEVER: never write, files aren't created at all
    * - AT_THE_END: aggregators are written only when the computation is over
@@ -78,11 +81,10 @@ public final void writeAggregator(
       Map<String, Writable> aggregators,
       long superstep) throws IOException {
     if (shouldWrite(superstep)) {
-      for (Entry<String, Writable> a:
-          aggregators.entrySet()) {
-        output.writeUTF(aggregatorToString(a.getKey(),
-            a.getValue(),
-            superstep));
+      for (Entry<String, Writable> a: aggregators.entrySet()) {
+        byte[] bytes = aggregatorToString(a.getKey(), a.getValue(), superstep)
+            .getBytes(Charsets.UTF_8);
+        output.write(bytes, 0, bytes.length);
       }
       output.flush();
     }
diff --git a/src/main/java/org/apache/giraph/utils/FileUtils.java b/src/main/java/org/apache/giraph/utils/FileUtils.java
new file mode 100644
index 
000000000..1742f1e7f --- /dev/null +++ b/src/main/java/org/apache/giraph/utils/FileUtils.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import com.google.common.base.Charsets; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.Writer; + +/** + * Helper class for filesystem operations during testing + */ +public class FileUtils { + + /** + * Utility class should not be instantiatable + */ + private FileUtils() { + } + + /** + * Create a temporary folder that will be removed after the test. + * + * @param vertexClass Used for generating the folder name. + * @return File object for the directory. + */ + public static File createTestDir(Class vertexClass) + throws IOException { + String systemTmpDir = System.getProperty("java.io.tmpdir"); + long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random()); + File testTempDir = new File(systemTmpDir, "giraph-" + + vertexClass.getSimpleName() + '-' + simpleRandomLong); + if (!testTempDir.mkdir()) { + throw new IOException("Could not create " + testTempDir); + } + testTempDir.deleteOnExit(); + return testTempDir; + } + + /** + * Make a temporary file. + * + * @param parent Parent directory. + * @param name File name. + * @return File object to temporary file. + * @throws IOException + */ + public static File createTempFile(File parent, String name) + throws IOException { + return createTestTempFileOrDir(parent, name, false); + } + + /** + * Make a temporary directory. + * + * @param parent Parent directory. + * @param name Directory name. + * @return File object to temporary file. + * @throws IOException + */ + public static File createTempDir(File parent, String name) + throws IOException { + File dir = createTestTempFileOrDir(parent, name, true); + dir.delete(); + return dir; + } + + /** + * Create a test temp file or directory. + * + * @param parent Parent directory + * @param name Name of file + * @param dir Is directory? + * @return File object + * @throws IOException + */ + public static File createTestTempFileOrDir(File parent, String name, + boolean dir) throws IOException { + File f = new File(parent, name); + f.deleteOnExit(); + if (dir && !f.mkdirs()) { + throw new IOException("Could not make directory " + f); + } + return f; + } + + /** + * Write lines to a file. + * + * @param file File to write lines to + * @param lines Strings written to the file + * @throws IOException + */ + public static void writeLines(File file, String... 
lines) + throws IOException { + Writer writer = Files.newWriter(file, Charsets.UTF_8); + try { + for (String line : lines) { + writer.write(line); + writer.write('\n'); + } + } finally { + Closeables.closeQuietly(writer); + } + } + + /** + * Recursively delete a directory + * + * @param dir Directory to delete + */ + public static void delete(File dir) { + if (dir != null) { + new DeletingVisitor().accept(dir); + } + } + + /** + * Deletes files. + */ + private static class DeletingVisitor implements FileFilter { + @Override + public boolean accept(File f) { + if (!f.isFile()) { + f.listFiles(this); + } + f.delete(); + return false; + } + } + + /** + * Helper method to remove a path if it exists. + * + * @param conf Configuration to load FileSystem from + * @param path Path to remove + * @throws IOException + */ + public static void deletePath(Configuration conf, String path) + throws IOException { + deletePath(conf, new Path(path)); + } + + /** + * Helper method to remove a path if it exists. + * + * @param conf Configuration to load FileSystem from + * @param path Path to remove + * @throws IOException + */ + public static void deletePath(Configuration conf, Path path) + throws IOException { + FileSystem fs = FileSystem.get(conf); + fs.delete(path, true); + } +} diff --git a/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index 6c4266a7c..ba1611995 100644 --- a/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -19,7 +19,6 @@ package org.apache.giraph.utils; import com.google.common.base.Charsets; -import com.google.common.io.Closeables; import com.google.common.io.Files; import org.apache.giraph.graph.GiraphJob; import org.apache.hadoop.conf.Configuration; @@ -31,9 +30,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import java.io.File; -import java.io.FileFilter; import java.io.IOException; -import java.io.Writer; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -99,16 +96,18 @@ public static Iterable run(Class vertexClass, File tmpDir = null; try { - // prepare input file, output folder and zookeeper folder - tmpDir = createTestDir(vertexClass); - File inputFile = createTempFile(tmpDir, "graph.txt"); - File outputDir = createTempDir(tmpDir, "output"); - File zkDir = createTempDir(tmpDir, "zooKeeper"); - - // write input data to disk - writeLines(inputFile, data); - - // create and configure the job to run the vertex + // Prepare input file, output folder and temporary folders + tmpDir = FileUtils.createTestDir(vertexClass); + File inputFile = FileUtils.createTempFile(tmpDir, "graph.txt"); + File outputDir = FileUtils.createTempDir(tmpDir, "output"); + File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); + File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); + File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); + + // Write input data to disk + FileUtils.writeLines(inputFile, data); + + // Create and configure the job to run the vertex GiraphJob job = new GiraphJob(vertexClass.getName()); job.setVertexClass(vertexClass); job.setVertexInputFormatClass(vertexInputFormatClass); @@ -125,6 +124,11 @@ public static Iterable run(Class vertexClass, conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" + String.valueOf(LOCAL_ZOOKEEPER_PORT)); + conf.set(GiraphJob.ZOOKEEPER_DIR, zkDir.toString()); + 
conf.set(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY, + zkMgrDir.toString()); + conf.set(GiraphJob.CHECKPOINT_DIRECTORY, checkpointsDir.toString()); + for (Map.Entry param : params.entrySet()) { conf.set(param.getKey(), param.getValue()); } @@ -134,7 +138,7 @@ public static Iterable run(Class vertexClass, FileOutputFormat.setOutputPath(job.getInternalJob(), new Path(outputDir.toString())); - // configure a local zookeeper instance + // Configure a local zookeeper instance Properties zkProperties = new Properties(); zkProperties.setProperty("tickTime", "2000"); zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); @@ -150,7 +154,7 @@ public static Iterable run(Class vertexClass, QuorumPeerConfig qpConfig = new QuorumPeerConfig(); qpConfig.parseProperties(zkProperties); - // create and run the zookeeper instance + // Create and run the zookeeper instance final InternalZooKeeper zookeeper = new InternalZooKeeper(); final ServerConfig zkConfig = new ServerConfig(); zkConfig.readFrom(qpConfig); @@ -176,111 +180,11 @@ public void run() { return Files.readLines(new File(outputDir, "part-m-00000"), Charsets.UTF_8); } finally { - if (tmpDir != null) { - new DeletingVisitor().accept(tmpDir); - } - } - } - - /** - * Create a temporary folder that will be removed after the test. - * - * @param vertexClass Used for generating the folder name. - * @return File object for the directory. - */ - private static File createTestDir(Class vertexClass) - throws IOException { - String systemTmpDir = System.getProperty("java.io.tmpdir"); - long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random()); - File testTempDir = new File(systemTmpDir, "giraph-" + - vertexClass.getSimpleName() + '-' + simpleRandomLong); - if (!testTempDir.mkdir()) { - throw new IOException("Could not create " + testTempDir); + FileUtils.delete(tmpDir); } - testTempDir.deleteOnExit(); - return testTempDir; } - /** - * Make a temporary file. - * - * @param parent Parent directory. - * @param name File name. - * @return File object to temporary file. - * @throws IOException - */ - private static File createTempFile(File parent, String name) - throws IOException { - return createTestTempFileOrDir(parent, name, false); - } - - /** - * Make a temporary directory. - * - * @param parent Parent directory. - * @param name Directory name. - * @return File object to temporary file. - * @throws IOException - */ - private static File createTempDir(File parent, String name) - throws IOException { - File dir = createTestTempFileOrDir(parent, name, true); - dir.delete(); - return dir; - } - /** - * Creae a test temp file or directory. - * - * @param parent Parent directory - * @param name Name of file - * @param dir Is directory? - * @return File object - * @throws IOException - */ - private static File createTestTempFileOrDir(File parent, String name, - boolean dir) throws IOException { - File f = new File(parent, name); - f.deleteOnExit(); - if (dir && !f.mkdirs()) { - throw new IOException("Could not make directory " + f); - } - return f; - } - - /** - * Write lines to a file. - * - * @param file File to write lines to - * @param lines Strings written to the file - * @throws IOException - */ - private static void writeLines(File file, String... lines) - throws IOException { - Writer writer = Files.newWriter(file, Charsets.UTF_8); - try { - for (String line : lines) { - writer.write(line); - writer.write('\n'); - } - } finally { - Closeables.closeQuietly(writer); - } - } - - /** - * Deletes files. 
- */ - private static class DeletingVisitor implements FileFilter { - @Override - public boolean accept(File f) { - if (!f.isFile()) { - f.listFiles(this); - } - f.delete(); - return false; - } - } /** * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown diff --git a/src/test/java/org/apache/giraph/BspCase.java b/src/test/java/org/apache/giraph/BspCase.java index 5ed2ecd80..5e451a317 100644 --- a/src/test/java/org/apache/giraph/BspCase.java +++ b/src/test/java/org/apache/giraph/BspCase.java @@ -18,20 +18,24 @@ package org.apache.giraph; -import java.io.FileNotFoundException; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.util.List; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.io.Closeables; import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.utils.FileUtils; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.junit.After; import org.junit.Before; /** @@ -50,39 +54,142 @@ public class BspCase implements Watcher { private final String zkList = System.getProperty("prop.zookeeper.list"); private String testName; + /** Default path for temporary files */ + static final Path DEFAULT_TEMP_DIR = + new Path(System.getProperty("java.io.tmpdir"), "_giraphTests"); + + /** A filter for listing parts files */ + static final PathFilter PARTS_FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part-"); + } + }; + /** * Adjust the configuration to the basic test case */ - public final void setupConfiguration(GiraphJob job) { + public final Configuration setupConfiguration(GiraphJob job) + throws IOException { Configuration conf = job.getConfiguration(); conf.set("mapred.jar", getJarLocation()); // Allow this test to be run on a real Hadoop setup - if (getJobTracker() != null) { + if (runningInDistributedMode()) { System.out.println("setup: Sending job to job tracker " + - getJobTracker() + " with jar path " + getJarLocation() + jobTracker + " with jar path " + getJarLocation() + " for " + getName()); - conf.set("mapred.job.tracker", getJobTracker()); - job.setWorkerConfiguration(getNumWorkers(), - getNumWorkers(), - 100.0f); + conf.set("mapred.job.tracker", jobTracker); + job.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f); } else { System.out.println("setup: Using local job runner with " + - "location " + getJarLocation() + " for " - + getName()); + "location " + getJarLocation() + " for " + getName()); job.setWorkerConfiguration(1, 1, 100.0f); // Single node testing conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false); } conf.setInt(GiraphJob.POLL_ATTEMPTS, 10); - conf.setInt(GiraphJob.POLL_MSECS, 3*1000); + conf.setInt(GiraphJob.POLL_MSECS, 3 * 1000); conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500); if (getZooKeeperList() != null) { job.setZooKeeperConfiguration(getZooKeeperList()); } // GeneratedInputSplit will generate 5 vertices conf.setLong(GeneratedVertexReader.READER_VERTICES, 5); + + // Setup pathes for temporary files + Path zookeeperDir = 
getTempPath("_bspZooKeeper"); + Path zkManagerDir = getTempPath("_defaultZkManagerDir"); + Path checkPointDir = getTempPath("_checkpoints"); + + // We might start several jobs per test, so we need to clean up here + FileUtils.deletePath(conf, zookeeperDir); + FileUtils.deletePath(conf, zkManagerDir); + FileUtils.deletePath(conf, checkPointDir); + + conf.set(GiraphJob.ZOOKEEPER_DIR, zookeeperDir.toString()); + conf.set(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY, + zkManagerDir.toString()); + conf.set(GiraphJob.CHECKPOINT_DIRECTORY, checkPointDir.toString()); + + return conf; + } + + /** + * Create a temporary path + * + * @param name name of the file to create in the temporary folder + * @return newly created temporary path + */ + protected Path getTempPath(String name) { + return new Path(DEFAULT_TEMP_DIR, name); + } + + /** + * Prepare a GiraphJob for test purposes + * + * @param name identifying name for the job + * @param vertexClass class of the vertex to run + * @param vertexInputFormatClass inputformat to use + * @return fully configured job instance + * @throws IOException + */ + protected GiraphJob prepareJob(String name, Class vertexClass, + Class vertexInputFormatClass) throws IOException { + return prepareJob(name, vertexClass, vertexInputFormatClass, null, + null); + } + + /** + * Prepare a GiraphJob for test purposes + * + * @param name identifying name for the job + * @param vertexClass class of the vertex to run + * @param vertexInputFormatClass inputformat to use + * @param vertexOutputFormatClass outputformat to use + * @param outputPath destination path for the output + * @return fully configured job instance + * @throws IOException + */ + protected GiraphJob prepareJob(String name, Class vertexClass, + Class vertexInputFormatClass, Class vertexOutputFormatClass, + Path outputPath) throws IOException { + return prepareJob(name, vertexClass, null, vertexInputFormatClass, + vertexOutputFormatClass, outputPath); + } + + /** + * Prepare a GiraphJob for test purposes + * + * @param name identifying name for the job + * @param vertexClass class of the vertex to run + * @param workerContextClass class of the workercontext to use + * @param vertexInputFormatClass inputformat to use + * @param vertexOutputFormatClass outputformat to use + * @param outputPath destination path for the output + * @return fully configured job instance + * @throws IOException + */ + protected GiraphJob prepareJob(String name, Class vertexClass, + Class workerContextClass, Class vertexInputFormatClass, + Class vertexOutputFormatClass, Path outputPath) throws IOException { + GiraphJob job = new GiraphJob(name); + setupConfiguration(job); + job.setVertexClass(vertexClass); + job.setVertexInputFormatClass(vertexInputFormatClass); + + if (workerContextClass != null) { + job.setWorkerContextClass(workerContextClass); + } + if (vertexOutputFormatClass != null) { + job.setVertexOutputFormatClass(vertexOutputFormatClass); + } + if (outputPath != null) { + removeAndSetOutput(job, outputPath); + } + + return job; } private String getName() { @@ -102,7 +209,7 @@ public BspCase(String testName) { /** * Get the number of workers used in the BSP application * - * @param numProcs number of processes to use + * @return number of workers */ public int getNumWorkers() { return numWorkers; @@ -125,29 +232,28 @@ String getJarLocation() { } /** - * Get the job tracker location + * Are the tests executed on a real hadoop instance? 
* - * @return job tracker location as a string + * @return whether we use a real hadoop instance or not */ - String getJobTracker() { - return jobTracker; + boolean runningInDistributedMode() { + return jobTracker != null; } /** * Get the single part file status and make sure there is only one part * - * @param job Job to get the file system from + * @param conf Configuration to get the file system from * @param partDirPath Directory where the single part file should exist * @return Single part file status * @throws IOException */ - public static FileStatus getSinglePartFileStatus(GiraphJob job, + public static FileStatus getSinglePartFileStatus(Configuration conf, Path partDirPath) throws IOException { - FileSystem fs = FileSystem.get(job.getConfiguration()); - FileStatus[] statusArray = fs.listStatus(partDirPath); + FileSystem fs = FileSystem.get(conf); FileStatus singlePartFileStatus = null; int partFiles = 0; - for (FileStatus fileStatus : statusArray) { + for (FileStatus fileStatus : fs.listStatus(partDirPath)) { if (fileStatus.getPath().getName().equals("part-m-00000")) { singlePartFileStatus = fileStatus; } @@ -155,46 +261,58 @@ public static FileStatus getSinglePartFileStatus(GiraphJob job, ++partFiles; } } - if (partFiles != 1) { - throw new ArithmeticException( - "getSinglePartFile: Part file count should be 1, but is " + - partFiles); - } + + Preconditions.checkState(partFiles == 1, "getSinglePartFile: Part file " + + "count should be 1, but is " + partFiles); + return singlePartFileStatus; } + /** + * Read all parts- files in the output and count their lines. This works only for textual output! + * + * @param conf + * @param outputPath + * @return + * @throws IOException + */ + public int getNumResults(Configuration conf, Path outputPath) + throws IOException { + FileSystem fs = FileSystem.get(conf); + int numResults = 0; + for (FileStatus status : fs.listStatus(outputPath, PARTS_FILTER)) { + FSDataInputStream in = null; + BufferedReader reader = null; + try { + in = fs.open(status.getPath()); + reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8)); + while (reader.readLine() != null) { + numResults++; + } + } finally { + Closeables.closeQuietly(in); + Closeables.closeQuietly(reader); + } + } + return numResults; + } + @Before public void setUp() { - if (jobTracker != null) { + if (runningInDistributedMode()) { System.out.println("Setting tasks to 3 for " + getName() + " since JobTracker exists..."); numWorkers = 3; } try { - Configuration conf = new Configuration(); - FileSystem hdfs = FileSystem.get(conf); - // Since local jobs always use the same paths, remove them - Path oldLocalJobPaths = new Path( - GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT); - FileStatus[] fileStatusArr; - try { - fileStatusArr = hdfs.listStatus(oldLocalJobPaths); - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.isDir() && - fileStatus.getPath().getName().contains("job_local")) { - System.out.println("Cleaning up local job path " + - fileStatus.getPath().getName()); - hdfs.delete(oldLocalJobPaths, true); - } - } - } catch (FileNotFoundException e) { - // ignore this FileNotFound exception and continue. 
- } + + cleanupTemporaryFiles(); + if (zkList == null) { return; } ZooKeeperExt zooKeeperExt = - new ZooKeeperExt(zkList, 30*1000, this); + new ZooKeeperExt(zkList, 30 * 1000, this); List rootChildren = zooKeeperExt.getChildren("/", false); for (String rootChild : rootChildren) { if (rootChild.startsWith("_hadoopBsp")) { @@ -216,6 +334,18 @@ public void setUp() { } } + @After + public void tearDown() throws IOException { + cleanupTemporaryFiles(); + } + + /** + * Remove temporary files + */ + private void cleanupTemporaryFiles() throws IOException { + FileUtils.deletePath(new Configuration(), DEFAULT_TEMP_DIR); + } + @Override public void process(WatchedEvent event) { // Do nothing @@ -227,28 +357,15 @@ public void process(WatchedEvent event) { * FileOutputFormat. * * @param job Job to set the output path for - * @param outputPathString Path to output as a string + * @param outputPath Path to output * @throws IOException */ public static void removeAndSetOutput(GiraphJob job, Path outputPath) throws IOException { - remove(job.getConfiguration(), outputPath); + FileUtils.deletePath(job.getConfiguration(), outputPath); FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath); } - /** - * Helper method to remove a path if it exists. - * - * @param conf Configuration to load FileSystem from - * @param path Path to remove - * @throws IOException - */ - public static void remove(Configuration conf, Path path) - throws IOException { - FileSystem hdfs = FileSystem.get(conf); - hdfs.delete(path, true); - } - public static String getCallingMethodName() { return Thread.currentThread().getStackTrace()[2].getMethodName(); } diff --git a/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/src/test/java/org/apache/giraph/TestAutoCheckpoint.java index 807e9b2a0..2aaa810cd 100644 --- a/src/test/java/org/apache/giraph/TestAutoCheckpoint.java +++ b/src/test/java/org/apache/giraph/TestAutoCheckpoint.java @@ -26,6 +26,7 @@ import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; import org.apache.giraph.graph.GiraphJob; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.junit.Test; @@ -33,19 +34,6 @@ * Unit test for automated checkpoint restarting */ public class TestAutoCheckpoint extends BspCase { - /** Where the checkpoints will be stored and restarted */ - private final String HDFS_CHECKPOINT_DIR = - "/tmp/testBspCheckpoints"; - - /** - * Create the test case - * - * @param testName name of the test case - */ - public TestAutoCheckpoint(String testName) { - super(testName); - } - public TestAutoCheckpoint() { super(TestAutoCheckpoint.class.getName()); @@ -62,31 +50,30 @@ public TestAutoCheckpoint() { @Test public void testSingleFault() throws IOException, InterruptedException, ClassNotFoundException { - if (getJobTracker() == null) { + if (!runningInDistributedMode()) { System.out.println( "testSingleFault: Ignore this test in local mode."); return; } - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, - true); - job.getConfiguration().setInt("mapred.map.max.attempts", 4); + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + 
SimpleSuperstepVertexOutputFormat.class, + outputPath); + + Configuration conf = job.getConfiguration(); + conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true); + conf.setInt("mapred.map.max.attempts", 4); // Trigger failure faster - job.getConfiguration().setInt("mapred.task.timeout", 30000); - job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000); - job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2); - job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY, - HDFS_CHECKPOINT_DIR); - job.getConfiguration().setBoolean( - GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); + conf.setInt("mapred.task.timeout", 30000); + conf.setInt(GiraphJob.POLL_MSECS, 5000); + conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2); + conf.set(GiraphJob.CHECKPOINT_DIRECTORY, + getTempPath("_singleFaultCheckpoints").toString()); + conf.setBoolean(GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false); + assertTrue(job.run(true)); } } diff --git a/src/test/java/org/apache/giraph/TestBspBasic.java b/src/test/java/org/apache/giraph/TestBspBasic.java index 8fcad72ef..461333541 100644 --- a/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/src/test/java/org/apache/giraph/TestBspBasic.java @@ -20,62 +20,63 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; -import org.apache.giraph.examples.SimpleAggregatorWriter; -import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; -import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat; -import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; -import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; import org.apache.giraph.examples.GeneratedVertexReader; +import org.apache.giraph.examples.LongSumAggregator; +import org.apache.giraph.examples.MaxAggregator; +import org.apache.giraph.examples.MinAggregator; import org.apache.giraph.examples.SimpleCombinerVertex; import org.apache.giraph.examples.SimpleFailVertex; import org.apache.giraph.examples.SimpleMsgVertex; import org.apache.giraph.examples.SimplePageRankVertex; +import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; import org.apache.giraph.examples.SimpleShortestPathsVertex; +import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat; import org.apache.giraph.examples.SimpleSumCombiner; import org.apache.giraph.examples.SimpleSuperstepVertex; +import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; +import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; +import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.TextAggregatorWriter; import 
org.apache.giraph.graph.VertexInputFormat; -import org.apache.giraph.graph.BasicVertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobContext; /*if[HADOOP_NON_SASL_RPC] else[HADOOP_NON_SASL_RPC]*/ import org.apache.hadoop.mapreduce.task.JobContextImpl; /*end[HADOOP_NON_SASL_RPC]*/ +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.lang.reflect.InvocationTargetException; import java.util.List; +import java.util.Map; + import org.junit.Test; /** * Unit test for many simple BSP applications. */ public class TestBspBasic extends BspCase { - /** - * Create the test case - * - * @param testName name of the test case - */ - public TestBspBasic(String testName) { - super(testName); - } public TestBspBasic() { super(TestBspBasic.class.getName()); @@ -101,9 +102,8 @@ public void testInstantiateVertex() InvocationTargetException, SecurityException, NoSuchMethodException { System.out.println("testInstantiateVertex: java.class.path=" + System.getProperty("java.class.path")); - GiraphJob job = new GiraphJob(getCallingMethodName()); - job.setVertexClass(SimpleSuperstepVertex.class); - job.setVertexInputFormatClass( + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleSuperstepVertex.class, SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class); GraphState gs = new GraphState 34.030 && maxPageRank < 34.0301); - assertTrue("0.03 !< " + minPageRank + " !< " + "0.03001", - minPageRank > 0.03 && minPageRank < 0.03001); - assertTrue("numVertices = " + numVertices + " != 5", numVertices == 5); + " minPageRank=" + minPageRank + " numVertices=" + numVertices); + assertEquals(34.03, maxPageRank, 0.001); + assertEquals(0.03, minPageRank, 0.00001); + assertEquals(5l, numVertices); } } @@ -327,32 +306,21 @@ public void testBspPageRank() @Test public void testBspShortestPaths() throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(SimpleShortestPathsVertex.class); - job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); - job.setVertexOutputFormatClass( - SimpleShortestPathsVertexOutputFormat.class); - job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID, 0); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); - assertTrue(job.run(true)); + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleShortestPathsVertex.class, + SimplePageRankVertex.SimplePageRankVertexInputFormat.class, + SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat.class, + outputPath); + Configuration conf = job.getConfiguration(); + conf.setLong(SimpleShortestPathsVertex.SOURCE_ID, 0); - job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - 
job.setVertexClass(SimpleShortestPathsVertex.class); - job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); - job.setVertexOutputFormatClass( - SimpleShortestPathsVertexOutputFormat.class); - job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID, 0); - Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2"); - removeAndSetOutput(job, outputPath2); assertTrue(job.run(true)); - if (getJobTracker() == null) { - FileStatus fileStatus = getSinglePartFileStatus(job, outputPath); - FileStatus fileStatus2 = getSinglePartFileStatus(job, outputPath2); - assertTrue(fileStatus.getLen() == fileStatus2.getLen()); - } + + int numResults = getNumResults(job.getConfiguration(), outputPath); + + int expectedNumResults = runningInDistributedMode() ? 15 : 5; + assertEquals(expectedNumResults, numResults); } /** @@ -365,56 +333,82 @@ public void testBspShortestPaths() @Test public void testBspPageRankWithAggregatorWriter() throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(SimplePageRankVertex.class); + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), + SimplePageRankVertex.class, + SimplePageRankVertex.SimplePageRankVertexInputFormat.class, + SimplePageRankVertex.SimplePageRankVertexOutputFormat.class, + outputPath); job.setWorkerContextClass( SimplePageRankVertex.SimplePageRankVertexWorkerContext.class); - job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); - job.setAggregatorWriterClass(SimpleAggregatorWriter.class); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); + + Configuration conf = job.getConfiguration(); + + job.setAggregatorWriterClass(TextAggregatorWriter.class); + Path aggregatorValues = getTempPath("aggregatorValues"); + conf.setInt(TextAggregatorWriter.FREQUENCY, TextAggregatorWriter.ALWAYS); + conf.set(TextAggregatorWriter.FILENAME, aggregatorValues.toString()); + assertTrue(job.run(true)); - if (getJobTracker() == null) { - double maxPageRank = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax(); - double minPageRank = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin(); - long numVertices = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum(); - System.out.println("testBspPageRank: maxPageRank=" + maxPageRank + - " minPageRank=" + minPageRank + - " numVertices=" + numVertices); - FileSystem fs = FileSystem.get(new Configuration()); - FSDataInputStream input = - fs.open(new Path(SimpleAggregatorWriter.getFilename())); - int i, all; - for (i = 0; ; i++) { - all = 0; + + FileSystem fs = FileSystem.get(conf); + Path valuesFile = new Path(aggregatorValues.toString() + "_0"); + + try { + if (!runningInDistributedMode()) { + double maxPageRank = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax(); + double minPageRank = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin(); + long numVertices = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum(); + System.out.println("testBspPageRank: maxPageRank=" + maxPageRank + + " minPageRank=" + minPageRank + " numVertices=" + numVertices); + + FSDataInputStream in = null; + BufferedReader reader = null; try { - DoubleWritable max = new DoubleWritable(); - max.readFields(input); - all++; - DoubleWritable min = new DoubleWritable(); - min.readFields(input); - 
all++; - LongWritable sum = new LongWritable(); - sum.readFields(input); - all++; - if (i > 0) { - assertTrue(max.get() == maxPageRank); - assertTrue(min.get() == minPageRank); - assertTrue(sum.get() == numVertices); + Map minValues = Maps.newHashMap(); + Map maxValues = Maps.newHashMap(); + Map vertexCounts = Maps.newHashMap(); + + in = fs.open(valuesFile); + reader = new BufferedReader(new InputStreamReader(in, + Charsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + String[] tokens = line.split("\t"); + int superstep = Integer.parseInt(tokens[0].split("=")[1]); + String value = (tokens[1].split("=")[1]); + String aggregatorName = tokens[2]; + + if (MinAggregator.class.getName().equals(aggregatorName)) { + minValues.put(superstep, Double.parseDouble(value)); + } + if (MaxAggregator.class.getName().equals(aggregatorName)) { + maxValues.put(superstep, Double.parseDouble(value)); + } + if (LongSumAggregator.class.getName().equals(aggregatorName)) { + vertexCounts.put(superstep, Long.parseLong(value)); + } } - } catch (IOException e) { - break; + + int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS; + assertEquals(maxSuperstep + 1, minValues.size()); + assertEquals(maxSuperstep + 1, maxValues.size()); + assertEquals(maxSuperstep + 1, vertexCounts.size()); + + assertEquals(maxPageRank, maxValues.get(maxSuperstep)); + assertEquals(minPageRank, minValues.get(maxSuperstep)); + assertEquals(numVertices, vertexCounts.get(maxSuperstep)); + + } finally { + Closeables.closeQuietly(in); + Closeables.closeQuietly(reader); } } - input.close(); - // contained all supersteps - assertTrue(i == SimplePageRankVertex.MAX_SUPERSTEPS+1 && all == 0); - remove(new Configuration(), - new Path(SimpleAggregatorWriter.getFilename())); + } finally { + fs.delete(valuesFile, false); } } } diff --git a/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/src/test/java/org/apache/giraph/TestGraphPartitioner.java index 10b68969c..bbd870bb1 100644 --- a/src/test/java/org/apache/giraph/TestGraphPartitioner.java +++ b/src/test/java/org/apache/giraph/TestGraphPartitioner.java @@ -18,6 +18,7 @@ package org.apache.giraph; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -39,19 +40,26 @@ * Unit test for manual checkpoint restarting */ public class TestGraphPartitioner extends BspCase { - /** - * Create the test case - * - * @param testName name of the test case - */ - public TestGraphPartitioner(String testName) { - super(testName); - } - + public TestGraphPartitioner() { super(TestGraphPartitioner.class.getName()); } + private void verifyOutput(FileSystem fs, Path outputPath) + throws IOException { + final int correctLen = 123; + if (runningInDistributedMode()) { + FileStatus [] fileStatusArr = fs.listStatus(outputPath); + int totalLen = 0; + for (FileStatus fileStatus : fileStatusArr) { + if (fileStatus.getPath().toString().contains("/part-m-")) { + totalLen += fileStatus.getLen(); + } + } + assertEquals(correctLen, totalLen); + } + } + /** * Run a sample BSP job locally and test various partitioners and * partition algorithms. 
@@ -63,76 +71,43 @@ public TestGraphPartitioner() { @Test public void testPartitioners() throws IOException, InterruptedException, ClassNotFoundException { - final int correctLen = 123; - GiraphJob job = new GiraphJob("testVertexBalancer"); - setupConfiguration(job); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); + Path outputPath = getTempPath("testVertexBalancer"); + GiraphJob job = prepareJob("testVertexBalancer", + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); + job.getConfiguration().set( PartitionBalancer.PARTITION_BALANCE_ALGORITHM, PartitionBalancer.VERTICES_BALANCE_ALGORITHM); - Path outputPath = new Path("/tmp/testVertexBalancer"); - removeAndSetOutput(job, outputPath); + assertTrue(job.run(true)); FileSystem hdfs = FileSystem.get(job.getConfiguration()); - if (getJobTracker() != null) { - FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); - int totalLen = 0; - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.getPath().toString().contains("/part-m-")) { - totalLen += fileStatus.getLen(); - } - } - assertTrue(totalLen == correctLen); - } - job = new GiraphJob("testHashPartitioner"); - setupConfiguration(job); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - outputPath = new Path("/tmp/testHashPartitioner"); - removeAndSetOutput(job, outputPath); + + outputPath = getTempPath("testHashPartitioner"); + job = prepareJob("testHashPartitioner", SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); assertTrue(job.run(true)); - if (getJobTracker() != null) { - FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); - int totalLen = 0; - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.getPath().toString().contains("/part-m-")) { - totalLen += fileStatus.getLen(); - } - } - assertTrue(totalLen == correctLen); - } + verifyOutput(hdfs, outputPath); + + outputPath = getTempPath("testSuperstepHashPartitioner"); + job = prepareJob("testSuperstepHashPartitioner", + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, + outputPath); - job = new GiraphJob("testSuperstepHashPartitioner"); - setupConfiguration(job); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); job.setGraphPartitionerFactoryClass( SuperstepHashPartitionerFactory.class); - outputPath = new Path("/tmp/testSuperstepHashPartitioner"); - removeAndSetOutput(job, outputPath); + assertTrue(job.run(true)); - if (getJobTracker() != null) { - 
FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); - int totalLen = 0; - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.getPath().toString().contains("/part-m-")) { - totalLen += fileStatus.getLen(); - } - } - assertTrue(totalLen == correctLen); - } + verifyOutput(hdfs, outputPath); job = new GiraphJob("testHashRangePartitioner"); setupConfiguration(job); @@ -146,41 +121,19 @@ public void testPartitioners() outputPath = new Path("/tmp/testHashRangePartitioner"); removeAndSetOutput(job, outputPath); assertTrue(job.run(true)); - if (getJobTracker() != null) { - FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); - int totalLen = 0; - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.getPath().toString().contains("/part-m-")) { - totalLen += fileStatus.getLen(); - } - } - assertTrue(totalLen == correctLen); - } + verifyOutput(hdfs, outputPath); - job = new GiraphJob("testReverseIdSuperstepHashPartitioner"); - setupConfiguration(job); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); + outputPath = getTempPath("testReverseIdSuperstepHashPartitioner"); + job = prepareJob("testReverseIdSuperstepHashPartitioner", + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); job.setGraphPartitionerFactoryClass( SuperstepHashPartitionerFactory.class); job.getConfiguration().setBoolean( - GeneratedVertexReader.REVERSE_ID_ORDER, - true); - outputPath = new Path("/tmp/testReverseIdSuperstepHashPartitioner"); - removeAndSetOutput(job, outputPath); + GeneratedVertexReader.REVERSE_ID_ORDER, true); assertTrue(job.run(true)); - if (getJobTracker() != null) { - FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); - int totalLen = 0; - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.getPath().toString().contains("/part-m-")) { - totalLen += fileStatus.getLen(); - } - } - assertTrue(totalLen == correctLen); - } + verifyOutput(hdfs, outputPath); } } diff --git a/src/test/java/org/apache/giraph/TestJsonBase64Format.java b/src/test/java/org/apache/giraph/TestJsonBase64Format.java index be3e25c97..a94180213 100644 --- a/src/test/java/org/apache/giraph/TestJsonBase64Format.java +++ b/src/test/java/org/apache/giraph/TestJsonBase64Format.java @@ -17,6 +17,7 @@ */ package org.apache.giraph; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -27,6 +28,7 @@ import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.lib.JsonBase64VertexInputFormat; import org.apache.giraph.lib.JsonBase64VertexOutputFormat; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -60,54 +62,43 @@ public TestJsonBase64Format() { */ @Test public void testContinue() - throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(PageRankBenchmark.class); - job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); - job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); 
+ throws IOException, InterruptedException, ClassNotFoundException { + + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), PageRankBenchmark.class, + PseudoRandomVertexInputFormat.class, + JsonBase64VertexOutputFormat.class, outputPath); job.getConfiguration().setLong( PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); + assertTrue(job.run(true)); - job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(PageRankBenchmark.class); - job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class); - job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); + Path outputPath2 = getTempPath(getCallingMethodName() + "2"); + job = prepareJob(getCallingMethodName(), PageRankBenchmark.class, + JsonBase64VertexInputFormat.class, JsonBase64VertexOutputFormat.class, + outputPath2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3); FileInputFormat.setInputPaths(job.getInternalJob(), outputPath); - Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2"); - removeAndSetOutput(job, outputPath2); assertTrue(job.run(true)); - FileStatus twoJobsFile = null; - if (getJobTracker() == null) { - twoJobsFile = getSinglePartFileStatus(job, outputPath); - } - - job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(PageRankBenchmark.class); - job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); - job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); + Path outputPath3 = getTempPath(getCallingMethodName() + "3"); + job = prepareJob(getCallingMethodName(), PageRankBenchmark.class, + PseudoRandomVertexInputFormat.class, + JsonBase64VertexOutputFormat.class, outputPath3); job.getConfiguration().setLong( PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5); - Path outputPath3 = new Path("/tmp/" + getCallingMethodName() + "3"); - removeAndSetOutput(job, outputPath3); assertTrue(job.run(true)); - if (getJobTracker() == null) { - FileStatus oneJobFile = getSinglePartFileStatus(job, outputPath3); - assertTrue(twoJobsFile.getLen() == oneJobFile.getLen()); - } + Configuration conf = job.getConfiguration(); + + assertEquals(101, getNumResults(conf, outputPath)); + assertEquals(101, getNumResults(conf, outputPath2)); + assertEquals(101, getNumResults(conf, outputPath3)); } } diff --git a/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/src/test/java/org/apache/giraph/TestManualCheckpoint.java index cf00bb2eb..82ec6a565 100644 --- a/src/test/java/org/apache/giraph/TestManualCheckpoint.java +++ b/src/test/java/org/apache/giraph/TestManualCheckpoint.java @@ -17,6 +17,7 @@ */ package org.apache.giraph; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -33,18 +34,6 @@ * Unit test for manual checkpoint restarting */ public class TestManualCheckpoint extends BspCase { - /** Where the checkpoints will be stored and restarted */ - private final String HDFS_CHECKPOINT_DIR = - "/tmp/testBspCheckpoints"; - - /** - * Create the test case - * - * 
@param testName name of the test case - */ - public TestManualCheckpoint(String testName) { - super(testName); - } public TestManualCheckpoint() { super(TestManualCheckpoint.class.getName()); @@ -59,61 +48,52 @@ public TestManualCheckpoint() { @Test public void testBspCheckpoint() throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); + Path checkpointsDir = getTempPath("checkPointsForTesting"); + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); + job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY, - HDFS_CHECKPOINT_DIR); + checkpointsDir.toString()); job.getConfiguration().setBoolean( GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false); job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); + assertTrue(job.run(true)); - long fileLen = 0; + long idSum = 0; - if (getJobTracker() == null) { - FileStatus fileStatus = getSinglePartFileStatus(job, outputPath); - fileLen = fileStatus.getLen(); - idSum = - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.getFinalSum(); + if (!runningInDistributedMode()) { + FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(), + outputPath); + idSum = SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext + .getFinalSum(); System.out.println("testBspCheckpoint: idSum = " + idSum + - " fileLen = " + fileLen); + " fileLen = " + fileStatus.getLen()); } // Restart the test from superstep 2 - System.out.println( - "testBspCheckpoint: Restarting from superstep 2" + - " with checkpoint path = " + HDFS_CHECKPOINT_DIR); - GiraphJob restartedJob = new GiraphJob(getCallingMethodName() + - "Restarted"); - setupConfiguration(restartedJob); + System.out.println("testBspCheckpoint: Restarting from superstep 2" + + " with checkpoint path = " + checkpointsDir); + outputPath = getTempPath(getCallingMethodName() + "Restarted"); + GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted", + SimpleCheckpointVertex.class, + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); restartedJob.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY, - HDFS_CHECKPOINT_DIR); - restartedJob.getConfiguration().setLong(GiraphJob.RESTART_SUPERSTEP, 2); - restartedJob.setVertexClass(SimpleCheckpointVertex.class); - restartedJob.setWorkerContextClass( - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); - restartedJob.setVertexInputFormatClass( - SimpleSuperstepVertexInputFormat.class); - restartedJob.setVertexOutputFormatClass( - SimpleSuperstepVertexOutputFormat.class); - outputPath = new Path("/tmp/" + getCallingMethodName() + "Restarted"); - removeAndSetOutput(restartedJob, outputPath); + checkpointsDir.toString()); + assertTrue(restartedJob.run(true)); - if 
(getJobTracker() == null) { - FileStatus fileStatus = getSinglePartFileStatus(job, outputPath); - fileLen = fileStatus.getLen(); - assertTrue(fileStatus.getLen() == fileLen); + if (!runningInDistributedMode()) { long idSumRestarted = - SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.getFinalSum(); + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext + .getFinalSum(); System.out.println("testBspCheckpoint: idSumRestarted = " + idSumRestarted); - assertTrue(idSum == idSumRestarted); + assertEquals(idSum, idSumRestarted); } } } diff --git a/src/test/java/org/apache/giraph/TestMutateGraphVertex.java b/src/test/java/org/apache/giraph/TestMutateGraphVertex.java index abeda02b4..7a50854df 100644 --- a/src/test/java/org/apache/giraph/TestMutateGraphVertex.java +++ b/src/test/java/org/apache/giraph/TestMutateGraphVertex.java @@ -33,15 +33,7 @@ * Unit test for graph mutation */ public class TestMutateGraphVertex extends BspCase { - /** - * Create the test case - * - * @param testName name of the test case - */ - public TestMutateGraphVertex(String testName) { - super(testName); - } - + public TestMutateGraphVertex() { super(TestMutateGraphVertex.class.getName()); } @@ -56,15 +48,12 @@ public TestMutateGraphVertex() { @Test public void testMutateGraph() throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); - job.setVertexClass(SimpleMutateGraphVertex.class); - job.setWorkerContextClass( - SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class); - job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); - job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleMutateGraphVertex.class, + SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class, + SimplePageRankVertexInputFormat.class, + SimplePageRankVertexOutputFormat.class, + getTempPath(getCallingMethodName())); assertTrue(job.run(true)); } } diff --git a/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java b/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java index adb0807b9..e4cb2b113 100644 --- a/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java +++ b/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java @@ -33,15 +33,7 @@ * Unit test for not enough map tasks */ public class TestNotEnoughMapTasks extends BspCase { - /** - * Create the test case - * - * @param testName name of the test case - */ - public TestNotEnoughMapTasks(String testName) { - super(testName); - } - + public TestNotEnoughMapTasks() { super(TestNotEnoughMapTasks.class.getName()); } @@ -56,25 +48,23 @@ public TestNotEnoughMapTasks() { @Test public void testNotEnoughMapTasks() throws IOException, InterruptedException, ClassNotFoundException { - if (getJobTracker() == null) { + if (!runningInDistributedMode()) { System.out.println( "testNotEnoughMapTasks: Ignore this test in local mode."); return; } - GiraphJob job = new GiraphJob(getCallingMethodName()); - setupConfiguration(job); + Path outputPath = getTempPath(getCallingMethodName()); + GiraphJob job = prepareJob(getCallingMethodName(), + SimpleCheckpointVertex.class, + SimpleSuperstepVertexInputFormat.class, + SimpleSuperstepVertexOutputFormat.class, outputPath); + // An unlikely impossible number of workers to achieve final int unlikelyWorkers = Short.MAX_VALUE; - 
job.setWorkerConfiguration( - unlikelyWorkers, unlikelyWorkers, 100.0f); + job.setWorkerConfiguration(unlikelyWorkers, unlikelyWorkers, 100.0f); // Only one poll attempt of one second to make failure faster job.getConfiguration().setInt(GiraphJob.POLL_ATTEMPTS, 1); job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 1); - job.setVertexClass(SimpleCheckpointVertex.class); - job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - Path outputPath = new Path("/tmp/" + getCallingMethodName()); - removeAndSetOutput(job, outputPath); assertFalse(job.run(false)); } } diff --git a/src/test/java/org/apache/giraph/TestZooKeeperExt.java b/src/test/java/org/apache/giraph/TestZooKeeperExt.java index e2ffd4608..98e151682 100644 --- a/src/test/java/org/apache/giraph/TestZooKeeperExt.java +++ b/src/test/java/org/apache/giraph/TestZooKeeperExt.java @@ -18,6 +18,7 @@ package org.apache.giraph; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.List; @@ -52,7 +53,7 @@ public void setUp() { return; } zooKeeperExt = - new ZooKeeperExt(zkList, 30*1000, this); + new ZooKeeperExt(zkList, 30 * 1000, this); zooKeeperExt.deleteExt(BASE_PATH, -1, true); } catch (KeeperException.NoNodeException e) { System.out.println("Clean start: No node " + BASE_PATH); @@ -153,7 +154,7 @@ public void testGetChildrenExt() for (String fullPath : sequenceOrderedList) { assertTrue(fullPath.contains(BASE_PATH + "/")); } - assertTrue(sequenceOrderedList.size() == 4); + assertEquals(4, sequenceOrderedList.size()); assertTrue(sequenceOrderedList.get(0).contains("/b")); assertTrue(sequenceOrderedList.get(1).contains("/a")); assertTrue(sequenceOrderedList.get(2).contains("/d")); diff --git a/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java b/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java index 767735dfc..14fe79d60 100644 --- a/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java +++ b/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -50,7 +51,7 @@ public class TestEdgeListVertex { private GiraphJob job; /** - * Simple instantiable class that extends {@link EdgeArrayVertex}. + * Simple instantiable class that extends {@link EdgeListVertex}. */ private static class IFDLEdgeListVertex extends EdgeListVertex - createVertex(job.getConfiguration()); + createVertex(conf); } @Test