
refactored and commented the testing traits

etorreborre committed Apr 18, 2012
1 parent 5de9b10 commit 02d1588ab2701a9f7469135eea505c5e15e71584
@@ -12,7 +12,7 @@ trait FileSystems {
* Upload additional local jars to a destination directory on the hdfs
*/
def uploadNewJars(sourceFiles: Seq[File], dest: String): Seq[File] = uploadNewFiles(sourceFiles, dest) { file =>
- DistributedCache.addArchiveToClassPath(new Path(dest+"/"+file.getName), Conf.conf)
+ DistributedCache.addFileToClassPath(new Path(dest+"/"+file.getName), Conf.conf)
file
}
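For illustration, a call to this helper might look like the following sketch (the jar path and destination directory are hypothetical, not part of this commit):

    import java.io.File
    import com.nicta.scoobi.io.FileSystems

    // upload any jars not already present in the "libjars" directory on the cluster,
    // and add them to the classpath for cluster jobs
    FileSystems.uploadNewJars(Seq(new File("lib/my-app.jar")), "libjars")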
@@ -0,0 +1,11 @@
+package com.nicta.scoobi.testing
+
+/**
+ * Definition of the Cluster addresses
+ */
+trait Cluster {
+ /** @return the filesystem address */
+ def fs: String
+ /** @return the jobtracker address */
+ def jobTracker: String
+}
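A concrete Cluster is just a pair of addresses; a minimal sketch with placeholder hostnames:

    // hypothetical cluster definition; the addresses are placeholders
    trait MyCluster extends Cluster {
      def fs         = "hdfs://namenode.example.com"
      def jobTracker = "jobtracker.example.com:8021"
    }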
@@ -1,20 +1,37 @@
package com.nicta.scoobi.testing
import org.specs2.specification.{AroundContextExample}
-import org.specs2.specification.{Tags, Around}
+import org.specs2.specification.{Around}
import org.specs2.execute._
import org.specs2.time.{SimpleTimer}
+/**
+ * This trait provides an Around context to be used in a Specification
+ *
+ * Subclasses need to define the context method:
+ *
+ * - def context = local // execute the code locally
+ * - def context = cluster // execute the code on the cluster
+ * - def context = localThenCluster // execute the code locally, then on the cluster if the local run succeeds
+ */
trait HadoopExamples extends WithHadoop with AroundContextExample[Around] {
protected def aroundContext = context
+
+ /** define the context to use: local, cluster or localThenCluster */
def context: Around
+ /**
+ * this function does not display the execution time; instead the time is added to the execution Result
+ */
override def displayTime(prefix: String) = (timer: SimpleTimer) => ()
- def local = new LocalHadoopContext
- def cluster = new ClusterHadoopContext
- def localThenCluster = new LocalThenClusterHadoopContext
+ /** context for local execution */
+ def local: Around = new LocalHadoopContext
+ /** context for cluster execution */
+ def cluster: Around = new ClusterHadoopContext
+ /** context for local then cluster execution */
+ def localThenCluster: Around = new LocalThenClusterHadoopContext
def cluster[R <% Result](r: =>R) = runOnCluster(r)
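As a sketch of the intended usage (WordCountSpec is hypothetical, and MyCluster is the placeholder trait sketched above), a specification mixes in the trait and picks one of the three contexts:

    import org.specs2.mutable.Specification

    class WordCountSpec extends Specification with HadoopExamples with MyCluster {
      // run each example locally first, then on the cluster if the local run succeeds
      def context = localThenCluster

      "words can be counted with a map-reduce job" >> {
        // the map-reduce code under test goes here
        success
      }
    }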
@@ -23,10 +40,14 @@ trait HadoopExamples extends WithHadoop with AroundContextExample[Around] {
class ClusterHadoopContext extends Around {
def around[T <% Result](t: =>T) = runOnCluster(t)
}
+
class LocalHadoopContext extends Around {
def around[T <% Result](t: =>T) = runOnLocal(t)
}
+
class LocalThenClusterHadoopContext extends Around {
+ private def trimSeparator(r: Result) = r.mapExpected((_:String).replace(";", ""))
+
def around[T <% Result](t: =>T) = {
val result = runOnLocal(t)
@@ -38,7 +59,6 @@ trait HadoopExamples extends WithHadoop with AroundContextExample[Around] {
def runOnLocal[T <% Result](t: =>T) = showResultTime("Local execution time", super.executeOnLocal(t))
def runOnCluster[T <% Result](t: =>T) = showResultTime("Cluster execution time", super.executeOnCluster(t))
- private def trimSeparator(r: Result) = r.mapExpected((_:String).replace(";", ""))
private def showResultTime[T](prefix: String, t: =>T)(implicit toResult: T => Result): Result = {
if (showTimes) {
val (result, timer) = withTimer(t)
@@ -48,6 +68,9 @@ trait HadoopExamples extends WithHadoop with AroundContextExample[Around] {
} else t
}
+ /**
+ * This code can be removed when specs2 1.10 is released
+ */
implicit def extendResult(r: Result): ExtendedResult = new ExtendedResult(r)
class ExtendedResult(r: Result) {
def mapExpected(map: String => String) = {
@@ -61,14 +84,3 @@ trait HadoopExamples extends WithHadoop with AroundContextExample[Around] {
}
-trait NictaHadoop extends HadoopExamples with Tags with NictaCluster {
-
- // all the examples will be tagged as "acceptance" since they are using the local hadoop installation
- // or the cluster
- section("acceptance")
-}
-
-trait NictaCluster {
- def fs = "hdfs://svm-hadoop1.ssrg.nicta.com.au"
- def jobTracker = "svm-hadoop1.ssrg.nicta.com.au:8021"
-}
@@ -0,0 +1,38 @@
+package com.nicta.scoobi.testing
+
+import java.net.{URLClassLoader, URL}
+import com.nicta.scoobi.io.FileSystems
+import java.io.File
+import com.nicta.scoobi.Conf
+import com.nicta.scoobi.impl.Configurations._
+
+/**
+ * This trait defines:
+ *
+ * - the library jars which can be uploaded to the cluster
+ * - a method to upload and reference them on the classpath for cluster jobs
+ */
+trait LibJars {
+
+ /**
+ * @return the name of the directory to use when uploading jars to the filesystem.
+ * The path will be relative to the user's home directory on the cluster
+ */
+ def libjarsDirectory = "libjars/"
+
+ /**
+ * @return the list of library jars to upload, derived from the jars loaded by the current classloader
+ */
+ def jars: Seq[URL] = Thread.currentThread.getContextClassLoader.asInstanceOf[URLClassLoader].getURLs.filter { url =>
+ Seq(".ivy2", ".m2").exists(url.getFile.contains) || url.getFile.contains("scala-library")
+ }
+
+ /**
+ * upload the jars which don't exist yet in the library directory on the cluster
+ */
+ def uploadLibJars = {
+ FileSystems.uploadNewJars(jars.map(url => new File(url.getFile)), libjarsDirectory)
+ Conf.conf.addValues("mapred.classpath", jars.map(j => libjarsDirectory+(new File(j.getFile).getName)):_*)
+ }
+}
+
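Both the jar selection and the target directory can be overridden; a minimal sketch (the filter shown is an assumption for illustration, not part of this commit):

    trait MyLibJars extends LibJars {
      // upload into "lib/" instead of the default "libjars/"
      override def libjarsDirectory = "lib/"
      // only upload the scoobi jars found on the classpath
      override def jars = super.jars.filter(_.getFile.contains("scoobi"))
    }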
@@ -0,0 +1,28 @@
+package com.nicta.scoobi.testing
+
+import org.specs2.specification.{Fragments, SpecificationStructure, Tags}
+import ExtendedFragments._
+
+/**
+ * This trait can be used to create Hadoop specifications on the NictaCluster
+ */
+trait NictaHadoop extends HadoopExamples with NictaTags with NictaCluster with UploadedLibJars { this: SpecificationStructure =>
+ override def map(fs: =>Fragments) = super.map(fs).insert(acceptanceSection)
+}
+
+/**
+ * examples running on the cluster will be tagged as "acceptance"
+ */
+trait NictaTags extends Tags {
+ // all the examples will be tagged as "acceptance" since they are using the local hadoop installation
+ // or the cluster
+ def acceptanceSection = section("acceptance")
+}
+
+/**
+ * Addresses for the filesystem and jobtracker for the Nicta cluster
+ */
+trait NictaCluster extends Cluster {
+ def fs = "hdfs://svm-hadoop1.ssrg.nicta.com.au"
+ def jobTracker = "svm-hadoop1.ssrg.nicta.com.au:8021"
+}
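A specification based on these traits only has to choose a context; a hypothetical example:

    import org.specs2.mutable.Specification

    // tagged as "acceptance" and uploads the library jars before the first example
    class ClusterWordCountSpec extends Specification with NictaHadoop {
      def context = cluster

      "a word count job runs on the cluster" >> { success }
    }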
@@ -0,0 +1,30 @@
+package com.nicta.scoobi.testing
+
+import org.specs2.execute.Success
+import org.specs2.specification._
+import ExtendedFragments._
+
+/**
+ * This trait can be mixed into a Specification to automatically add a setup step that uploads the library jars to the cluster
+ */
+trait UploadedLibJars extends LibJars { this: SpecificationStructure with Any { def context: Around } =>
+
+ /**
+ * add a first step to upload the library jars before doing anything else
+ */
+ override def map(fs: =>Fragments) = fs.insert(uploadStep)
+
+ /** create a Step to upload the jars on the cluster */
+ private def uploadStep = Step(context { uploadLibJars; Success() })
+
+}
+
+object ExtendedFragments {
+ /**
+ * this utility method won't be necessary once specs2 1.10 is released
+ */
+ implicit def extend(fs: =>Fragments): ExtendFragments = new ExtendFragments(fs)
+ class ExtendFragments(fs: =>Fragments) {
+ def insert(f: Fragment) = Fragments.create(fs.start +: f +: fs.middle :+ fs.end:_*)
+ }
+}
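The insert method places a fragment right after the start of the existing fragments; mirroring the usage above, a sketch of prepending an arbitrary setup step in a SpecificationStructure:

    // prepend a setup step to all the fragments of the specification
    override def map(fs: =>Fragments) = fs.insert(Step(println("setup")))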
@@ -1,48 +1,44 @@
package com.nicta.scoobi
package testing
-import java.net.{URLClassLoader, URL}
import com.nicta.scoobi.Scoobi._
import org.apache.hadoop.util.GenericOptionsParser
import impl.Configurations
import Configurations._
-import org.apache.hadoop.fs.{FileSystem, Path}
-import java.io.File
-import io.FileSystems
-trait WithHadoop extends WithLocalHadoop with LibJars {
+/**
+ * This trait provides methods to execute map-reduce code, either locally or on the cluster.
+ *
+ * @see WithLocalHadoop
+ *
+ * To use this trait, you need to define:
+ *
+ * - the file system address: def fs = "hdfs://svm-hadoop1.ssrg.nicta.com.au"
+ * - the job tracker address: def jobTracker = "svm-hadoop1.ssrg.nicta.com.au:8021"
+ */
- def fs: String
- def jobTracker: String
+trait WithHadoop extends WithLocalHadoop with Cluster with LibJars {
- def classDirs: Seq[String] = Seq("classes", "test-classes").map("target/scala-2.9.1/"+_)
+ /** @return true if you want to include the library jars in the jar that is sent to the cluster for each job */
+ def includeLibJars = false
+
+ /** @return the classes directories to include on a job classpath */
+ def classDirs: Seq[String] = Seq("classes", "test-classes").map("target/scala-"+util.Properties.releaseVersion.getOrElse("2.9.1")+"/"+_)
+ /** execute some code on the cluster, possibly showing the execution time */
def onCluster[T](t: =>T) = showTime(executeOnCluster(t))(displayTime("Cluster execution time"))
+ /**
+ * execute some code on the cluster, setting the filesystem / jobtracker addresses and setting up the classpath
+ */
def executeOnCluster[T](t: =>T) = {
withHadoopArgs(hadoopArgs) { args => }
Conf.conf.set("fs.default.name", fs)
Conf.conf.set("mapred.job.tracker", jobTracker)
-
if (includeLibJars)
new GenericOptionsParser(Conf.conf, Array("-libjars", jars.map(_.getFile).mkString(",")))
classDirs foreach Conf.setUserDir
t
}
}
-
-trait LibJars {
-
- def includeLibJars = false
-
- def jars: Seq[URL] = Thread.currentThread.getContextClassLoader.asInstanceOf[URLClassLoader].getURLs.filter { url =>
- Seq(".ivy2", ".m2").exists(url.getFile.contains) || url.getFile.contains("scala-library")
- }
-
- def uploadLibJars = {
- FileSystems.uploadNewJars(jars.map(url => new File(url.getFile)), "libjars")
- Conf.conf.addValues("mapred.classpath", jars.map(j => "libjars/"+(new java.io.File(j.getFile).getName)):_*)
- }
-}
-
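Outside of a specification, the trait can also be used directly; a hedged sketch, reusing the hypothetical MyCluster trait from above:

    object RunOnCluster extends WithHadoop with MyCluster {
      def main(args: Array[String]) {
        // sets fs.default.name / mapred.job.tracker, configures the classpath,
        // then runs the block, possibly showing the execution time
        onCluster {
          println("running on the cluster")
        }
      }
    }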
@@ -5,13 +5,20 @@ import java.lang.Class
import org.apache.commons.logging.impl.{SimpleLog, NoOpLog, LogFactoryImpl}
import WithHadoopLogFactory._
+/**
+ * Log factory used for testing.
+ *
+ * It doesn't display any log message by default, unless the QUIET attribute is set to false.
+ *
+ * It can display SCOOBI_TIMES messages if the SHOW_TIMES attribute is true
+ */
class WithHadoopLogFactory() extends LogFactory {
def quiet = Option(getAttribute(QUIET)).map(_.asInstanceOf[Boolean]).getOrElse(true)
def showTimes = Option(getAttribute(SHOW_TIMES)).map(_.asInstanceOf[Boolean]).getOrElse(false)
- private val impl = new LogFactoryImpl
- private val noOps = new NoOpLog
+ private val impl = new LogFactoryImpl
+ private val noOps = new NoOpLog
private val simple = new SimpleLog("TESTING")
def getAttribute(name: String) = impl.getAttribute(name)
@@ -22,8 +29,8 @@ class WithHadoopLogFactory() extends LogFactory {
def getInstance(name: String) =
if (name == SCOOBI_TIMES) simple
- else if (quiet) noOps
- else impl.getInstance(name)
+ else if (quiet) noOps
+ else impl.getInstance(name)
def getInstance(klass: Class[_]) =
if (quiet) noOps
@@ -4,32 +4,49 @@ import org.apache.commons.logging.LogFactory
import org.specs2.time.SimpleTimer
import WithHadoopLogFactory._
+/**
+ * Execute Hadoop code locally
+ */
trait WithLocalHadoop {
def hadoopArgs = Array[String]()
+ /** @return true to suppress log messages */
def quiet = true
+ /** @return true to display execution times for each job */
def showTimes = false
+ /**
+ * Static setup to use a testing log factory
+ */
System.setProperty("org.apache.commons.logging.LogFactory", classOf[WithHadoopLogFactory].getName)
LogFactory.getFactory.setAttribute(QUIET, quiet)
LogFactory.getFactory.setAttribute(SHOW_TIMES, showTimes)
+ /** execute some code locally, possibly showing execution times */
def onLocal[T](t: =>T) = showTime(executeOnLocal(t))(displayTime("Local execution time"))
+ /** execute some code locally */
def executeOnLocal[T](t: =>T) = t
+ /** @return a function to display execution times. The default uses log messages */
def displayTime(prefix: String) = (timer: SimpleTimer) => {
LogFactory.getFactory.getInstance(SCOOBI_TIMES).info(prefix+": "+timer.time)
()
}
+ /**
+ * @return the time for the execution of a piece of code
+ */
def withTimer[T](t: =>T): (T, SimpleTimer) = {
val timer = (new SimpleTimer).start
val result = t
(result, timer.stop)
}
+ /**
+ * measure the time taken by some executed code and display the time with a specific display function
+ */
def showTime[T](t: =>T)(display: SimpleTimer => Unit): T = {
val (result, timer) = withTimer(t)
display(timer)
result
}
}
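The timing helpers compose as follows; a minimal sketch (TimingExample and the display function are hypothetical):

    object TimingExample extends WithLocalHadoop {
      // run a block, measure it, and hand the timer to a display function
      val total = showTime((1 to 1000).sum) { timer => println("sum took "+timer.time) }
    }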
