Merge remote-tracking branch 'upstream/master' into SPARK-34187
viirya committed Jan 24, 2021
2 parents ce9d1ea + 4fce05d commit 8716a16
Showing 921 changed files with 64,398 additions and 55,897 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# https://cwiki.apache.org/confluence/display/INFRA/.asf.yaml+features+for+git+repositories
# https://cwiki.apache.org/confluence/display/INFRA/git+-+.asf.yaml+features
---
github:
description: "Apache Spark - A unified analytics engine for large-scale data processing"
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -59,7 +59,7 @@ Collate:
'types.R'
'utils.R'
'window.R'
RoxygenNote: 5.0.1
RoxygenNote: 7.1.1
VignetteBuilder: knitr
NeedsCompilation: no
Encoding: UTF-8
8 changes: 4 additions & 4 deletions R/pkg/R/functions.R
@@ -247,7 +247,7 @@ NULL
#' used to transform the data. The first argument is the key, the second argument
#' is the value.
#' }
#' @param zero a \code{Column} used as the initial value in \code{array_aggregate}
#' @param initialValue a \code{Column} used as the initial value in \code{array_aggregate}
#' @param merge a \code{function} a binary function \code{(Column, Column) -> Column}
#' used in \code{array_aggregate}to merge values (the second argument)
#' into accumulator (the first argument).
@@ -3666,11 +3666,11 @@ invoke_higher_order_function <- function(name, cols, funs) {
#' @aliases array_aggregate array_aggregate,characterOrColumn,Column,function-method
#' @note array_aggregate since 3.1.0
setMethod("array_aggregate",
signature(x = "characterOrColumn", zero = "Column", merge = "function"),
function(x, zero, merge, finish = NULL) {
signature(x = "characterOrColumn", initialValue = "Column", merge = "function"),
function(x, initialValue, merge, finish = NULL) {
invoke_higher_order_function(
"ArrayAggregate",
cols = list(x, zero),
cols = list(x, initialValue),
funs = if (is.null(finish)) {
list(merge)
} else {
3 changes: 2 additions & 1 deletion R/pkg/R/generics.R
@@ -780,7 +780,8 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCoun

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_aggregate", function(x, zero, merge, ...) { standardGeneric("array_aggregate") })
setGeneric("array_aggregate",
function(x, initialValue, merge, ...) { standardGeneric("array_aggregate") })

#' @rdname column_collection_functions
#' @name NULL
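The rename from `zero` to `initialValue` only changes the SparkR signature; the semantics are those of the higher-order aggregate function. As a hedged illustration of what `initialValue` and `merge` do, here is a minimal Scala sketch using the analogous `functions.aggregate` API (the column name `values` is assumed for the example):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("array-aggregate-sketch").getOrCreate()
import spark.implicits._

// A single array column; "values" is an assumed column name for illustration.
val df = Seq(Seq(1, 2, 3)).toDF("values")

// initialValue seeds the accumulator; merge folds each array element into it.
df.select(aggregate($"values", lit(0), (acc, x) => acc + x).as("total")).show()
// +-----+
// |total|
// +-----+
// |    6|
// +-----+
```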
8 changes: 7 additions & 1 deletion common/network-yarn/pom.xml
@@ -65,7 +65,13 @@
<!-- Provided dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<artifactId>${hadoop-client-api.artifact}</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hadoop-client-runtime.artifact}</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -683,6 +683,16 @@ public UTF8String trimRight() {
return copyUTF8String(0, e);
}

/**
* Trims at most `numSpaces` space characters (ASCII 32) from the end of this string.
*/
public UTF8String trimTrailingSpaces(int numSpaces) {
int endIdx = numBytes - 1;
int trimTo = numBytes - numSpaces;
while (endIdx >= trimTo && getByte(endIdx) == 0x20) endIdx--;
return copyUTF8String(0, endIdx);
}
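A quick sketch of how this new helper behaves, assuming this revision of `UTF8String` is on the classpath: it removes at most `numSpaces` trailing ASCII spaces and leaves any remaining ones in place.

```scala
import org.apache.spark.unsafe.types.UTF8String

// "abc" followed by three trailing spaces; ask to trim at most two of them.
val s = UTF8String.fromString("abc   ")
val trimmed = s.trimTrailingSpaces(2)

// One trailing space survives because only numSpaces = 2 may be removed.
assert(trimmed.toString == "abc ")
```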

/**
* Trims instances of the given trim string from the end of this string.
*
@@ -1065,16 +1075,20 @@ public UTF8String replace(UTF8String search, UTF8String replace) {
return buf.build();
}

// TODO: Need to use `Code Point` here instead of Char in case the character longer than 2 bytes
public UTF8String translate(Map<Character, Character> dict) {
public UTF8String translate(Map<String, String> dict) {
String srcStr = this.toString();

StringBuilder sb = new StringBuilder();
for(int k = 0; k< srcStr.length(); k++) {
if (null == dict.get(srcStr.charAt(k))) {
sb.append(srcStr.charAt(k));
} else if ('\0' != dict.get(srcStr.charAt(k))){
sb.append(dict.get(srcStr.charAt(k)));
int charCount = 0;
for (int k = 0; k < srcStr.length(); k += charCount) {
int codePoint = srcStr.codePointAt(k);
charCount = Character.charCount(codePoint);
String subStr = srcStr.substring(k, k + charCount);
String translated = dict.get(subStr);
if (null == translated) {
sb.append(subStr);
} else if (!"\0".equals(translated)) {
sb.append(translated);
}
}
return fromString(sb.toString());
@@ -465,27 +465,27 @@ public void translate() {
assertEquals(
fromString("1a2s3ae"),
fromString("translate").translate(ImmutableMap.of(
'r', '1',
'n', '2',
'l', '3',
't', '\0'
"r", "1",
"n", "2",
"l", "3",
"t", "\0"
)));
assertEquals(
fromString("translate"),
fromString("translate").translate(new HashMap<>()));
assertEquals(
fromString("asae"),
fromString("translate").translate(ImmutableMap.of(
'r', '\0',
'n', '\0',
'l', '\0',
't', '\0'
"r", "\0",
"n", "\0",
"l", "\0",
"t", "\0"
)));
assertEquals(
fromString("aa世b"),
fromString("花花世界").translate(ImmutableMap.of(
'花', 'a',
'界', 'b'
"花", "a",
"界", "b"
)));
}

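The switch from `Map<Character, Character>` to `Map<String, String>` (and from chars to code points in the loop) is what lets `translate` handle characters outside the Basic Multilingual Plane, which a single Java `char` cannot represent. A hedged Scala sketch against this revision of `UTF8String`:

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.unsafe.types.UTF8String

// U+1D306 (𝌆) needs a surrogate pair, so it could never have been a Character key before.
val dict = new JHashMap[String, String]()
dict.put("𝌆", "x")        // replace the supplementary character
dict.put("a", "\u0000")    // mapping to "\0" deletes the character (see the loop above)

val result = UTF8String.fromString("a𝌆b").translate(dict)
assert(result.toString == "xb")   // 'a' dropped, '𝌆' -> 'x', 'b' kept
```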
25 changes: 19 additions & 6 deletions core/pom.xml
@@ -35,18 +35,13 @@
</properties>

<dependencies>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<classifier>${avro.mapred.classifier}</classifier>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -66,7 +61,13 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<artifactId>${hadoop-client-api.artifact}</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hadoop-client-runtime.artifact}</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -165,6 +166,10 @@
<artifactId>jakarta.servlet-api</artifactId>
<version>${jakartaservlet.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -177,6 +182,14 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -20,7 +20,7 @@
import java.util.Comparator;
import java.util.LinkedList;

import org.apache.avro.reflect.Nullable;
import javax.annotation.Nullable;

import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryConsumer;
@@ -75,26 +75,6 @@
</th>
</thead>
<tbody>
{{#applications}}
<tr>
{{#attempts}}
<td {{#hasMultipleAttempts}}style="background-color:#fff"{{/hasMultipleAttempts}}>{{version}}</td>
<td {{#hasMultipleAttempts}}style="background-color:#fff"{{/hasMultipleAttempts}}><span title="{{id}}"><a href="{{uiroot}}/history/{{id}}{{#hasAttemptId}}/{{attemptId}}{{/hasAttemptId}}/jobs/">{{id}}</a></span></td>
<td {{#hasMultipleAttempts}}style="background-color:#fff"{{/hasMultipleAttempts}}>{{name}}</td>
{{#hasMultipleAttempts}}
<td><a href="{{uiroot}}/history/{{id}}{{#hasAttemptId}}/{{attemptId}}{{/hasAttemptId}}/jobs/">{{attemptId}}</a></td>
{{/hasMultipleAttempts}}
<td>{{startTime}}</td>
{{#showCompletedColumns}}
<td>{{endTime}}</td>
<td><span title="{{durationMillisec}}">{{duration}}</span></td>
{{/showCompletedColumns}}
<td>{{sparkUser}}</td>
<td>{{lastUpdated}}</td>
<td><a href="{{log}}" class="btn btn-info btn-mini">Download</a></td>
{{/attempts}}
</tr>
{{/applications}}
</tbody>
</table>
</script>
54 changes: 41 additions & 13 deletions core/src/main/resources/org/apache/spark/ui/static/historypage.js
@@ -140,9 +140,13 @@ $(document).ready(function() {
(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "logs";
attempt["durationMillisec"] = attempt["duration"];
attempt["duration"] = formatDuration(attempt["duration"]);
var hasAttemptId = attempt.hasOwnProperty("attemptId");
var app_clone = {"id" : id, "name" : name, "version": version, "hasAttemptId" : hasAttemptId, "attempts" : [attempt]};
array.push(app_clone);
attempt["id"] = id;
attempt["name"] = name;
attempt["version"] = version;
attempt["attemptUrl"] = uiRoot + "/history/" + id + "/" +
(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "jobs/";

array.push(attempt);
}
}
if(array.length < 20) {
@@ -165,17 +169,41 @@
var completedColumnName = 'completed';
var durationColumnName = 'duration';
var conf = {
"data": array,
"columns": [
{name: 'version'},
{name: 'appId', type: "appid-numeric"},
{name: 'appName'},
{name: attemptIdColumnName},
{name: startedColumnName},
{name: completedColumnName},
{name: durationColumnName, type: "title-numeric"},
{name: 'user'},
{name: 'lastUpdated'},
{name: 'eventLog'},
{name: 'version', data: 'version' },
{
name: 'appId',
type: "appid-numeric",
data: 'id',
render: (id, type, row) => `<span title="${id}"><a href="${row.attemptUrl}">${id}</a></span>`
},
{name: 'appName', data: 'name' },
{
name: attemptIdColumnName,
data: 'attemptId',
render: (attemptId, type, row) => (attemptId ? `<a href="${row.attemptUrl}">${attemptId}</a>` : '')
},
{name: startedColumnName, data: 'startTime' },
{name: completedColumnName, data: 'endTime' },
{name: durationColumnName, type: "title-numeric", data: 'duration' },
{name: 'user', data: 'sparkUser' },
{name: 'lastUpdated', data: 'lastUpdated' },
{
name: 'eventLog',
data: 'log',
render: (log, type, row) => `<a href="${log}" class="btn btn-info btn-mini">Download</a>`
},
],
"aoColumnDefs": [
{
aTargets: [0, 1, 2],
fnCreatedCell: (nTd, sData, oData, iRow, iCol) => {
if (hasMultipleAttempts) {
$(nTd).css('background-color', '#fff');
}
}
},
],
"autoWidth": false,
"deferRender": true
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2096,6 +2096,7 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_plugins.foreach(_.shutdown())
}
FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
@@ -240,7 +240,7 @@ private[spark] class SparkSubmit extends Logging {
}

// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
val deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ =>
@@ -33,9 +33,11 @@ import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
import org.apache.spark.internal.config.UI.UI_REVERSE_PROXY
import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}

/**
@@ -50,6 +52,7 @@ private[deploy] class DriverRunner(
val driverDesc: DriverDescription,
val worker: RpcEndpointRef,
val workerUrl: String,
val workerWebUiUrl: String,
val securityManager: SecurityManager,
val resources: Map[String, ResourceInformation] = Map.empty)
extends Logging {
@@ -189,6 +192,14 @@ private[deploy] class DriverRunner(
val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

// add WebUI driver log url to environment
val reverseProxy = conf.get(UI_REVERSE_PROXY)
val workerUrlRef = UIUtils.makeHref(reverseProxy, driverId, workerWebUiUrl)
builder.environment.put("SPARK_DRIVER_LOG_URL_STDOUT",
s"$workerUrlRef/logPage?driverId=$driverId&logType=stdout")
builder.environment.put("SPARK_DRIVER_LOG_URL_STDERR",
s"$workerUrlRef/logPage?driverId=$driverId&logType=stderr")

runDriver(builder, driverDir, driverDesc.supervise)
}

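The two environment variables set above point at the worker's log pages for the driver. A minimal sketch of reading them back from inside a driver launched by this runner (nothing beyond the variable names shown in the diff is assumed):

```scala
// Driver-side code can read the log-page URLs that the DriverRunner change
// above injects into the driver's environment.
val stdoutLogUrl = sys.env.get("SPARK_DRIVER_LOG_URL_STDOUT")
val stderrLogUrl = sys.env.get("SPARK_DRIVER_LOG_URL_STDERR")

stdoutLogUrl.foreach(url => println(s"driver stdout log page: $url"))
stderrLogUrl.foreach(url => println(s"driver stderr log page: $url"))
```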
@@ -652,6 +652,7 @@ private[deploy] class Worker(
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
workerWebUiUrl,
securityMgr,
resources_)
drivers(driverId) = driver
@@ -481,6 +481,13 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
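The new flag defaults to false, so fallback-storage data survives shutdown unless it is enabled explicitly. A hedged sketch of turning it on alongside the existing fallback-storage settings (the path value is only an example, and the path option requires a trailing separator per the check shown above):

```scala
import org.apache.spark.SparkConf

// Example values only; the fallback path must end with a separator, as the
// existing path config's checkValue above requires.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path", "hdfs://nn/spark/fallback/")
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")  // new in this change
```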
@@ -125,7 +125,7 @@ private[spark] class TaskSetManager(
val weight = 1
val minShare = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
val stageId = taskSet.stageId
val name = "TaskSet_" + taskSet.id
var parent: Pool = null
private var totalResultSize = 0L
