Skip to content

Commit

Permalink
Merge branch 'master' into SPARK-8372
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Jun 30, 2015
2 parents 7b91b74 + a48e619 commit 112ae8f
Show file tree
Hide file tree
Showing 113 changed files with 2,739 additions and 1,344 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
(MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-core:1.8.5 - http://www.mockito.org)
(The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ setMethod("isLocal",
#'}
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
s <- callJMethod(x@sdf, "showString", numToInt(numRows))
function(x, numRows = 20, truncate = TRUE) {
s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate)
cat(s)
})

Expand Down
10 changes: 0 additions & 10 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down
43 changes: 38 additions & 5 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark

import java.io.File
import java.io.{File, FileInputStream}
import java.security.{KeyStore, NoSuchAlgorithmException}
import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory}

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory
Expand All @@ -38,7 +40,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms to use
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
private[spark] case class SSLOptions(
enabled: Boolean = false,
Expand All @@ -48,7 +50,8 @@ private[spark] case class SSLOptions(
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty) {
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {

/**
* Creates a Jetty SSL context factory according to the SSL settings represented by this object.
Expand All @@ -63,7 +66,7 @@ private[spark] case class SSLOptions(
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)

Some(sslContextFactory)
} else {
Expand Down Expand Up @@ -94,14 +97,44 @@ private[spark] case class SSLOptions(
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
None
}
}

/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
*/
private val supportedAlgorithms: Set[String] = {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
/* The set of supported algorithms does not depend upon the keys, trust, or
rng, although they will influence which algorithms are eventually used. */
context.init(null, null, null)
} catch {
case npe: NullPointerException =>
logDebug("No SSL protocol specified")
context = SSLContext.getDefault
case nsa: NoSuchAlgorithmException =>
logDebug(s"No support for requested SSL protocol ${protocol.get}")
context = SSLContext.getDefault
}

val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet

// Log which algorithms we are discarding
(enabledAlgorithms &~ providerAlgorithms).foreach { cipher =>
logDebug(s"Discarding unsupported cipher $cipher")
}

enabledAlgorithms & providerAlgorithms
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_dagScheduler = ds
}

/**
* A unique identifier for the Spark application.
* Its format depends on the scheduler implementation.
* (i.e.
* in case of local spark app something like 'local-1433865536131'
* in case of YARN something like 'application_1433865536131_34483'
* )
*/
def applicationId: String = _applicationId
def applicationAttemptId: Option[String] = _applicationAttemptId

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/r/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ private[r] object RRDD {
}

private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
val rCommand = "Rscript"
val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
val rOptions = "--vanilla"
val rExecScript = rLibDir + "/SparkR/worker/" + script
val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
Expand Down
37 changes: 23 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,20 @@ private[spark] object SparkSubmitUtils {
val cr = new ChainResolver
cr.setName("list")

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
}
}

val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toURI.toString)
Expand Down Expand Up @@ -786,20 +800,6 @@ private[spark] object SparkSubmitUtils {
sp.setRoot("http://dl.bintray.com/spark-packages/maven")
sp.setName("spark-packages")
cr.add(sp)

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
}
}
cr
}

Expand Down Expand Up @@ -922,6 +922,15 @@ private[spark] object SparkSubmitUtils {

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
// clear ivy resolution from previous launches. The resolution file is usually at
// ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
// leads to confusion with Ivy when the files can no longer be found at the repository
// declared in that file/
val mdId = md.getModuleRevisionId
val previousResolution = new File(ivySettings.getDefaultCache,
s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml")
if (previousResolution.exists) previousResolution.delete

md.setDefaultConf(ivyConfName)

// Add exclusion rules for Spark and Scala Library
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
}
}
}

