
Commit

Merge branch 'master' of https://github.com/apache/spark into apache-master
Asif Shahid committed Nov 24, 2020
2 parents 8f6d86c + 048a982 commit 00553cb
Showing 53 changed files with 376 additions and 134 deletions.
@@ -147,6 +147,8 @@ public void onFailure(Throwable t) {
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
*
* @since 3.1.0
*/
public void pushBlocks(
String host,
@@ -156,4 +158,24 @@ public void pushBlocks(
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}

/**
* Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
* for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
* finishing the shuffle merge process associated with the shuffle mapper stage.
*
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
* @param listener the listener to receive MergeStatuses
*
* @since 3.1.0
*/
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
throw new UnsupportedOperationException();
}
}
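For context only (not part of this commit's diff): a driver-side caller of the new API might look roughly like the Scala sketch below. The client value, host, port, and shuffleId are placeholders, and the listener body is illustrative.

import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
import org.apache.spark.network.shuffle.protocol.MergeStatuses

// Hypothetical driver-side helper; `client` would be a concrete BlockStoreClient
// implementation such as the external shuffle client shown in the next hunk.
def finalizeMerge(client: BlockStoreClient, host: String, port: Int, shuffleId: Int): Unit = {
  client.finalizeShuffleMerge(host, port, shuffleId, new MergeFinalizerListener {
    override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit =
      // The driver would typically record the MergeStatuses before starting
      // the reducer stage, per the Javadoc above.
      println(s"Shuffle $shuffleId merge finalized")
    override def onShuffleMergeFailure(e: Throwable): Unit =
      println(s"Shuffle $shuffleId merge finalization failed: ${e.getMessage}")
  })
}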
@@ -158,6 +158,35 @@ public void pushBlocks(
}
}

@Override
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
listener.onShuffleMergeSuccess(
(MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
}

@Override
public void onFailure(Throwable e) {
listener.onShuffleMergeFailure(e);
}
});
} catch (Exception e) {
logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
host, port, e);
listener.onShuffleMergeFailure(e);
}
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.EventListener;

import org.apache.spark.network.shuffle.protocol.MergeStatuses;

/**
* :: DeveloperApi ::
*
* Listener providing a callback function to invoke when driver receives the response for the
* finalize shuffle merge request sent to remote shuffle service.
*
* @since 3.1.0
*/
public interface MergeFinalizerListener extends EventListener {
/**
* Called once upon successful response on finalize shuffle merge on a remote shuffle service.
* The returned {@link MergeStatuses} is passed to the listener for further processing
*/
void onShuffleMergeSuccess(MergeStatuses statuses);

/**
* Called once upon failure response on finalize shuffle merge on a remote shuffle service.
*/
void onShuffleMergeFailure(Throwable e);
}
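As an aside, and not part of the commit: callback-style listeners like this are commonly bridged to a Scala Future so driver code can compose or await the result. A minimal hypothetical adapter, assuming only the interfaces shown in this diff:

import scala.concurrent.{Future, Promise}

import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
import org.apache.spark.network.shuffle.protocol.MergeStatuses

// Hypothetical adapter: complete a Promise from the asynchronous callbacks so
// callers can treat finalization as a Future[MergeStatuses].
def finalizeAsFuture(
    client: BlockStoreClient,
    host: String,
    port: Int,
    shuffleId: Int): Future[MergeStatuses] = {
  val promise = Promise[MergeStatuses]()
  client.finalizeShuffleMerge(host, port, shuffleId, new MergeFinalizerListener {
    override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit =
      promise.trySuccess(statuses)
    override def onShuffleMergeFailure(e: Throwable): Unit =
      promise.tryFailure(e)
  })
  promise.future
}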
@@ -169,7 +169,7 @@ private[spark] class StorageStatus(
.getOrElse((0L, 0L))
case _ if !level.useOffHeap =>
(_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
case _ if level.useOffHeap =>
case _ =>
(_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -757,15 +757,15 @@ private[spark] object JsonProtocol {

def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = taskResourceRequestFromJson(v)
(k, req)
}.toMap
}

def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = executorResourceRequestFromJson(v)
(k, req)
}.toMap
@@ -1229,7 +1229,7 @@ private[spark] object JsonProtocol {

def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val resourceInfo = ResourceInformation.parseJson(v)
(k, resourceInfo)
}.toMap
@@ -1241,7 +1241,7 @@

def mapFromJson(json: JValue): Map[String, String] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap
}

def propertiesFromJson(json: JValue): Properties = {
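The JsonProtocol hunks above replace map with collect on pattern-matching lambdas. A plausible motivation is the Scala 2.13.4 bump in pom.xml later in this diff: 2.13.4 reports non-exhaustive pattern-matching lambdas passed where an ordinary function is expected, whereas collect takes a PartialFunction and simply skips elements the pattern does not cover. A simplified, self-contained Scala sketch of the difference, using plain tuples rather than the json4s types from the real code:

// Illustration only (not from the commit): a pattern that covers only part of
// the element type.
val fields: List[(String, Any)] = List("a" -> "1", "b" -> 2)

// With map, the lambda must handle every element; ("b", 2) throws
// scala.MatchError at runtime, and newer compilers also warn that the match
// may not be exhaustive:
//   fields.map { case (k, v: String) => (k, v) }.toMap

// With collect, unmatched elements are skipped and no exhaustivity warning is
// emitted, because the lambda is typed as a PartialFunction:
val stringFields: Map[String, String] =
  fields.collect { case (k, v: String) => (k, v) }.toMap   // Map("a" -> "1")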
1 change: 1 addition & 0 deletions dev/run-tests-jenkins
@@ -26,6 +26,7 @@ FWDIR="$( cd "$( dirname "$0" )/.." && pwd )"
cd "$FWDIR"

export PATH=/home/anaconda/envs/py36/bin:$PATH
export LANG="en_US.UTF-8"

PYTHON_VERSION_CHECK=$(python3 -c 'import sys; print(sys.version_info < (3, 6, 0))')
if [[ "$PYTHON_VERSION_CHECK" == "True" ]]; then
13 changes: 9 additions & 4 deletions docs/_config.yml
@@ -26,15 +26,20 @@ SCALA_VERSION: "2.12.10"
MESOS_VERSION: 1.0.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
# Before a new release, we should apply a new `apiKey` for the new Spark documentation
# on https://docsearch.algolia.com/. Otherwise, after release, the search results are always based
# on the latest documentation(https://spark.apache.org/docs/latest/) even when visiting the
# documentation of previous releases.
# Before a new release, we should:
# 1. update the `version` array for the new Spark documentation
# on https://github.com/algolia/docsearch-configs/blob/master/configs/apache_spark.json.
# 2. update the value of `facetFilters.version` in `algoliaOptions` on the new release branch.
# Otherwise, after release, the search results are always based on the latest documentation
# (https://spark.apache.org/docs/latest/) even when visiting the documentation of previous releases.
DOCSEARCH_SCRIPT: |
docsearch({
apiKey: 'b18ca3732c502995563043aa17bc6ecb',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:latest"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
@@ -302,6 +302,8 @@ private[spark] object BLAS extends Serializable {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"spr doesn't support vector type ${v.getClass}.")
}
}

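This hunk and the similar ones below add a default case to matches over vector and matrix values. A likely motivation, given the Scala 2.13.4 upgrade in pom.xml near the end of this diff, is that the newer exhaustivity checker flags extractor-based matches it previously accepted; the added case keeps warnings-as-errors builds passing and raises a descriptive error instead of a bare scala.MatchError. A hypothetical, self-contained sketch of the idiom, using stand-in classes rather than Spark's:

// Stand-ins (not Spark's classes): plain classes matched via companion
// unapply extractors, similar to how DenseVector/SparseVector are matched.
sealed trait Vec
class Dense(val values: Array[Double]) extends Vec
class Sparse(val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vec

object Dense { def unapply(d: Dense): Option[Array[Double]] = Some(d.values) }
object Sparse {
  def unapply(s: Sparse): Option[(Int, Array[Int], Array[Double])] =
    Some((s.size, s.indices, s.values))
}

// Extractor patterns may fail, so the compiler may not treat the first two
// cases as covering every Vec; the catch-all silences that warning and gives
// a clearer error than a MatchError if an unexpected type ever appears.
def nnz(v: Vec): Int = v match {
  case Dense(values)        => values.count(_ != 0.0)
  case Sparse(_, _, values) => values.length
  case other =>
    throw new IllegalArgumentException(s"Unknown vector type ${other.getClass}.")
}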
@@ -286,6 +286,7 @@ private[ml] object RFormulaParser extends RegexParsers {

private val pow: Parser[Term] = term ~ "^" ~ "^[1-9]\\d*".r ^^ {
case base ~ "^" ~ degree => power(base, degree.toInt)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
} | term

private val interaction: Parser[Term] = pow * (":" ^^^ { interact _ })
@@ -298,7 +299,10 @@ private[ml] object RFormulaParser extends RegexParsers {
private val expr = (sum | term)

private val formula: Parser[ParsedRFormula] =
(label ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) }
(label ~ "~" ~ expr) ^^ {
case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
}

def parse(value: String): ParsedRFormula = parseAll(formula, value) match {
case Success(result, _) => result
@@ -314,6 +314,8 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
case SparseVector(size, indices, values) =>
val newValues = transformSparseWithScale(scale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case (false, false) =>
@@ -74,6 +74,8 @@ private[ml] object JsonMatrixConverter {
("values" -> values.toSeq) ~
("isTransposed" -> isTransposed)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}
}
}
@@ -57,6 +57,8 @@ private[ml] object JsonVectorConverter {
case DenseVector(values) =>
val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}
@@ -45,6 +45,8 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -200,6 +200,9 @@ private[ml] class BlockHingeAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -504,6 +504,9 @@ private[ml] class BlockLogisticAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -192,6 +192,8 @@ private[spark] object Instrumentation {
case Failure(NonFatal(e)) =>
instr.logFailure(e)
throw e
case Failure(e) =>
throw e
case Success(result) =>
instr.logSuccess()
result
@@ -167,6 +167,8 @@ class StandardScalerModel @Since("1.3.0") (
val newValues = NewStandardScalerModel
.transformSparseWithScale(localScale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case _ => vector
2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -285,6 +285,8 @@ private[spark] object BLAS extends Serializable with Logging {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -289,6 +289,8 @@ class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -145,6 +145,8 @@ class IndexedRowMatrix @Since("1.0.0") (
.map { case (values, blockColumn) =>
((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
}
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
case ((blockRow, blockColumn), itr) =>
@@ -187,6 +189,8 @@ class IndexedRowMatrix @Since("1.0.0") (
Iterator.tabulate(indices.length)(i => MatrixEntry(rowIndex, indices(i), values(i)))
case DenseVector(values) =>
Iterator.tabulate(values.length)(i => MatrixEntry(rowIndex, i, values(i)))
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
new CoordinateMatrix(entries, numRows(), numCols())
@@ -748,6 +748,8 @@ class RowMatrix @Since("1.0.0") (
}
buf
}.flatten
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
2 changes: 1 addition & 1 deletion pom.xml
@@ -3264,7 +3264,7 @@
<profile>
<id>scala-2.13</id>
<properties>
<scala.version>2.13.3</scala.version>
<scala.version>2.13.4</scala.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
@@ -313,7 +313,6 @@ trait MesosSchedulerUtils extends Logging {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
offerAttributes.get(name) match {
case None => false
case Some(_) if requiredValues.isEmpty => true // empty value matches presence
case Some(scalarValue: Value.Scalar) =>
// check if provided values is less than equal to the offered values
Expand All @@ -332,6 +331,7 @@ trait MesosSchedulerUtils extends Logging {
// check if the specified value is equal, if multiple values are specified
// we succeed if any of them match.
requiredValues.contains(textValue.getValue)
case _ => false
}
}
}
@@ -178,7 +178,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
assert(execInfo.getContainer.getDocker.getForcePullImage)
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))