add new test cases and code cleanup

commit ef2d43fea5089e5107f51c3453eb31674ad55b1f 1 parent ac63136
@CodingCat authored
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (22 changed lines)
@@ -30,7 +30,7 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -606,7 +606,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val outpath = new Path(path)
NewFileOutputFormat.setOutputPath(job, outpath)
val jobFormat = outputFormatClass.newInstance
- jobFormat.checkOutputSpecs(new JobContext(wrappedConf.value, job.getJobID))
+ jobFormat.checkOutputSpecs(job)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
@@ -697,10 +697,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* MapReduce job.
*/
def saveAsHadoopDataset(conf: JobConf) {
- val outputFormatClass = conf.getOutputFormat
+ val outputFormatInstance = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
val valueClass = conf.getOutputValueClass
- if (outputFormatClass == null) {
+ if (outputFormatInstance == null) {
throw new SparkException("Output format class not set")
}
if (keyClass == null) {
@@ -713,16 +713,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
- if (outputFormatClass.isInstanceOf[FileOutputFormat[_, _]]) {
- val outputPath = conf.get("mapred.output.dir")
- if (outputPath == null) {
- throw new SparkException("mapred.output.dir not set")
- }
- val path = new Path(outputPath)
- val fs = path.getFileSystem(conf)
- conf.getOutputFormat.checkOutputSpecs(fs, conf)
+ if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+ // FileOutputFormat ignores the filesystem parameter
+ val ignoredFs = FileSystem.get(conf)
+ conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
}
-
+
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
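
For context on the two changed calls above, here is a minimal standalone sketch (not part of the commit; paths and object name are hypothetical) of the Hadoop output-spec checks being relied on. The new-API checkOutputSpecs(JobContext) reads the output path from the job itself, while the old-API checkOutputSpecs(FileSystem, JobConf) ignores its FileSystem argument and resolves the filesystem from the output path in the JobConf, which is why the patch can simply pass FileSystem.get(conf).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileOutputFormat => OldFileOutputFormat, JobConf, TextOutputFormat => OldTextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, TextOutputFormat => NewTextOutputFormat}

object OutputSpecCheckSketch {
  def main(args: Array[String]) {
    // New Hadoop API: the Job (a JobContext) already carries the output path,
    // so no FileSystem argument is needed. Throws FileAlreadyExistsException
    // when the directory exists -- the check the new-API save path now uses.
    val job = new Job(new Configuration())
    NewFileOutputFormat.setOutputPath(job, new Path("/tmp/new-api-output"))  // hypothetical path
    new NewTextOutputFormat[String, String]().checkOutputSpecs(job)

    // Old Hadoop API: the FileSystem parameter is ignored; Hadoop resolves the
    // filesystem from the output path set in the JobConf, so FileSystem.get(conf)
    // works as a placeholder argument.
    val conf = new JobConf()
    OldFileOutputFormat.setOutputPath(conf, new Path("/tmp/old-api-output"))  // hypothetical path
    new OldTextOutputFormat[String, String]().checkOutputSpecs(FileSystem.get(conf), conf)
  }
}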
core/src/test/scala/org/apache/spark/FileSuite.scala (30 changed lines)
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class FileSuite extends FunSuite with LocalSparkContext {
@@ -210,24 +211,43 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(rdd.count() === 3)
}
- test ("prevent user from overwriting the empty directory") {
+ test ("prevent user from overwriting the empty directory (old Hadoop API)") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
- var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+ val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsTextFile(tempdir.getPath)
}
}
- test ("prevent user from overwriting the non-empty directory") {
+ test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
- var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+ val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
randomRDD.saveAsTextFile(tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
- randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsTextFile(tempdir.getPath + "/output")
}
}
+
+ test ("prevent user from overwriting the empty directory (new Hadoop API)") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ intercept[FileAlreadyExistsException] {
+ randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ }
+ }
+
+ test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+ assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+ intercept[FileAlreadyExistsException] {
+ randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ }
+ }
}
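
A usage-level sketch of the behaviour these tests pin down (the object name, app name, and output location are hypothetical, not from the commit): a second write into an existing output directory is now rejected up front with FileAlreadyExistsException, for the new Hadoop API as well as the old one, instead of failing mid-job.

import com.google.common.io.Files
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object OverwriteGuardSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "overwrite-guard-sketch")  // hypothetical app name
    val outputDir = Files.createTempDir().getPath + "/output"     // hypothetical location
    val rdd = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)

    rdd.saveAsTextFile(outputDir)                                 // first write succeeds
    try {
      // A second write into the same directory is rejected before any task runs.
      rdd.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](outputDir)
    } catch {
      case e: FileAlreadyExistsException =>
        println("output directory already exists: " + e.getMessage)
    }
    sc.stop()
  }
}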