Restore old method for backwards compat.
Marcelo Vanzin committed Aug 26, 2014
1 parent fc45067 commit 3d345cb
Showing 3 changed files with 27 additions and 18 deletions.
37 changes: 23 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -64,28 +64,37 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
+  @Deprecated
+  def newConfiguration(): Configuration = newConfiguration(null)
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
    */
   def newConfiguration(conf: SparkConf): Configuration = {
     val hadoopConf = new Configuration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+
+    // Note: this null check is around more than just access to the "conf" object to maintain
+    // the behavior of the old implementation of this code, for backwards compatibility.
+    if (conf != null) {
+      // Explicitly check for S3 environment variables
+      if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+          System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+        hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+        hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      }
+      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+      conf.getAll.foreach { case (key, value) =>
+        if (key.startsWith("spark.hadoop.")) {
+          hadoopConf.set(key.substring("spark.hadoop.".length), value)
+        }
       }
+      val bufferSize = conf.get("spark.buffer.size", "65536")
+      hadoopConf.set("io.file.buffer.size", bufferSize)
     }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
+
     hadoopConf
   }

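For readers following the compatibility argument, here is a minimal sketch of how the two overloads behave after this change, based only on the diff above. It assumes a Spark 1.x application with spark-core on the classpath and that SparkHadoopUtil remains callable from user code (which is what restoring the old method implies); the object name NewConfigurationSketch, the queue-name key, and the buffer value are illustrative, not part of this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

object NewConfigurationSketch {
  def main(args: Array[String]): Unit = {
    // Old call site: the restored no-arg overload delegates to newConfiguration(null),
    // so the null check skips the S3 environment-variable keys and all SparkConf
    // handling, returning a plain Hadoop Configuration as the old implementation did.
    val plain: Configuration = SparkHadoopUtil.get.newConfiguration()

    // New call site: every "spark.hadoop.foo" entry on the SparkConf is copied into
    // the Hadoop conf as "foo", and spark.buffer.size becomes io.file.buffer.size.
    val sparkConf = new SparkConf()
      .set("spark.hadoop.mapreduce.job.queuename", "etl") // illustrative key
      .set("spark.buffer.size", "131072")
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

    println(hadoopConf.get("mapreduce.job.queuename")) // etl
    println(hadoopConf.get("io.file.buffer.size"))     // 131072
    println(plain.get("io.file.buffer.size"))          // Hadoop's default; no SparkConf applied
  }
}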
4 changes: 2 additions & 2 deletions examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
 import scala.math.exp
 
 import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo


@@ -70,7 +70,7 @@ object SparkHdfsLR {
 
     val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
     val inputPath = args(0)
-    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val conf = new Configuration()
     val sc = new SparkContext(sparkConf,
       InputFormatInfo.computePreferredLocations(
         Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
4 changes: 2 additions & 2 deletions examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
 import scala.math.exp
 
 import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.storage.StorageLevel

@@ -53,7 +53,7 @@ object SparkTachyonHdfsLR {
   def main(args: Array[String]) {
     val inputPath = args(0)
     val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
-    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val conf = new Configuration()
     val sc = new SparkContext(sparkConf,
       InputFormatInfo.computePreferredLocations(
         Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
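In the two examples, the net effect is that the Hadoop Configuration handed to InputFormatInfo is once again a bare new Configuration(), which does not pick up spark.hadoop.* settings from the SparkConf; it is only used here to compute preferred locations for the input splits. Assembled from the fragments shown in these hunks, a self-contained version of that pattern looks roughly like the sketch below (the object and app names are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.InputFormatInfo

object PreferredLocationsSketch {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("PreferredLocationsSketch")
    val inputPath = args(0)

    // A plain Hadoop Configuration is enough for the input format to report where
    // the blocks of inputPath live; those locations seed the SparkContext so
    // executors can be requested close to the data.
    val conf = new Configuration()
    val sc = new SparkContext(sparkConf,
      InputFormatInfo.computePreferredLocations(
        Seq(new InputFormatInfo(conf, classOf[TextInputFormat], inputPath))))

    println(sc.textFile(inputPath).count())
    sc.stop()
  }
}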
