Commit 801fbe0

Merge branch 'master' of github.com:apache/spark into concurrent-sql-executions

Andrew Or committed Sep 13, 2015
2 parents: 3c00cc6 + b3a7480

Showing 47 changed files with 1,394 additions and 847 deletions.

2 changes: 1 addition & 1 deletion   R/pkg/inst/tests/test_sparkSQL.R

@@ -1147,7 +1147,7 @@ test_that("describe() and summarize() on a DataFrame", {
   stats <- describe(df, "age")
   expect_equal(collect(stats)[1, "summary"], "count")
   expect_equal(collect(stats)[2, "age"], "24.5")
-  expect_equal(collect(stats)[3, "age"], "5.5")
+  expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
   stats <- describe(df)
   expect_equal(collect(stats)[4, "name"], "Andy")
   expect_equal(collect(stats)[5, "age"], "30")
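
The new expected value is consistent with describe() reporting the sample standard deviation rather than the population one. A minimal sketch in plain Scala (outside Spark), assuming the test data's two non-null ages are 19 and 30, which matches the mean of 24.5 and the max of 30 asserted in this test:

  object StddevSketch {
    def main(args: Array[String]): Unit = {
      val ages = Seq(19.0, 30.0)
      val mean = ages.sum / ages.size                        // 24.5, as asserted above
      val sumSq = ages.map(a => math.pow(a - mean, 2)).sum   // 60.5
      val populationStddev = math.sqrt(sumSq / ages.size)    // 5.5 -> the old expected string
      val sampleStddev = math.sqrt(sumSq / (ages.size - 1))  // 7.7781745930520225 -> the new one
      println(s"population = $populationStddev, sample = $sampleStddev")
    }
  }
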

6 changes: 3 additions & 3 deletions   core/src/main/scala/org/apache/spark/SparkContext.scala

@@ -872,7 +872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = job.getConfiguration
+    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new WholeTextFileRDD(
       this,
       classOf[WholeTextFileInputFormat],
@@ -924,7 +924,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = job.getConfiguration
+    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new BinaryFileRDD(
       this,
       classOf[StreamInputFormat],
@@ -1106,7 +1106,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updatedConf = job.getConfiguration
+    val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }

@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
    * while it's interface in Hadoop 2.+.
    */
   def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    // scalastyle:off jobconfig
     val method = context.getClass.getMethod("getConfiguration")
+    // scalastyle:on jobconfig
     method.invoke(context).asInstanceOf[Configuration]
   }

@@ -204,7 +206,9 @@
    */
   def getTaskAttemptIDFromTaskAttemptContext(
       context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    // scalastyle:off jobconfig
     val method = context.getClass.getMethod("getTaskAttemptID")
+    // scalastyle:on jobconfig
     method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
   }
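
The reflection above (with the "jobconfig" scalastyle rule switched off locally) exists because, as the surrounding scaladoc notes, JobContext is a class in Hadoop 1.x but an interface in Hadoop 2.x, so a direct getConfiguration call compiles to different bytecode for the two versions and can fail with a linkage error when run against the other one. A standalone sketch of the same pattern, with an illustrative object name that is not part of Spark's API:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.JobContext

  object HadoopVersionShim {
    // Resolve the method at runtime so the call site is bound to neither
    // Hadoop 1.x's class method nor Hadoop 2.x's interface method at compile time.
    def configurationOf(context: JobContext): Configuration = {
      val method = context.getClass.getMethod("getConfiguration")
      method.invoke(context).asInstanceOf[Configuration]
    }
  }

The call sites changed elsewhere in this commit (SparkContext, PairRDDFunctions, SqlNewHadoopRDD) now go through this single reflective entry point in SparkHadoopUtil instead of calling job.getConfiguration directly.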

@@ -148,7 +148,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   try {
     Snappy.getNativeLibraryVersion
   } catch {
-    case e: Error => throw new IllegalArgumentException
+    case e: Error => throw new IllegalArgumentException(e)
   }

   override def compressedOutputStream(s: OutputStream): OutputStream = {
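
Passing the caught Error to the constructor keeps it as the exception's cause, so logs show why native Snappy failed to load instead of a bare IllegalArgumentException. A small illustrative sketch (the helper name is hypothetical, not part of Spark):

  def checkNativeSnappy(loadNative: () => Unit): Unit = {
    try {
      loadNative()
    } catch {
      // The wrapped Error stays reachable via getCause on the thrown exception.
      case e: Error => throw new IllegalArgumentException(e)
    }
  }
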

@@ -996,8 +996,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
-    job.getConfiguration.set("mapred.output.dir", path)
-    saveAsNewAPIHadoopDataset(job.getConfiguration)
+    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    jobConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(jobConfiguration)
   }

   /**
@@ -1064,7 +1065,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
-    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
+    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val wrappedConf = new SerializableConfiguration(jobConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
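
Both hunks obtain the Job's Configuration through SparkHadoopUtil before mutating it or wrapping it for shipping to executors. The wrapping relies on Configuration being a Hadoop Writable rather than java.io.Serializable; a minimal sketch of that idea, as an illustrative class rather than Spark's actual SerializableConfiguration:

  import java.io.{ObjectInputStream, ObjectOutputStream}
  import org.apache.hadoop.conf.Configuration

  class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      value.write(out)            // Configuration is a Writable: dump its key/value pairs
    }
    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      value = new Configuration(false)
      value.readFields(in)        // rebuild the Configuration on the deserializing side
    }
  }
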

@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     if (isDriverSide) {
       initDriverSideJobFuncOpt.map(f => f(job))
     }
-    job.getConfiguration
+    SparkHadoopUtil.get.getConfigurationFromJobContext(job)
   }

   private val jobTrackerId: String = {

@@ -164,7 +164,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon

   private def doStop(): Unit = {
     // Only perform cleanup if an external service is not serving our shuffle files.
-    if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
+    // Also blockManagerId could be null if block manager is not initialized properly.
+    if (!blockManager.externalShuffleServiceEnabled ||
+        (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {