
Commit

Fix conflict
beliefer committed Feb 26, 2020
2 parents 71810f6 + e9fd522 commit c67da83
Showing 74 changed files with 787 additions and 876 deletions.
@@ -171,7 +171,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
.attr("cy", function(d) { return y(d.y); })
.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";})
.on('mouseover', function(d) {
var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
var tip = formatYValue(d.y) + " " + unitY + " at " + timeTipStrings[d.x];
showBootstrapTooltip(d3.select(this).node(), tip);
// show the point
d3.select(this)
@@ -106,12 +106,12 @@ function drawAreaStack(id, labels, values, minX, maxX, minY, maxY) {
.on('mouseover', function(d) {
var tip = '';
var idx = 0;
var _values = timeToValues[d._x]
var _values = formattedTimeToValues[d._x];
_values.forEach(function (k) {
tip += labels[idx] + ': ' + k + ' ';
idx += 1;
});
tip += " at " + d._x
tip += " at " + formattedTimeTipStrings[d._x];
showBootstrapTooltip(d3.select(this).node(), tip);
})
.on('mouseout', function() {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2798,7 +2798,7 @@ object SparkContext extends Logging {
defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
"some scheduling doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${cpuSlots} and " +
s"${limitingResource} = " +
s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
@@ -55,7 +55,7 @@ package object config {
.createOptional

private[spark] val RESOURCES_DISCOVERY_PLUGIN =
ConfigBuilder("spark.resources.discovery.plugin")
ConfigBuilder("spark.resources.discoveryPlugin")
.doc("Comma-separated list of class names implementing" +
"org.apache.spark.api.resource.ResourceDiscoveryPlugin to load into the application." +
"This is for advanced users to replace the resource discovery class with a " +
@@ -21,6 +21,7 @@ import java.io.File
import java.util.Optional

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils.executeAndGetOutput
@@ -32,6 +33,7 @@ import org.apache.spark.util.Utils.executeAndGetOutput
* If the user specifies custom plugins, this is the last one to be executed and
* throws if the resource isn't discovered.
*/
@DeveloperApi
class ResourceDiscoveryScriptPlugin extends ResourceDiscoveryPlugin with Logging {
override def discoverResource(
request: ResourceRequest,
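For context on the <code>@DeveloperApi</code> plugin interface touched above, a user-supplied discovery plugin registered via <code>spark.resources.discoveryPlugin</code> might look like the minimal sketch below. The package, class name, and resource addresses are hypothetical, and the exact <code>discoverResource</code> signature (the <code>SparkConf</code> parameter and <code>Optional</code> return type) is assumed from the Spark 3.0 developer API rather than shown in full in this diff.

```scala
package com.example.spark  // hypothetical package

import java.util.Optional

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

// Minimal sketch of a custom discovery plugin that reports a fixed set of GPU
// addresses instead of running a discovery script. Signature assumed from the
// Spark 3.0 ResourceDiscoveryPlugin developer API.
class StaticGpuDiscoveryPlugin extends ResourceDiscoveryPlugin {
  override def discoverResource(
      request: ResourceRequest,
      sparkConf: SparkConf): Optional[ResourceInformation] = {
    if (request.id.resourceName == "gpu") {
      // A real plugin would query the hardware or a cluster manager here.
      Optional.of(new ResourceInformation("gpu", Array("0", "1")))
    } else {
      // Returning empty defers to the next plugin in the comma-separated list,
      // and ultimately to the script-based plugin shown in this file.
      Optional.empty[ResourceInformation]()
    }
  }
}
```

Plugins listed earlier in the <code>spark.resources.discoveryPlugin</code> value are tried first; <code>ResourceDiscoveryScriptPlugin</code> is executed last and throws if the resource is still undiscovered, as the Scaladoc above notes.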
@@ -168,7 +168,7 @@ class ResourceProfile(
// limiting resource because the scheduler code uses that for slots
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
"some scheduling doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${taskLimit} and " +
s"${execReq.resourceName} = ${numTasks}. " +
"Please adjust your configuration so that all resources require same number " +
@@ -183,12 +183,11 @@
"no corresponding task resource request was specified.")
}
}
if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sparkConf)) {
if(!shouldCheckExecCores && execResourceToCheck.nonEmpty) {
// if we can't rely on the executor cores config throw a warning for user
logWarning("Please ensure that the number of slots available on your " +
"executors is limited by the number of cores to task cpus and not another " +
"custom resource. If cores is not the limiting resource then dynamic " +
"allocation will not work properly!")
"custom resource.")
}
if (taskResourcesToCheck.nonEmpty) {
throw new SparkException("No executor resource configs were not specified for the " +
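To make the slot check in the two hunks above concrete, here is a minimal sketch of a configuration in which the CPU-based task slots and a custom GPU resource agree, so the <code>IllegalArgumentException</code> is not thrown. The amounts and the discovery-script path are illustrative placeholders only.

```scala
import org.apache.spark.SparkConf

// 4 executor cores with 1 CPU per task    -> 4 CPU-based task slots.
// 4 GPUs per executor with 1 GPU per task -> 4 GPU-based task slots.
// The two limits match, so no resource is wasted and the check passes.
val conf = new SparkConf()
  .set("spark.executor.cores", "4")
  .set("spark.task.cpus", "1")
  .set("spark.executor.resource.gpu.amount", "4")
  .set("spark.task.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh") // placeholder path

// If spark.executor.resource.gpu.amount were 1 instead, only one task could run
// per executor while three cores sat idle -- the situation the error message
// and the warning above describe.
```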
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -244,7 +244,7 @@ of the most common options to set are:
</td>
</tr>
<tr>
<td><code>spark.resources.discovery.plugin</code></td>
<td><code>spark.resources.discoveryPlugin</code></td>
<td>org.apache.spark.resource.ResourceDiscoveryScriptPlugin</td>
<td>
Comma-separated list of class names implementing
57 changes: 44 additions & 13 deletions docs/monitoring.md
@@ -95,6 +95,48 @@ The history server can be configured as follows:
</tr>
</table>

### Applying compaction on rolling event log files

A long-running application (e.g. a streaming job) can produce a single huge event log file, which can be costly to
maintain and requires significant resources to replay on each update in the Spark History Server.

Enabling <code>spark.eventLog.rolling.enabled</code> and <code>spark.eventLog.rolling.maxFileSize</code> gives you
rolling event log files instead of a single huge event log file, which may help in some scenarios on its own,
but it still doesn't reduce the overall size of the logs.

The Spark History Server can apply compaction to the rolling event log files to reduce the overall size of
the logs, via the configuration <code>spark.history.fs.eventLog.rolling.maxFilesToRetain</code> set on the
Spark History Server.

Details are described below, but please note up front that compaction is a LOSSY operation.
Compaction discards some events, which will no longer be visible in the UI - you may want to check which events will be discarded
before enabling the option.

When compaction happens, the History Server lists all the available event log files for the application, and the
event log files whose index is smaller than the smallest index among the files to be retained become the targets of compaction.
For example, if application A has 5 event log files and <code>spark.history.fs.eventLog.rolling.maxFilesToRetain</code> is set to 2, then the first 3 log files will be selected for compaction.

Once the targets are selected, the History Server analyzes them to figure out which events can be excluded, and rewrites them
into one compact file, discarding the events it decided to exclude.

Compaction tries to exclude events that point to outdated data. As of now, the candidates for exclusion are:

* Events for jobs that have finished, and the related stage/task events
* Events for executors that have been terminated
* Events for SQL executions that have finished, and the related job/stage/task events

Once rewriting is done, the original log files are deleted on a best-effort basis. The History Server may not be able to delete
the original log files, but this does not affect its operation.

Please note that the Spark History Server may not compact old event log files if it determines that not much space
would be saved by compaction. For a streaming query we normally expect compaction
to run, since each micro-batch triggers one or more jobs which finish shortly afterwards, but compaction often won't run
for a batch query.

Please also note that this is a new feature introduced in Spark 3.0, and it may not be completely stable. Under some circumstances,
compaction may exclude more events than you expect, leading to UI issues in the History Server for the application.
Use it with caution.
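As a rough illustration of how the settings above fit together (paths, sizes, and the application name are placeholders, not recommendations): the application enables rolling event logs, while the retention/compaction limit is set on the Spark History Server side.

```scala
import org.apache.spark.sql.SparkSession

// Application side: write rolling event log files instead of one huge file.
val spark = SparkSession.builder()
  .appName("rolling-event-log-example")                   // hypothetical app name
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "hdfs:///spark-events")    // placeholder path
  .config("spark.eventLog.rolling.enabled", "true")
  .config("spark.eventLog.rolling.maxFileSize", "128m")
  .getOrCreate()

// History Server side (e.g. in its spark-defaults.conf), shown here as a comment:
// retain at most 2 non-compacted files per application; older files get compacted.
// spark.history.fs.eventLog.rolling.maxFilesToRetain  2
```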

### Spark History Server Configuration Options

Security options for the Spark History Server are covered in more detail in the
@@ -303,19 +345,8 @@
<td>Int.MaxValue</td>
<td>
The maximum number of event log files which will be retained as non-compacted. By default,
all event log files will be retained.<br/>
Please note that compaction will happen in Spark History Server, which means this configuration
should be set to the configuration of Spark History server, and the same value will be applied
across applications which are being loaded in Spark History Server. This also means compaction
and cleanup would require running Spark History Server.<br/>
Please set the configuration in Spark History Server, and <code>spark.eventLog.rolling.maxFileSize</code>
in each application accordingly if you want to control the overall size of event log files.
The event log files older than these retained files will be compacted into single file and
deleted afterwards.<br/>
NOTE: Spark History Server may not compact the old event log files if it figures
out not a lot of space would be reduced during compaction. For streaming query
(including Structured Streaming) we normally expect compaction will run, but for
batch query compaction won't run in many cases.
all event log files will be retained. The lowest valid value is 1, for technical reasons.<br/>
Please read the section "Applying compaction on rolling event log files" above for more details.
</td>
</tr>
</table>
@@ -682,7 +682,6 @@ private[spark] object BLAS extends Serializable {

val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}
@@ -734,8 +733,7 @@
val indEnd = Acols(colCounterForA + 1)
val xVal = xValues(colCounterForA) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(rowIndex) += Avals(i) * xVal
yValues(Arows(i)) += Avals(i) * xVal
i += 1
}
colCounterForA += 1
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -139,7 +139,7 @@ class Pipeline @Since("1.4.0") (
val theStages = $(stages)
// Search for the last estimator.
var indexOfLastEstimator = -1
theStages.view.zipWithIndex.foreach { case (stage, index) =>
theStages.iterator.zipWithIndex.foreach { case (stage, index) =>
stage match {
case _: Estimator[_] =>
indexOfLastEstimator = index
@@ -148,7 +148,7 @@
}
var curDataset = dataset
val transformers = ListBuffer.empty[Transformer]
theStages.view.zipWithIndex.foreach { case (stage, index) =>
theStages.iterator.zipWithIndex.foreach { case (stage, index) =>
if (index <= indexOfLastEstimator) {
val transformer = stage match {
case estimator: Estimator[_] =>
@@ -67,14 +67,12 @@ class AttributeGroup private (
/**
* Optional array of attributes. At most one of `numAttributes` and `attributes` can be defined.
*/
val attributes: Option[Array[Attribute]] = attrs.map(_.view.zipWithIndex.map { case (attr, i) =>
attr.withIndex(i)
}.toArray)
val attributes: Option[Array[Attribute]] = attrs.map(_.iterator.zipWithIndex
.map { case (attr, i) => attr.withIndex(i) }.toArray)

private lazy val nameToIndex: Map[String, Int] = {
attributes.map(_.view.flatMap { attr =>
attr.name.map(_ -> attr.index.get)
}.toMap).getOrElse(Map.empty)
attributes.map(_.iterator.flatMap { attr => attr.name.map(_ -> attr.index.get)}.toMap)
.getOrElse(Map.empty)
}

/** Size of the attribute group. Returns -1 if the size is unknown. */
@@ -31,7 +31,6 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

/**
@@ -17,8 +17,6 @@

package org.apache.spark.ml.classification

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
@@ -256,7 +256,7 @@ class RandomForestClassificationModel private[ml] (
// Classifies using majority votes.
// Ignore the tree weights since all are 1.0 for now.
val votes = Array.ofDim[Double](numClasses)
_trees.view.foreach { tree =>
_trees.foreach { tree =>
val classCounts = tree.rootNode.predictImpl(features).impurityStats.stats
val total = classCounts.sum
if (total != 0) {
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, MetadataUtils, SchemaUtils}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
@@ -104,7 +104,9 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
SchemaUtils.checkNumericType(schema, $(weightCol))
}

// TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
MetadataUtils.getNumFeatures(schema($(rawPredictionCol)))
.foreach(n => require(n == 2, s"rawPredictionCol vectors must have length=2, but got $n"))

val scoreAndLabelsWithWeights =
dataset.select(
col($(rawPredictionCol)),
@@ -20,7 +20,7 @@ package org.apache.spark.ml.evaluation
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors}
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
import org.apache.spark.ml.util._
@@ -144,8 +144,6 @@ object MinMaxScaler extends DefaultParamsReadable[MinMaxScaler] {
*
* @param originalMin min value for each original column during fitting
* @param originalMax max value for each original column during fitting
*
* TODO: The transformer does not yet set the metadata in the output column (SPARK-8529).
*/
@Since("1.5.0")
class MinMaxScalerModel private[ml] (
@@ -25,7 +25,6 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.regression._
@@ -1049,7 +1049,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
.join(userFactors)
.mapPartitions({ items =>
items.flatMap { case (_, (ids, factors)) =>
ids.view.zip(factors)
ids.iterator.zip(factors.iterator)
}
// Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
// and userFactors.
@@ -1061,7 +1061,7 @@
.join(itemFactors)
.mapPartitions({ items =>
items.flatMap { case (_, (ids, factors)) =>
ids.view.zip(factors)
ids.iterator.zip(factors.iterator)
}
}, preservesPartitioning = true)
.setName("itemFactors")
@@ -1376,7 +1376,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
Iterator.empty
}
} ++ {
builders.view.zipWithIndex.filter(_._1.size > 0).map { case (block, idx) =>
builders.iterator.zipWithIndex.filter(_._1.size > 0).map { case (block, idx) =>
val srcBlockId = idx % srcPart.numPartitions
val dstBlockId = idx / srcPart.numPartitions
((srcBlockId, dstBlockId), block.build())
@@ -1695,7 +1695,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
case (srcBlockId, (srcOutBlock, srcFactors)) =>
srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =>
srcOutBlock.iterator.zipWithIndex.map { case (activeIndices, dstBlockId) =>
(dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
}
}
@@ -40,7 +40,6 @@ import org.apache.spark.mllib.optimization.{Gradient, GradientDescent, SquaredL2
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

/**
@@ -26,9 +26,8 @@ import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeRow}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -825,7 +825,7 @@ private[spark] object RandomForest extends Logging with Serializable {
}

val validFeatureSplits =
Range(0, binAggregates.metadata.numFeaturesPerNode).view.map { featureIndexIdx =>
Iterator.range(0, binAggregates.metadata.numFeaturesPerNode).map { featureIndexIdx =>
featuresForNode.map(features => (featureIndexIdx, features(featureIndexIdx)))
.getOrElse((featureIndexIdx, featureIndexIdx))
}.withFilter { case (_, featureIndex) =>
@@ -95,7 +95,7 @@ class StreamingKMeansModel @Since("1.2.0") (
val discount = timeUnit match {
case StreamingKMeans.BATCHES => decayFactor
case StreamingKMeans.POINTS =>
val numNewPoints = pointStats.view.map { case (_, (_, n)) =>
val numNewPoints = pointStats.iterator.map { case (_, (_, n)) =>
n
}.sum
math.pow(decayFactor, numNewPoints)
@@ -125,9 +125,8 @@
}

// Check whether the smallest cluster is dying. If so, split the largest cluster.
val weightsWithIndex = clusterWeights.view.zipWithIndex
val (maxWeight, largest) = weightsWithIndex.maxBy(_._1)
val (minWeight, smallest) = weightsWithIndex.minBy(_._1)
val (maxWeight, largest) = clusterWeights.iterator.zipWithIndex.maxBy(_._1)
val (minWeight, smallest) = clusterWeights.iterator.zipWithIndex.minBy(_._1)
if (minWeight < 1e-8 * maxWeight) {
logInfo(s"Cluster $smallest is dying. Split the largest cluster $largest into two.")
val weight = (maxWeight + minWeight) / 2.0
@@ -74,8 +74,9 @@ class ChiSqSelectorModel @Since("1.3.0") (
}
}

private[spark] def compressSparse(indices: Array[Int],
values: Array[Double]): (Array[Int], Array[Double]) = {
private[spark] def compressSparse(
indices: Array[Int],
values: Array[Double]): (Array[Int], Array[Double]) = {
val newValues = new ArrayBuilder.ofDouble
val newIndices = new ArrayBuilder.ofInt
var i = 0