/**
* An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
* in a circular buffer. The current contents of the buffer can be accessed using
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.length
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
20 changes: 14 additions & 6 deletions core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark

import java.io.File
import javax.net.ssl.SSLContext

import com.google.common.io.Files
import org.apache.spark.util.Utils
Expand All @@ -29,16 +30,24 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

// Pick two cipher suites that the provider knows about
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(null, null, null)
val algorithms = sslContext
.getServerSocketFactory
.getDefaultCipherSuites
.take(2)
.toSet

val conf = new SparkConf
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")
conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
conf.set("spark.ssl.protocol", "TLSv1.2")

val opts = SSLOptions.parse(conf, "spark.ssl")

Expand All @@ -52,9 +61,8 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(opts.trustStorePassword === Some("password"))
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms ===
Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
assert(opts.protocol === Some("TLSv1.2"))
assert(opts.enabledAlgorithms === algorithms)
}

test("test resolving property with defaults specified ") {
Expand Down
24 changes: 18 additions & 6 deletions core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ object SSLSampleConfigs {
this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

val enabledAlgorithms =
// A reasonable set of TLSv1.2 Oracle security provider suites
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384, " +
"TLS_RSA_WITH_AES_256_CBC_SHA256, " +
"TLS_DHE_RSA_WITH_AES_256_CBC_SHA256, " +
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256, " +
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA256, " +
// and their equivalent names in the IBM Security provider
"SSL_ECDHE_RSA_WITH_AES_256_CBC_SHA384, " +
"SSL_RSA_WITH_AES_256_CBC_SHA256, " +
"SSL_DHE_RSA_WITH_AES_256_CBC_SHA256, " +
"SSL_ECDHE_RSA_WITH_AES_128_CBC_SHA256, " +
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256"

def sparkSSLConfig(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
Expand All @@ -33,9 +47,8 @@ object SSLSampleConfigs {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms",
"SSL_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_DES_CBC_SHA")
conf.set("spark.ssl.protocol", "TLSv1")
conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
conf.set("spark.ssl.protocol", "TLSv1.2")
conf
}

Expand All @@ -47,9 +60,8 @@ object SSLSampleConfigs {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms",
"SSL_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_DES_CBC_SHA")
conf.set("spark.ssl.protocol", "TLSv1")
conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
conf.set("spark.ssl.protocol", "TLSv1.2")
conf
}

Expand Down
21 changes: 15 additions & 6 deletions core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ class SecurityManagerSuite extends SparkFunSuite {

test("ssl on setup") {
val conf = SSLSampleConfigs.sparkSSLConfig()
val expectedAlgorithms = Set(
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
"TLS_RSA_WITH_AES_256_CBC_SHA256",
"TLS_DHE_RSA_WITH_AES_256_CBC_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA256",
"SSL_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
"SSL_RSA_WITH_AES_256_CBC_SHA256",
"SSL_DHE_RSA_WITH_AES_256_CBC_SHA256",
"SSL_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")

val securityManager = new SecurityManager(conf)

Expand All @@ -143,9 +154,8 @@ class SecurityManagerSuite extends SparkFunSuite {
assert(securityManager.fileServerSSLOptions.trustStorePassword === Some("password"))
assert(securityManager.fileServerSSLOptions.keyStorePassword === Some("password"))
assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms ===
Set("SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)

assert(securityManager.akkaSSLOptions.trustStore.isDefined === true)
assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore")
Expand All @@ -154,9 +164,8 @@ class SecurityManagerSuite extends SparkFunSuite {
assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password"))
assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password"))
assert(securityManager.akkaSSLOptions.keyPassword === Some("password"))
assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1"))
assert(securityManager.akkaSSLOptions.enabledAlgorithms ===
Set("SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_DES_CBC_SHA"))
assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
}

test("ssl off setup") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ object JarCreationTest extends Logging {
if (result.nonEmpty) {
throw new Exception("Could not load user class from jar:\n" + result(0))
}
sc.stop()
}
}

Expand All @@ -573,6 +574,7 @@ object SimpleApplicationTest {
s"Master had $config=$masterValue but executor had $config=$executorValue")
}
}
sc.stop()
}
}

Expand Down
Loading

0 comments on commit 112ae8f

Please sign in to comment.