[SPARK-19810][BUILD][CORE] Remove support for Scala 2.10 #17150

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions R/pkg/R/sparkR.R
@@ -113,7 +113,7 @@ sparkR.stop <- function() {
#' list(spark.executor.memory="4g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.10:2.0.1"))
#' c("com.databricks:spark-avro_2.11:2.0.1"))
#'}
#' @note sparkR.init since 1.4.0
sparkR.init <- function(
@@ -357,7 +357,7 @@ sparkRHive.init <- function(jsc = NULL) {
#' sparkR.session("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.10:2.0.1"))
#' c("com.databricks:spark-avro_2.11:2.0.1"))
Member Author: These changes to dummy artifacts used in examples aren't that important, but I figured it'd be tidy.

#' sparkR.session(spark.master = "yarn-client", spark.executor.memory = "4g")
#'}
#' @note sparkR.session since 2.0.0
4 changes: 2 additions & 2 deletions R/pkg/tests/fulltests/test_client.R
@@ -37,7 +37,7 @@ test_that("multiple packages don't produce a warning", {

test_that("sparkJars sparkPackages as character vectors", {
args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.10:2.0.1"))
c("com.databricks:spark-avro_2.11:2.0.1"))
expect_match(args, "--jars one.jar,two.jar,three.jar")
expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1")
expect_match(args, "--packages com.databricks:spark-avro_2.11:2.0.1")
})
22 changes: 11 additions & 11 deletions bin/load-spark-env.cmd
@@ -35,21 +35,21 @@ if [%SPARK_ENV_LOADED%] == [] (

rem Setting SPARK_SCALA_VERSION if not already set.

set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.10"
rem set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
rem set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"
Member Author: I commented out bits like this, but also updated them for when 2.12 needs to be supported as an alternative.

if [%SPARK_SCALA_VERSION%] == [] (

if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
echo "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected."
echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
exit 1
)
if exist %ASSEMBLY_DIR2% (
rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
rem echo "Presence of build for multiple Scala versions detected."
rem echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
rem exit 1
rem )
rem if exist %ASSEMBLY_DIR2% (
set SPARK_SCALA_VERSION=2.11
) else (
set SPARK_SCALA_VERSION=2.10
)
rem ) else (
rem set SPARK_SCALA_VERSION=2.12
rem )
)
exit /b 0

22 changes: 11 additions & 11 deletions bin/load-spark-env.sh
@@ -46,18 +46,18 @@ fi

if [ -z "$SPARK_SCALA_VERSION" ]; then

ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"
#ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
#ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"

if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
exit 1
fi
#if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
# echo -e "Presence of build for multiple Scala versions detected." 1>&2
# echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
# exit 1
#fi

if [ -d "$ASSEMBLY_DIR2" ]; then
#if [ -d "$ASSEMBLY_DIR2" ]; then
export SPARK_SCALA_VERSION="2.11"
else
export SPARK_SCALA_VERSION="2.10"
fi
#else
# export SPARK_SCALA_VERSION="2.12"
#fi
fi
6 changes: 3 additions & 3 deletions build/mvn
@@ -91,13 +91,13 @@ install_mvn() {

# Install zinc under the build/ folder
install_zinc() {
local zinc_path="zinc-0.3.11/bin/zinc"
local zinc_path="zinc-0.3.15/bin/zinc"
Member Author: I went ahead and updated zinc along the way here.

[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}

install_app \
"${TYPESAFE_MIRROR}/zinc/0.3.11" \
"zinc-0.3.11.tgz" \
"${TYPESAFE_MIRROR}/zinc/0.3.15" \
"zinc-0.3.15.tgz" \
"${zinc_path}"
ZINC_BIN="${_DIR}/${zinc_path}"
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -201,7 +201,8 @@ trait AccumulableParam[R, T] extends Serializable {

@deprecated("use AccumulatorV2", "2.0.0")
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
GrowableAccumulableParam[R : ClassTag, T]
(implicit rg: R => Growable[T] with TraversableOnce[T] with Serializable)
extends AccumulableParam[R, T] {

def addAccumulator(growable: R, elem: T): R = {
15 changes: 4 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -183,8 +183,6 @@ class SparkContext(config: SparkConf) extends Logging {
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")

warnDeprecatedVersions()

/* ------------------------------------------------------------------------------------- *
| Private variables. These variables keep the internal state of the context, and are |
| not accessible by the outside world. They're mutable since we want to initialize all |
@@ -349,13 +347,6 @@
value
}

private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
}

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
@@ -1396,6 +1387,8 @@ class SparkContext(config: SparkConf) extends Logging {
@deprecated("use AccumulatorV2", "2.0.0")
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
// TODO the view bound (<%) above should be replaced with a simple type bound and implicit
// conversion, but that is a breaking change. This should be fixed in Spark 3.x.
Contributor: So view bounds are not recommended in Scala?

Member Author: Yeah, it generates a warning in Scala 2.11.

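To make the trade-off concrete: a view bound `R <% X` is just sugar for an implicit conversion parameter `R => X`, so it can be replaced by a plain type parameter plus an explicit implicit argument, which is the shape GrowableAccumulableParam takes in this PR. A minimal, self-contained sketch of the two forms (the method names here are made up for illustration):

```scala
import scala.reflect.ClassTag

object ViewBoundSketch extends App {
  // Deprecated view-bound form: "R must be viewable as a CharSequence".
  // scalac reports `<%` as deprecated (e.g. under -Xfuture in 2.11/2.12).
  def describeOld[R <% CharSequence: ClassTag](r: R): String =
    s"${implicitly[ClassTag[R]]} with ${r.length} chars"

  // Equivalent desugared form: a plain type parameter plus an implicit conversion,
  // the same rewrite applied to GrowableAccumulableParam in this diff.
  def describeNew[R: ClassTag](r: R)(implicit view: R => CharSequence): String =
    s"${implicitly[ClassTag[R]]} with ${view(r).length} chars"

  println(describeOld("hello")) // java.lang.String with 5 chars
  println(describeNew("hello")) // same result, without the view-bound syntax
}
```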
val param = new GrowableAccumulableParam[R, T]
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -2605,9 +2598,9 @@ object SparkContext extends Logging {
*/
private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Traversable[T])
: ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
def anyToWritable[U <: Writable](u: U): Writable = u

new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
@@ -16,7 +16,7 @@
*/
package org.apache.spark.rdd

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.compress.CompressionCodec
@@ -39,40 +39,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
extends Logging
with Serializable {

private val keyWritableClass =
if (_keyWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[K]()
} else {
_keyWritableClass
}

private val valueWritableClass =
if (_valueWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[V]()
} else {
_valueWritableClass
}

private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
classTag[T].runtimeClass
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
// is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType

}
// TODO: use something like WritableConverter to avoid reflection
}
c.asInstanceOf[Class[_ <: Writable]]
}

// TODO the view bound (<%) above should be replaced with a simple type bound and implicit
// conversion, but that is a breaking change. This should be fixed in Spark 3.x.

/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
@@ -90,24 +58,24 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
// valueWritableClass at the compile time. To implement that, we need to add type parameters to
// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
// breaking change.
val convertKey = self.keyClass != keyWritableClass
val convertValue = self.valueClass != valueWritableClass
val convertKey = self.keyClass != _keyWritableClass
val convertValue = self.valueClass != _valueWritableClass

logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
valueWritableClass.getSimpleName + ")" )
logInfo("Saving as sequence file of type " +
s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -125,9 +125,9 @@ private[spark] object RpcTimeout {
var foundProp: Option[(String, String)] = None
while (itr.hasNext && foundProp.isEmpty) {
val propKey = itr.next()
conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
conf.getOption(propKey).foreach { prop => foundProp = Some((propKey, prop)) }
Member Author: Lots of the changes are of this form. While scalac will still accept two arguments to a method that takes a single Tuple2 parameter, that adaptation is a warning in Scala 2.11 in some cases. I also enabled all warnings of this form.

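A minimal sketch of the adaptation in question, with an illustrative key/value pair; `-Ywarn-adapted-args` is the scalac option that reports this kind of argument adaptation, though the exact warning flags this PR enables live in the build files, which aren't shown here:

```scala
object AdaptedArgsSketch extends App {
  // Auto-tupling: Option.apply takes a single parameter, so scalac silently wraps
  // the two arguments into a Tuple2. Scala 2.11 can flag this adaptation as a
  // warning, which is why the diff adds the explicit parentheses.
  val adapted: Option[(String, Long)] = Some("spark.network.timeout", 120L)

  // Explicit tuple, as written after this change: no adaptation required.
  val explicit: Option[(String, Long)] = Some(("spark.network.timeout", 120L))

  println(adapted == explicit) // true: same value either way, but only one compiles without a warning
}
```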
}
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
val finalProp = foundProp.getOrElse((timeoutPropList.head, defaultValue))
val timeout = { Utils.timeStringAsSeconds(finalProp._2).seconds }
new RpcTimeout(timeout, finalProp._1)
}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -54,7 +54,7 @@ private[spark] object JettyUtils extends Logging {
// implicit conversion from many types of functions to jetty Handlers.
type Responder[T] = HttpServletRequest => T

class ServletParams[T <% AnyRef](val responder: Responder[T],
class ServletParams[T <: AnyRef](val responder: Responder[T],
val contentType: String,
val extractFn: T => String = (in: Any) => in.toString) {}

@@ -68,7 +68,7 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
new ServletParams(responder, "text/plain")

def createServlet[T <% AnyRef](
def createServlet[T <: AnyRef](
Member Author: These didn't need to be view bounds in the first place.

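A self-contained sketch of why the bound can be loosened here: `T <% AnyRef` demanded an implicit view `T => AnyRef`, but every type passed to these helpers is already a reference type, so the plain upper bound `T <: AnyRef` gives the same guarantee with no implicit machinery. `FakeRequest` is a hypothetical stand-in for the servlet request type so the sketch compiles on its own:

```scala
// Hypothetical stand-in for javax.servlet.http.HttpServletRequest, to avoid the servlet dependency.
trait FakeRequest { def uri: String }

// Mirrors the simplified signature: an upper bound instead of the old view bound.
class ServletParamsSketch[T <: AnyRef](
    val responder: FakeRequest => T,
    val contentType: String,
    val extractFn: T => String = (in: Any) => in.toString)

object ServletParamsSketch extends App {
  // Usage in the spirit of JettyUtils' textResponderToServlet helper.
  val textParams = new ServletParamsSketch[String](_.uri, "text/plain")
  println(textParams.extractFn("ok")) // prints "ok"
}
```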
servletParams: ServletParams[T],
securityMgr: SecurityManager,
conf: SparkConf): HttpServlet = {
@@ -113,7 +113,7 @@ private[spark] object JettyUtils extends Logging {
}

/** Create a context handler that responds to a request with the given path prefix */
def createServletHandler[T <% AnyRef](
def createServletHandler[T <: AnyRef](
path: String,
servletParams: ServletParams[T],
securityMgr: SecurityManager,
@@ -125,16 +125,16 @@ private[spark] object FileAppender extends Logging {
val validatedParams: Option[(Long, String)] = rollingInterval match {
case "daily" =>
logInfo(s"Rolling executor logs enabled for $file with daily rolling")
Some(24 * 60 * 60 * 1000L, "--yyyy-MM-dd")
Some((24 * 60 * 60 * 1000L, "--yyyy-MM-dd"))
case "hourly" =>
logInfo(s"Rolling executor logs enabled for $file with hourly rolling")
Some(60 * 60 * 1000L, "--yyyy-MM-dd--HH")
Some((60 * 60 * 1000L, "--yyyy-MM-dd--HH"))
case "minutely" =>
logInfo(s"Rolling executor logs enabled for $file with rolling every minute")
Some(60 * 1000L, "--yyyy-MM-dd--HH-mm")
Some((60 * 1000L, "--yyyy-MM-dd--HH-mm"))
case IntParam(seconds) =>
logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds")
Some(seconds * 1000L, "--yyyy-MM-dd--HH-mm-ss")
Some((seconds * 1000L, "--yyyy-MM-dd--HH-mm-ss"))
case _ =>
logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " +
s"rolling logs not enabled")
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -113,11 +113,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

val normalFile = new File(normalDir, "part-00000")
val normalContent = sc.sequenceFile[String, String](normalDir).collect
assert(normalContent === Array.fill(100)("abc", "abc"))
assert(normalContent === Array.fill(100)(("abc", "abc")))

val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
assert(compressedContent === Array.fill(100)("abc", "abc"))
assert(compressedContent === Array.fill(100)(("abc", "abc")))

assert(compressedFile.length < normalFile.length)
}
@@ -93,8 +93,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

test("add dependencies works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
"com.databricks:spark-avro_2.10:0.1")
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," +
"com.databricks:spark-avro_2.11:0.1")

SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
assert(md.getDependencies.length === 2)
@@ -196,7 +196,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.11", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,
@@ -80,6 +80,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
case Some(appId) =>
apps.remove(appId)
master.send(UnregisterApplication(appId))
case None =>
}
driverIdToAppId.remove(driverId)
}
@@ -575,7 +576,7 @@ class MasterSuite extends SparkFunSuite
override val rpcEnv: RpcEnv = master.rpcEnv

override def receive: PartialFunction[Any, Unit] = {
case KillExecutor(_, appId, execId) => killedExecutors.add(appId, execId)
case KillExecutor(_, appId, execId) => killedExecutors.add((appId, execId))
case KillDriver(driverId) => killedDrivers.add(driverId)
}
})
@@ -25,6 +25,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.language.postfixOps

import org.mockito.ArgumentCaptor
import org.mockito.Matchers.{any, eq => meq}
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

@@ -115,7 +115,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
if (!hasDequeuedSpeculatedTask) {
hasDequeuedSpeculatedTask = true
Some(0, TaskLocality.PROCESS_LOCAL)
Some((0, TaskLocality.PROCESS_LOCAL))
} else {
None
}
@@ -299,7 +299,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
w(k) -> (v1.size, v2.size)
(w(k), (v1.size, v2.size))
}
d4.setName("A Cogroup")
d4.collectAsMap()