@@ -495,7 +495,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// If the file is currently not being tracked by the SHS, add an entry for it and try
// to parse it. This will allow the cleaner code to detect the file as stale later on
// if it was not possible to parse it.
- listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs,
+ listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EVENT_LOGS,
None, None, reader.fileSizeForLastIndex, reader.lastIndex,
reader.completed))
reader.fileSizeForLastIndex > 0
@@ -744,7 +744,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// listing data is good.
invalidateUI(app.info.id, app.attempts.head.info.attemptId)
addListing(app)
- listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id),
+ listing.write(LogInfo(logPath.toString(), scanTime, LogType.EVENT_LOGS, Some(app.info.id),
app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
reader.lastIndex, reader.completed))

@@ -777,7 +777,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
listing.write(
- LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
+ LogInfo(logPath.toString(), scanTime, LogType.EVENT_LOGS, None, None,
reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
}
}
@@ -822,7 +822,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.reverse()
.first(maxTime)
.asScala
- .filter { l => l.logType == null || l.logType == LogType.EventLogs }
+ .filter { l => l.logType == null || l.logType == LogType.EVENT_LOGS }
.toList
stale.foreach { log =>
if (log.appId.isEmpty) {
@@ -912,7 +912,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case e: NoSuchElementException =>
// For every new driver log file discovered, create a new entry in listing
- listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None,
+ listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DRIVER_LOGS, None,
None, f.getLen(), None, false))
false
}
@@ -930,7 +930,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.reverse()
.first(maxTime)
.asScala
- .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
+ .filter { l => l.logType != null && l.logType == LogType.DRIVER_LOGS }
.toList
stale.foreach { log =>
logInfo(s"Deleting invalid driver log ${log.logPath}")
@@ -1152,7 +1152,7 @@ private[history] case class FsHistoryProviderMetadata(
logDir: String)

private[history] object LogType extends Enumeration {
- val DriverLogs, EventLogs = Value
+ val DRIVER_LOGS, EVENT_LOGS = Value
}

/**
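Note on the two cleaner filters above: the event-log path also accepts a null logType, presumably so that listing entries written before the logType field existed are still treated as event logs and stay eligible for cleanup, while the driver-log path requires an explicit DRIVER_LOGS entry.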
core/src/main/scala/org/apache/spark/executor/InputMetrics.scala (11 changes: 10 additions & 1 deletion)
@@ -30,7 +30,16 @@ import org.apache.spark.util.LongAccumulator
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
type DataReadMethod = Value
- val Memory, Disk, Hadoop, Network = Value
+ val MEMORY, DISK, HADOOP, NETWORK = Value
+
+ @deprecated("Use MEMORY instead.", "3.0.0")
+ val Memory = MEMORY
+ @deprecated("Use DISK instead.", "3.0.0")
+ val Disk = DISK
+ @deprecated("Use HADOOP instead.", "3.0.0")
+ val Hadoop = HADOOP
+ @deprecated("Use NETWORK instead.", "3.0.0")
+ val Network = NETWORK
}


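Since DataReadMethod is a @DeveloperApi enumeration, the old CamelCase names are kept as @deprecated aliases of the new UPPER_CASE values, so existing call sites keep compiling and only pick up a deprecation warning. A minimal sketch of a hypothetical call site (not part of this change) exercising the aliases:

    import org.apache.spark.executor.DataReadMethod

    val oldStyle = DataReadMethod.Memory   // still compiles; warns "Use MEMORY instead."
    val newStyle = DataReadMethod.MEMORY
    assert(oldStyle == newStyle)           // the alias refers to the same underlying Value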
@@ -29,7 +29,9 @@ import org.apache.spark.util.LongAccumulator
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
type DataWriteMethod = Value
- val Hadoop = Value
+ val HADOOP = Value
+ @deprecated("Use HADOOP instead.", "3.0.0")
+ val Hadoop = HADOOP
}


core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala (12 changes: 6 additions & 6 deletions)
@@ -28,7 +28,7 @@ import org.apache.spark.Partition
*/
private[spark] object CheckpointState extends Enumeration {
type CheckpointState = Value
- val Initialized, CheckpointingInProgress, Checkpointed = Value
+ val INITIALIZED, CHECKPOINTING_IN_PROGRESS, CHECKPOINTED = Value
}

/**
@@ -43,7 +43,7 @@ private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private
import CheckpointState._

// The checkpoint state of the associated RDD.
- protected var cpState = Initialized
+ protected var cpState = INITIALIZED

// The RDD that contains our checkpointed data
private var cpRDD: Option[CheckpointRDD[T]] = None
@@ -54,7 +54,7 @@ private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private
* Return whether the checkpoint data for this RDD is already persisted.
*/
def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
- cpState == Checkpointed
+ cpState == CHECKPOINTED
}

/**
@@ -65,8 +65,8 @@ private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the state of this RDDCheckpointData
RDDCheckpointData.synchronized {
- if (cpState == Initialized) {
- cpState = CheckpointingInProgress
+ if (cpState == INITIALIZED) {
+ cpState = CHECKPOINTING_IN_PROGRESS
} else {
return
}
@@ -77,7 +77,7 @@ private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private
// Update our state and truncate the RDD lineage
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
- cpState = Checkpointed
+ cpState = CHECKPOINTED
rdd.markCheckpointed()
}
}
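The renamed CheckpointState constants only change the names; the lifecycle they describe is unchanged. A hypothetical end-to-end illustration using the public RDD API (assuming an active SparkContext sc and a writable checkpoint directory; not part of this change):

    sc.setCheckpointDir("/tmp/checkpoints")
    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.checkpoint()              // checkpoint data is created in the INITIALIZED state
    assert(!rdd.isCheckpointed)   // nothing materialized yet
    rdd.count()                   // first action runs doCheckpoint(): INITIALIZED -> CHECKPOINTING_IN_PROGRESS -> CHECKPOINTED
    assert(rdd.isCheckpointed)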
@@ -750,7 +750,7 @@ private[spark] class BlockManager(
val ci = CompletionIterator[Any, Iterator[Any]](iter, {
releaseLock(blockId, taskContext)
})
- Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+ Some(new BlockResult(ci, DataReadMethod.MEMORY, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
val diskData = diskStore.getBytes(blockId)
val iterToReturn: Iterator[Any] = {
@@ -769,7 +769,7 @@ private[spark] class BlockManager(
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
releaseLockAndDispose(blockId, diskData, taskContext)
})
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ Some(new BlockResult(ci, DataReadMethod.DISK, info.size))
} else {
handleLocalReadFailure(blockId)
}
@@ -835,7 +835,7 @@ private[spark] class BlockManager(
getRemoteBlock(blockId, (data: ManagedBuffer) => {
val values =
serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
- new BlockResult(values, DataReadMethod.Network, data.size)
+ new BlockResult(values, DataReadMethod.NETWORK, data.size)
})
}

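The BlockManager changes above only affect which DataReadMethod constant is recorded on the returned BlockResult. A hypothetical caller-side sketch (not part of this change; assumes a blockManager and blockId in scope) of how that surfaces:

    import org.apache.spark.executor.DataReadMethod
    import org.apache.spark.storage.BlockResult

    val result: Option[BlockResult] = blockManager.get[Int](blockId)
    result.foreach { r =>
      r.readMethod match {
        case DataReadMethod.MEMORY  => println(s"served from local memory (${r.bytes} bytes)")
        case DataReadMethod.DISK    => println("served from local disk")
        case DataReadMethod.NETWORK => println("fetched from a remote block manager")
        case other                  => println(s"read via $other")
      }
    }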
@@ -1166,7 +1166,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// FileStatus.getLen is more than logInfo fileSize
var fileStatus = new FileStatus(200, false, 0, 0, 0, path)
when(mockedFs.getFileStatus(path)).thenReturn(fileStatus)
- var logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"),
+ var logInfo = new LogInfo(path.toString, 0, LogType.EVENT_LOGS, Some("appId"),
Some("attemptId"), 100, None, false)
var reader = EventLogFileReader(mockedFs, path)
assert(reader.isDefined)
@@ -1176,14 +1176,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
fileStatus.setPath(path)
when(mockedFs.getFileStatus(path)).thenReturn(fileStatus)
// DFSInputStream.getFileLength is more than logInfo fileSize
- logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"),
+ logInfo = new LogInfo(path.toString, 0, LogType.EVENT_LOGS, Some("appId"),
Some("attemptId"), 100, None, false)
reader = EventLogFileReader(mockedFs, path)
assert(reader.isDefined)
assert(mockedProvider.shouldReloadLog(logInfo, reader.get))

// DFSInputStream.getFileLength is equal to logInfo fileSize
- logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"),
+ logInfo = new LogInfo(path.toString, 0, LogType.EVENT_LOGS, Some("appId"),
Some("attemptId"), 200, None, false)
reader = EventLogFileReader(mockedFs, path)
assert(reader.isDefined)
@@ -545,18 +545,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(list1Get.isDefined, "list1 expected to be in store")
assert(list1Get.get.data.size === 2)
assert(list1Get.get.bytes === list1SizeEstimate)
- assert(list1Get.get.readMethod === DataReadMethod.Memory)
+ assert(list1Get.get.readMethod === DataReadMethod.MEMORY)
val list2MemoryGet = store.get("list2memory")
assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
assert(list2MemoryGet.get.data.size === 3)
assert(list2MemoryGet.get.bytes === list2SizeEstimate)
- assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
+ assert(list2MemoryGet.get.readMethod === DataReadMethod.MEMORY)
val list2DiskGet = store.get("list2disk")
assert(list2DiskGet.isDefined, "list2memory expected to be in store")
assert(list2DiskGet.get.data.size === 3)
// We don't know the exact size of the data on disk, but it should certainly be > 0.
assert(list2DiskGet.get.bytes > 0)
- assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
+ assert(list2DiskGet.get.readMethod === DataReadMethod.DISK)
}

test("optimize a location order of blocks without topology information") {
@@ -45,7 +45,7 @@ object DecisionTreeRunner {

object ImpurityType extends Enumeration {
type ImpurityType = Value
- val Gini, Entropy, Variance = Value
+ val GINI, ENTROPY, VARIANCE = Value
}

import ImpurityType._
@@ -54,9 +54,9 @@ object DecisionTreeRunner {
input: String = null,
testInput: String = "",
dataFormat: String = "libsvm",
- algo: Algo = Classification,
+ algo: Algo = CLASSIFICATION,
maxDepth: Int = 5,
- impurity: ImpurityType = Gini,
+ impurity: ImpurityType = GINI,
maxBins: Int = 32,
minInstancesPerNode: Int = 1,
minInfoGain: Double = 0.0,
@@ -135,10 +135,10 @@ object DecisionTreeRunner {
if (params.fracTest < 0 || params.fracTest > 1) {
failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1].")
} else {
- if (params.algo == Classification &&
- (params.impurity == Gini || params.impurity == Entropy)) {
+ if (params.algo == CLASSIFICATION &&
+ (params.impurity == GINI || params.impurity == ENTROPY)) {
success
- } else if (params.algo == Regression && params.impurity == Variance) {
+ } else if (params.algo == REGRESSION && params.impurity == VARIANCE) {
success
} else {
failure(s"Algo ${params.algo} is not compatible with impurity ${params.impurity}.")
@@ -177,7 +177,7 @@ object DecisionTreeRunner {
}
// For classification, re-index classes if needed.
val (examples, classIndexMap, numClasses) = algo match {
- case Classification =>
+ case CLASSIFICATION =>
// classCounts: class --> # examples in class
val classCounts = origExamples.map(_.label).countByValue()
val sortedClasses = classCounts.keys.toList.sorted
@@ -206,7 +206,7 @@ object DecisionTreeRunner {
println(s"$c\t$frac\t${classCounts(c)}")
}
(examples, classIndexMap, numClasses)
- case Regression =>
+ case REGRESSION =>
(origExamples, null, 0)
case _ =>
throw new IllegalArgumentException(s"Algo $algo not supported.")
@@ -221,7 +221,7 @@ object DecisionTreeRunner {
case "libsvm" => MLUtils.loadLibSVMFile(sc, testInput, numFeatures)
}
algo match {
- case Classification =>
+ case CLASSIFICATION =>
// classCounts: class --> # examples in class
val testExamples = {
if (classIndexMap.isEmpty) {
@@ -231,7 +231,7 @@ object DecisionTreeRunner {
}
}
Array(examples, testExamples)
- case Regression =>
+ case REGRESSION =>
Array(examples, origTestExamples)
}
} else {
@@ -262,9 +262,9 @@ object DecisionTreeRunner {
params.testInput, params.algo, params.fracTest)

val impurityCalculator = params.impurity match {
- case Gini => impurity.Gini
- case Entropy => impurity.Entropy
- case Variance => impurity.Variance
+ case GINI => impurity.Gini
+ case ENTROPY => impurity.Entropy
+ case VARIANCE => impurity.Variance
}

params.checkpointDir.foreach(sc.setCheckpointDir)
@@ -290,23 +290,23 @@ object DecisionTreeRunner {
} else {
println(model) // Print model summary.
}
- if (params.algo == Classification) {
+ if (params.algo == CLASSIFICATION) {
val trainAccuracy =
new MulticlassMetrics(training.map(lp => (model.predict(lp.features), lp.label))).accuracy
println(s"Train accuracy = $trainAccuracy")
val testAccuracy =
new MulticlassMetrics(test.map(lp => (model.predict(lp.features), lp.label))).accuracy
println(s"Test accuracy = $testAccuracy")
}
- if (params.algo == Regression) {
+ if (params.algo == REGRESSION) {
val trainMSE = meanSquaredError(model, training)
println(s"Train mean squared error = $trainMSE")
val testMSE = meanSquaredError(model, test)
println(s"Test mean squared error = $testMSE")
}
} else {
val randomSeed = Utils.random.nextInt()
- if (params.algo == Classification) {
+ if (params.algo == CLASSIFICATION) {
val startTime = System.nanoTime()
val model = RandomForest.trainClassifier(training, strategy, params.numTrees,
params.featureSubsetStrategy, randomSeed)
@@ -324,7 +324,7 @@ object DecisionTreeRunner {
new MulticlassMetrics(test.map(lp => (model.predict(lp.features), lp.label))).accuracy
println(s"Test accuracy = $testAccuracy")
}
- if (params.algo == Regression) {
+ if (params.algo == REGRESSION) {
val startTime = System.nanoTime()
val model = RandomForest.trainRegressor(training, strategy, params.numTrees,
params.featureSubsetStrategy, randomSeed)
@@ -36,7 +36,7 @@ object DenseKMeans {

object InitializationMode extends Enumeration {
type InitializationMode = Value
- val Random, Parallel = Value
+ val RANDOM, PARALLEL = Value
}

import InitializationMode._
@@ -45,7 +45,7 @@ object DenseKMeans {
input: String = null,
k: Int = -1,
numIterations: Int = 10,
- initializationMode: InitializationMode = Parallel) extends AbstractParams[Params]
+ initializationMode: InitializationMode = PARALLEL) extends AbstractParams[Params]

def main(args: Array[String]): Unit = {
val defaultParams = Params()
@@ -90,8 +90,8 @@ object DenseKMeans {
println(s"numExamples = $numExamples.")

val initMode = params.initializationMode match {
- case Random => KMeans.RANDOM
- case Parallel => KMeans.K_MEANS_PARALLEL
+ case RANDOM => KMeans.RANDOM
+ case PARALLEL => KMeans.K_MEANS_PARALLEL
}

val model = new KMeans()
@@ -155,7 +155,7 @@ class DecisionTreeClassifier @Since("1.4.0") (
private[ml] def getOldStrategy(
categoricalFeatures: Map[Int, Int],
numClasses: Int): OldStrategy = {
- super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity,
+ super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.CLASSIFICATION, getOldImpurity,
subsamplingRate = 1.0)
}

@@ -261,7 +261,7 @@ class DecisionTreeClassificationModel private[ml] (

/** Convert to spark.mllib DecisionTreeModel (losing some information) */
override private[spark] def toOld: OldDecisionTreeModel = {
- new OldDecisionTreeModel(rootNode.toOld(1), OldAlgo.Classification)
+ new OldDecisionTreeModel(rootNode.toOld(1), OldAlgo.CLASSIFICATION)
}

@Since("2.0.0")
@@ -318,7 +318,7 @@ object DecisionTreeClassificationModel extends MLReadable[DecisionTreeClassifica
parent: DecisionTreeClassifier,
categoricalFeatures: Map[Int, Int],
numFeatures: Int = -1): DecisionTreeClassificationModel = {
- require(oldModel.algo == OldAlgo.Classification,
+ require(oldModel.algo == OldAlgo.CLASSIFICATION,
s"Cannot convert non-classification DecisionTreeModel (old API) to" +
s" DecisionTreeClassificationModel (new API). Algo is: ${oldModel.algo}")
val rootNode = Node.fromOld(oldModel.topNode, categoricalFeatures)